Flow.Receive() not sequential?

Nov 8, 2009 at 1:32 PM

Dear Ralf,

thanks for both - the XcoAppSpace and the CCRFlows project. I am currently spending some of my weekends on diving into the asynchronous paradigm and - doing so - refactoring a current private project to meet these goals. As expected, its not the  component-based architecture and XcoAppSpace that's difficult to follow, but when it comes to transfering smaller functional units from synchronous sequential code into asynchronous sequential flows.

Thats why I am working my way into the CCRFlow project. However, I disovered an inconsistency - at least at me it looked like that.
Remember the StatfulStages example, having the quick brown fox jumpin over the lazy dog - and splitting this sentence into multiple lines:
I don't care about the statefulness here, it is the non-sequential output of the flow I want to draw attention to:

            var flow = Flow<string>
                .Do<string>(SplitTextIntoWords)
                .Do(new AssembleLineFromWords(10));

            flow.Input.Post("the quick brown fox jumps over the lazy dog");

            flow.Receive(Console.WriteLine);

Running the example as is, the output lines are randomly interchanged:

the quick
jumps
over the
brown fox

lazy dog

I then added an additional stage to the flow that does nothing else than flow.Receive() would do: It writes the lines to the console: 

            var flow = Flow<string>
                .Do<string>(SplitTextIntoWords)
                .Do(new AssembleLineFromWords(10))
                .Do<string>(WriteLine);

            ...


        static void WriteLine(string textIn, Port<string> textOut)
        {
            Console.WriteLine(textIn);
            textOut.Post(textIn);
        }

 It turns out that - within the flow - the order of lines is well handled:

The quick
brown fox
jumps
over the
lazy dog

 So something gets messed up when the flow.receive retrieves its messages from the flows final output port.
The stage handlers obviously are implemented to process the messages in their input ports sequentially,
whereas the flow.Receive method is not.

I played a little around with the implementation in your code, but couldn't find a solution to the problem.
I suspected that the non-persistent message handler that is recursively set up within Abstract StageBase
assures the sequential handling:

        protected void WaitForNextInput()
        {
            Arbiter.Activate(
                this.taskQueue,
                this.inputHandler
                );
        }

So I tried to implement a similar thing for the Flow.Receive() method, but this did not solve the problem.

Have you got any ideas, or am I myself on some "wrong line" .. :-)

My best Greetings!

Claas