C++0x, concurrency, concurrency runtime, parallelism, technology | 04 Nov 2008 11:57 pm

A very common use case for concurrency is computing a value asynchronously. This is often called a ‘future’, and people frequently talk about promises of future values as well.  Section 35 of the C++0x draft has a proposal for a future construct, and the Parallel Extensions to .NET also include a future class.

Unfortunately futures aren’t included in the Parallel Pattern Library or the Asynchronous Agents Library in the Visual Studio 2010 CTP. But I’d like to show a couple of ways to build a simple future class with the CTP.

A synopsis for a simple future

Here’s a simple template class synopsis for a future that allows you to construct a future with a functor and then retrieve its value asynchronously:

   template <class T>
   class future{
   public:
       template <class Functor>
       future(Functor fn);
       ~future();
       T GetValue();
   };

Here’s a simple example of creating a future with a lambda and retrieving its value:

   future<int> f([](){
       return 2;
   });
   int result = f.GetValue();
   assert(result == 2);

The scenario above isn’t super exciting, and it isn’t really asynchronous, but it’s functional. In practice a lot of concurrency can be realized by chaining and combining futures into expressions and waiting on the final result.
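
To make the idea of combining futures a little more concrete, here is a minimal sketch. It assumes the std::async and std::future facilities of the C++ standard library rather than the CTP libraries, and the helpers slow_width and slow_height are hypothetical stand-ins for real work:

```cpp
#include <cassert>
#include <future>

// Hypothetical stand-ins for two independent, slow computations.
int slow_width()  { return 6; }
int slow_height() { return 7; }

// Combine two futures into one expression: both inputs are started
// before either result is requested, so they can overlap.
int area() {
    std::future<int> w = std::async(std::launch::async, slow_width);
    std::future<int> h = std::async(std::launch::async, slow_height);
    return w.get() * h.get();   // blocks until both values are ready
}

// Chain a further step on top of the combined result and hand back
// a future, so the caller only waits on the final value.
std::future<int> doubled_area() {
    return std::async(std::launch::async, [] { return 2 * area(); });
}

void combine_example() {
    assert(area() == 42);
    assert(doubled_area().get() == 84);
}
```

The caller only ever blocks on the last future in the chain; the intermediate ones are waited on inside the tasks that need them.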

Implementing it step-by-step

Let’s take a look at one possible way of implementing this template class.  There are other strategies that can be more efficient if you’re building lots of very fine-grained futures, but this one is very easy to write and, for relatively large expressions, is probably quite fine in practice.  I’ll mention the overheads and tradeoffs as I go along.

The first thing we’ll need in our future class is a place to store the value and a way to compute it asynchronously. From the CTP, I’m going to use both the task constructs in <ppl.h> and the message blocks in <agents.h>. To do this, let’s create a couple of private member variables, starting with an overwrite_buffer:

   overwrite_buffer<T> value_;

The overwrite_buffer is a template class defined in <agents.h>. It is a “message block” that allows a single value to be “sent” to it and stored; copies of the most recent value are extracted upon request, typically using a helper function called “receive”.  We will use this to store our result. Ideally this would be a single-assignment variable, but that isn’t included in the CTP, so I’m not using it here.
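
If the send/receive semantics are unfamiliar, the following sketch may help. It is a hypothetical portable model written with a mutex and a condition variable, not the <agents.h> implementation; the class name simple_overwrite_buffer and its member functions are made up for illustration, and it assumes T is default-constructible and copyable:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical model of an overwrite-style message block.
// Illustrates the semantics only; this is not the agents.h code.
template <class T>
class simple_overwrite_buffer {
    std::mutex m_;
    std::condition_variable ready_;
    T value_{};
    bool has_value_ = false;
public:
    // Store a value, replacing any earlier one, and wake blocked receivers.
    void send(const T& v) {
        {
            std::lock_guard<std::mutex> lock(m_);
            value_ = v;
            has_value_ = true;
        }
        ready_.notify_all();
    }
    // Block until at least one value has been sent, then return a copy
    // of the most recent one. The value stays in the buffer.
    T receive() {
        std::unique_lock<std::mutex> lock(m_);
        ready_.wait(lock, [this] { return has_value_; });
        return value_;
    }
};

void buffer_example() {
    simple_overwrite_buffer<int> buf;
    std::thread producer([&buf] { buf.send(1); buf.send(2); });
    producer.join();              // both sends have completed
    assert(buf.receive() == 2);   // the latest value wins
    assert(buf.receive() == 2);   // receiving does not consume it
}
```

Because a later send silently replaces the earlier value, this model also shows why a true single-assignment variable would be the better fit for a future.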

The next thing we need is a way to compute the value asynchronously and to do this I’m going to use a task_group from the PPL.

   task_group tg_;

task_group is a class defined in <ppl.h> and is used to schedule tasks, wait for them all to complete, or cancel any outstanding tasks assigned to it.  In the CTP the task_group is actually thread-safe, so I can add tasks to it or call wait or cancel from any thread and get well-defined behavior.
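
As a rough mental model of run and wait, here is a hypothetical portable sketch. The class simple_task_group is invented for illustration and uses one std::thread per task to stay short; the real task_group schedules onto the Concurrency Runtime instead and also supports cancel, which this sketch leaves out:

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical model of run/wait on a task group (no cancel support).
class simple_task_group {
    std::mutex m_;
    std::vector<std::thread> workers_;
public:
    // Schedule a task; the lock makes this safe to call from any thread.
    void run(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(m_);
        workers_.emplace_back(std::move(task));
    }
    // Block until every task scheduled so far has finished.
    void wait() {
        std::vector<std::thread> pending;
        {
            std::lock_guard<std::mutex> lock(m_);
            pending.swap(workers_);
        }
        for (auto& t : pending) t.join();
    }
    ~simple_task_group() { wait(); }
};

int task_group_example() {
    std::atomic<int> sum{0};
    simple_task_group tg;
    for (int i = 1; i <= 4; ++i)
        tg.run([&sum, i] { sum += i; });
    tg.wait();
    return sum.load();   // 1 + 2 + 3 + 4
}
```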

Now let’s implement the constructor, which will schedule a task to run the functor passed in and then place its result into our overwrite_buffer.  We schedule the task with the task_group::run template member function and store the result in the overwrite_buffer with the template function send defined in <agents.h>. Note that the lambda captures fn and this by value, copying them into the lambda:

   template <class Functor>
   future(Functor fn){
       tg_.run([fn,this](){
           send(this->value_,fn());
       });
   }

I mentioned I would call out overheads, and we’ve just created one. The overload of task_group::run that we just used incurs a heap allocation: the ‘fn’ object lives on the stack and would be destroyed once the constructor exits, so a copy has to be allocated on the heap and deleted after it has executed (or been canceled, or faulted).  There is an overloaded task_group::run method that takes a reference to a task_handle<T>, which avoids this particular overhead, but it relies on the user to manage the lifetime of that task and ensure it is still valid whenever it is executed.

Next we need to implement GetValue to return the result.  To do this we’re going to use the template function receive, which is the analog of send.  When receive is used on a ‘message block’ it will actually block and wait for that value to be computed.  One of the important things about using the agents APIs to do this is that receive will generate a blocking notification to the Concurrency Runtime, and the runtime will use this notification to run additional work if any has been scheduled.  Here’s how GetValue is implemented; it’s incredibly straightforward:

   T GetValue(){
       return receive(value_);
   }

The last thing we need to do is implement the destructor. Here I’m just calling task_group::wait to ensure that our task is cleaned up.  You may also want to call task_group::cancel if you don’t need that task to run to completion.

   ~future(){
       tg_.wait();
   }

Putting it all together

Here’s the complete program. That was a lot of typing for not very much code:

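#include <agents.h>
#include <ppl.h>
#include <cassert>
using namespace Concurrency;  // task_group, overwrite_buffer, send, receive

template <class T>
class future{
    overwrite_buffer<T> value_;  // holds the computed result
    task_group tg_;              // runs the functor asynchronously
public:
    template <class Functor>
    future(Functor fn){
        // fn and this are copied into the lambda; the task publishes the result
        tg_.run([fn,this](){
            send(this->value_,fn());
        });
    }
    ~future(){
        tg_.wait();  // the task must finish before the members are destroyed
    }
    T GetValue(){
        return receive(value_);  // blocks until the value has been sent
    }
};
int main(){
    future<int> f([](){
        return 2;
    });
    int result = f.GetValue();
    assert(result == 2);
    return 0;
}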
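
For readers without the CTP, a similar interface can be approximated with the standard library. The sketch below is an assumption-laden analog, not a port: simple_future is a hypothetical name, and std::async with std::shared_future stands in for the task_group and overwrite_buffer used above.

```cpp
#include <cassert>
#include <future>

// Hypothetical portable analog of the future<T> class above, built on
// std::async instead of task_group and overwrite_buffer.
template <class T>
class simple_future {
    std::shared_future<T> value_;   // shared_future permits repeated get()
public:
    template <class Functor>
    explicit simple_future(Functor fn)
        // Wrap fn so the task's result type is exactly T.
        : value_(std::async(std::launch::async,
                            [fn]() mutable -> T { return fn(); }).share()) {}

    // Block until the functor has produced its result, then return a copy.
    T GetValue() { return value_.get(); }

    // Like the destructor above: wait so the task cannot outlive the object.
    ~simple_future() { if (value_.valid()) value_.wait(); }
};

int future_example() {
    simple_future<int> f([] { return 2; });
    return f.GetValue();
}
```

One difference worth noting: std::launch::async asks for the functor to run as if on a new thread, whereas the task_group version hands the work to the Concurrency Runtime scheduler.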

Wrapping it up

I’m still not doing anything very interesting here, and I could always do something like compute the Fibonacci sequence the slow way. But in practice there are many more uses for a future, like applying a filter to an image or various forms of I/O where there is a combination of CPU work and perhaps a significant amount of latency.

As I alluded, there are other implementation strategies here. For example, the transform message block in <agents.h> could be used instead of a task_group, and a series of message blocks could be combined to implement richer functionality like that described in the C++0x specification. Most notably, a timer and a choice (wait for one of n inputs to be sent) or join would likely be required to ease implementation.

And here’s that recursive Fibonacci sequence. It’s late and I can’t resist:

   int fib(int n){
       if(n < 2)
           return n;
       future<int> n1([n](){return fib(n-1);});
       future<int> n2([n](){return fib(n-2);});
       return n1.GetValue() + n2.GetValue();
   }
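
The version above creates two futures at every level of the recursion, which is exactly the "lots of very fine-grained futures" case mentioned earlier. One common mitigation is a cutoff, sketched here with std::async from the standard library rather than the CTP class. The names fib_seq and fib_cutoff and the depth parameter are hypothetical and chosen for illustration:

```cpp
#include <cassert>
#include <future>

// Plain recursive Fibonacci, used once the cutoff is reached.
int fib_seq(int n) {
    return n < 2 ? n : fib_seq(n - 1) + fib_seq(n - 2);
}

// Spawn asynchronous work only for the top `depth` levels of the
// recursion; below that, fall back to the sequential version.
int fib_cutoff(int n, int depth) {
    if (n < 2) return n;
    if (depth <= 0) return fib_seq(n);
    std::future<int> n1 =
        std::async(std::launch::async, fib_cutoff, n - 1, depth - 1);
    int n2 = fib_cutoff(n - 2, depth - 1);   // reuse the current thread
    return n1.get() + n2;
}

void fib_example() {
    assert(fib_cutoff(10, 3) == 55);
}
```

With a depth of 3 the sketch launches at most seven asynchronous tasks regardless of n, so the per-future overhead stays bounded.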

-Rick
