Stateful Stages

Sometimes you need to keep state in a single stage across the values flowing through it. This might be some constant value you want to parameterize a stage with. Or it might be a changing value like an aggregate.

Imagine a flow where a stage just counts the number of values passing through. That's simple to implement with a stateful stage. A stateful stage implements the interface ISeqHandler<TIn, TOut>, e.g.

class CounterStage : ISeqHandler<int,int>
{
    public int Counter;

    #region Implementation of ISeqHandler<int,int>
    public void Handle(int input, Port<int> output)
    {
        this.Counter++;
        output.Post(input);
    }
    #endregion
}


This CounterStage can count the number of integer values flowing through it:

var count = new CounterStage {Counter = 0};
var flow = new Flow<int, int>(count);

flow.Input.Post(1);
flow.Input.Post(2);
flow.Input.Post(3);

Assert.AreEqual(3, count.Counter);

With ISeqHandler<> you can create stages which can be used flexibly in different contexts.
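The other kind of state mentioned at the top, a constant value that parameterizes a stage, could look roughly like the following sketch. ScaleStage and ScaleStageDemo are hypothetical names, and the Port<T> and ISeqHandler<,> declarations are minimal stand-ins with the same shape as in the CounterStage example, included only so the snippet compiles without the library. They are not the library's actual definitions.

```csharp
using System.Collections.Generic;

// Stand-in for the CCR Port<T>, reduced to the one method the handler calls.
// It only records posted values so the sketch runs without the library.
class Port<T>
{
    public readonly List<T> Posted = new List<T>();
    public void Post(T value) { Posted.Add(value); }
}

// Stand-in with the same shape as the interface implemented by CounterStage.
interface ISeqHandler<TIn, TOut>
{
    void Handle(TIn input, Port<TOut> output);
}

// Hypothetical stage: the factor is constant state fixed at construction.
class ScaleStage : ISeqHandler<int, int>
{
    private readonly int factor;

    public ScaleStage(int factor) { this.factor = factor; }

    public void Handle(int input, Port<int> output)
    {
        // Every value passing through is multiplied by the configured factor.
        output.Post(input * this.factor);
    }
}

static class ScaleStageDemo
{
    // Calls the handler directly with 1, 2, 3 and returns what it posted.
    public static List<int> Run()
    {
        var stage = new ScaleStage(10);
        var output = new Port<int>();
        foreach (var n in new[] { 1, 2, 3 })
            stage.Handle(n, output);
        return output.Posted;
    }
}
```

Because the factor is readonly, the same stage instance can be reused in several flows without one flow affecting another.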

And remember: since CCR Flows are sequential, only one thread executes a stage at any point in time. The stage is therefore thread-safe by default, and there is no need to protect its state against concurrent access. (Of course, you have to be careful with scatter stages.)
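If a handler can be reached from several threads at once, which is my assumption about what may happen around scatter stages, the plain Counter++ is no longer safe. A minimal sketch of one way to guard the counter uses Interlocked.Increment. ConcurrentCounter and ConcurrentCounterDemo are hypothetical names, and Parallel.For only simulates concurrent callers here; it is not how the library invokes a stage.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical counter that stays correct even if Count is called concurrently.
class ConcurrentCounter
{
    private int counter;

    // Volatile.Read returns the latest value written by any thread.
    public int Counter { get { return Volatile.Read(ref this.counter); } }

    public int Count(int input)
    {
        // Atomic increment: no update is lost when calls overlap.
        Interlocked.Increment(ref this.counter);
        return input;
    }
}

static class ConcurrentCounterDemo
{
    // Simulates 1000 callers on thread-pool threads and returns the final count.
    public static int Run()
    {
        var stage = new ConcurrentCounter();
        Parallel.For(0, 1000, i => { stage.Count(i); });
        return stage.Counter;
    }
}
```

For a purely sequential flow this extra care is unnecessary, so the simpler CounterStage remains the better choice there.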

Statefulness without ISeqHandler

There is another way to implement stateful stage handlers. To be honest, I only realized this while working on ISeqHandler. It's very simple: create any class, instantiate it, and pass one of its methods that matches one of the handler delegate types to the stage, e.g.

class CounterStage2
{
    public int Counter;

    public int Count(int input)
    {
        this.Counter++;
        return input;
    }
}
...
var flow = new Flow<int, int>(new CounterStage2().Count);

ISeqHandler still seems to me to be the cleaner solution ;-) I'll keep it in the library.
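One thing to watch with the delegate variant: in new CounterStage2().Count the instance is reachable only through the delegate, so you cannot read Counter afterwards. Keeping your own reference avoids that. The sketch below shows this with a plain Func<int, int>, which is my assumption for a matching delegate type; it does not use the Flow class. DelegateStateDemo is a hypothetical name.

```csharp
using System;

class CounterStage2
{
    public int Counter;

    public int Count(int input)
    {
        this.Counter++;
        return input;
    }
}

static class DelegateStateDemo
{
    // Invokes the bound delegate three times and returns the instance's count.
    public static int Run()
    {
        var counter = new CounterStage2();

        // Method group conversion: the delegate is bound to this instance,
        // so every invocation updates counter.Counter.
        Func<int, int> handler = counter.Count;

        handler(1);
        handler(2);
        handler(3);

        return counter.Counter;
    }
}
```

With the reference kept, you would pass counter.Count wherever the snippet above passes new CounterStage2().Count and read counter.Counter once the values have flowed through.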

Last edited Jul 3, 2009 at 9:41 PM by ralfw, version 5
