Single Flow

Flows are code units (or call it functions or activities or stages) chained together by CCR ports. A simple process (or algorithm) like
  1. Do A,
  2. then do B,
  3. finally do C

can be viewed as a flow: Data is flowing first into A, gets processed, then the result flows to B, gets processed, the result flows on to C and gets processed. Flows define how processing stages are chained together to successively work on data flowing into and through the process.

Windows Workflow Foundation comes to mind. But CCR Flows are much more lightweight. No designer needed, so no designer is provided.

Like with your synchronous sequential code the basic building blocks of flows are methods. But where in a sync seq process theses methods are dependent on each other they are not in flows! That´s a very important distinction. It leads to less coupling and higher flexibility.

Conceptual processing stages

Conceptually a processing stage is a black box with at least one input "channel" and one output "channel". Internally these "channels" are built using CCR Ports. But you can think of them in a more abstract way.

fig1.gif

So each processing stage needs to be defined with two types: a type for input values and a type for output values. Think of a processing stage as a function with a single input parameter and a return value.

At each point in time a stage processes only one value. Although values might arrive at a high rate through the input "channel" there is no parallelism in processing them. But parallelism of course exists between stages! That way flows can scale but retain sequentiallity. That makes reasoning about them much easier. Think of flows as large queues with internal processing logic. They are first-in, first-out (FIFO) structures.

Chaining individual stages together to flows is done by making the input "channel" of a later stage the output "channel" of an earlier stage.

fig2.gif

You can call that a single flow because there is just a single path from the first stage to the last stage.

Implementing flow stages

Conceptually a flow stage is a black box with two "channels": one going in, the other going out. But in code a flow stage is a function. It accepts one input parameter, and returns a result. Here´s a single stage flow increnting a number:

using CcrFlows.Core;
...
var flow = new Flow<int, int>(i => i+1);
...
flow.Input.Post(1);
...
flow.Receive(Console.WriteLine);

This works well for simple transformation where a single input value is transformed into a single output value. But what if the transformation is 1:n? Then you need to be able to pass on multiple values to the next stage. You can do that by posting to the output "channel" instead of returning a value. Here´s the difference:

OutputType SimpleTransformation_1_1(InputType v) { ... return outputValue; }

void NotSoSimpleTransformation_1_N(InputType v, Port<OutputType> output)
{
    ...
    output.Post(...);
    ...
    output.Post(...);
    ...
}

A example of a 1:n transformation could be splitting a text into words:

static void Main()
{
    var flow = Flow<string>.Do<string>(SplitIntoWords);

    flow.Input.Post("the quick brown fox jumps over the lazy dog");

    flow.Receive(Console.WriteLine);
    Console.ReadLine();
}

static void SplitIntoWords(string text, Port<string> words)
{
    foreach (string word in text.Split(' '))
        words.Post(word);
}

As you can see, implementing single flow stages is easy. To translate your current sync seq processes into async seq CCR Flow processes just strip off any dependencies off your current processing routines. Don´t nest them, instead lay them out in a sequential flow where each does a little bit of work and then passes its result on to the next stage. And the beauty of it is: no stage knows of its predecessors or successors. This is the foundation of flexibility and reuse!

Chaining stages together

Creating a single stage flow is as simple as creating a Flow<> instance like this:

var flow = new Flow<InputType, OutputType>(InputProcessor);

Alternatively you can use a factory method:

var flow = Flow<InputType>.Do<OutputType>(InputProcessor);

This also lends itself to chaining together several stages. You can call Do<>() on each stage to append a successor:

var flow = Flow<InputType1>
    .Do<OutputType1>(InputProcessor1)
    .Do<OutputType2>(InputProcessor2)
    .Do<OutputType3>(InputProcessor3);

The resulting flow is of type Flow<InputType1, OutputType3>. The input type of InputProcessor2, for example, is OutputType1; its output type is OutputType2. So the type parameter of Do<>() is the output type of that stage. The input type is carried over from the previous stage.

Next check out how you can introduce parallel processing again into flows: ScatteredProcessing.

Last edited Jul 1, 2009 at 1:43 PM by ralfw, version 5

Comments

No comments yet.