
Continuations and Directed Graphs Part 1: implementing with the Parallel Pattern Library

I’ve been thinking a lot over the last couple of days about continuations and directed graphs and the different ways to implement them. I’ve also been looking more closely at std::thread, std::future and std::async to make sure I fully understand them. So I decided to make a (small) multi-part series out of this and show some simple sketches of continuations implemented with the Parallel Pattern Library, with std::future and std::async, and with the Agents Library.

This is part 1, which covers a simple implementation of ‘run_when’ and ‘wait_for_all’ using the PPL.

First a simple dependency / continuation

Here’s an incredibly simple dependency: task 2 depends on task 1.

#include <cstdio>

void ContinueableTask(int in)
{
   printf("building project: %d\n", in);
}

void SimpleContinuation()
{
   ContinueableTask(1);

   //task 2 depends on task 1
   ContinueableTask(2);
}

 

There’s not a lot of concurrency here with only two tasks, yet we can still make this execution ‘parallel’ with a task_group as follows:

#include <ppl.h>
using namespace Concurrency;

void SimpleContinuationTasks()
{
   //task1
   task_group task1;
   task1.run([](){ContinueableTask(1);});

   //task2 depends on task 1 
   task_group task2;
   task2.run([&](){
      task1.wait();
      ContinueableTask(2);
   });   

   //wait for task 2
   task2.wait();
}

 

Making it look a bit cleaner

 
The code here isn’t too bad, but is a little awkward.  It would sure be nice to write something like this instead:
void SimpleContinuation()
{
   auto task1 = run_task([](){ContinueableTask(1);});    

   //task 2 depends on task 1
   auto task2 = run_when(task1, [](){ContinueableTask(2);});
   wait_for_all(task1, task2);
}

 

Implementing this is straightforward in the PPL (in part 2 you’ll see that it’s also straightforward with std::future and std::async and that run_task looks *a lot* like std::async). 

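//assumes <memory> and <ppl.h> are included, with using-directives for std and Concurrency
typedef shared_ptr<task_group> shared_task_ptr;

//fn is taken by value so that a temporary lambda can be passed in directly
template<typename Func>
inline shared_task_ptr run_task(Func fn)
{
   shared_task_ptr tasks = make_shared<task_group>();
   tasks->run(fn);
   return tasks;
}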

 

I’m using a shared_ptr here because task_group is non-copyable and this allows me to return it by value without having to do anything unnatural in the interface (like create one then pass it in by reference).

run_when is also pretty straightforward, here is a template implementation:

template<typename Func>
inline shared_task_ptr run_when(shared_task_ptr tasks, Func fn)
{
   //note: this blocks the calling thread until the dependency has finished
   tasks->wait();
   return run_task(fn);
}
 
and finally wait_for_all:
 
inline void wait_for_all(shared_task_ptr t1, shared_task_ptr t2)
{
   t1->wait();
   t2->wait();
}

Expanding this is straightforward

 
It should be pretty easy to see that you can expand run_when and wait_for_all with overloads that take more parameters, and that writing an example with more complex dependencies is straightforward:

void MoreComplexContinuations()
{
   auto p1 = run_task([](){ContinueableTask(1);});
   auto p2 = run_task([](){ContinueableTask(2);});
   auto p3 = run_task([](){ContinueableTask(3);});
   auto p4 = run_when(p1, [](){ContinueableTask(4);});
   auto p5 = run_when(p2, p3, [](){ContinueableTask(5);});
   auto p6 = run_when(p3, p4, [](){ContinueableTask(6);});
   wait_for_all(p1, p2, p3, p4, p5, p6);
}
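
The extra overloads used above aren’t shown in the post, so here is a minimal sketch of what they might look like. To keep it portable I’m assuming std::async and std::shared_future<void> as stand-ins for the PPL task_group (the task_handle_t typedef and the run_graph helper are names made up for this sketch). One difference from the version above: this run_when waits for its dependencies inside the new task rather than on the calling thread.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for shared_task_ptr: a copyable handle to a running task.
typedef std::shared_future<void> task_handle_t;

// Launch fn on its own thread and return a handle to it.
template <typename Func>
task_handle_t run_task(Func fn)
{
    return std::async(std::launch::async, std::move(fn)).share();
}

// Base case: nothing left to wait for.
inline void wait_for_all() {}

// Wait for the first handle, then for the rest.
template <typename... Rest>
void wait_for_all(const task_handle_t& first, const Rest&... rest)
{
    first.wait();
    wait_for_all(rest...);
}

// One dependency: the wait happens inside the new task, not in the caller.
template <typename Func>
task_handle_t run_when(task_handle_t d1, Func fn)
{
    return run_task([d1, fn]() mutable { d1.wait(); fn(); });
}

// Two dependencies: same idea, waiting on both before running fn.
template <typename Func>
task_handle_t run_when(task_handle_t d1, task_handle_t d2, Func fn)
{
    return run_task([d1, d2, fn]() mutable { wait_for_all(d1, d2); fn(); });
}

// Runs the six-project graph from the post and returns the order in which
// the projects finished.
inline std::vector<int> run_graph()
{
    std::mutex m;
    std::vector<int> order;
    auto build = [&m, &order](int id) {
        return [&m, &order, id]() {
            std::lock_guard<std::mutex> lock(m);
            order.push_back(id);
        };
    };
    auto p1 = run_task(build(1));
    auto p2 = run_task(build(2));
    auto p3 = run_task(build(3));
    auto p4 = run_when(p1, build(4));
    auto p5 = run_when(p2, p3, build(5));
    auto p6 = run_when(p3, p4, build(6));
    wait_for_all(p1, p2, p3, p4, p5, p6);
    return order;
}
```

The exact order returned by run_graph varies from run to run, but project 4 always follows 1, project 5 follows 2 and 3, and project 6 follows 3 and 4. The tradeoff is that every waiting task ties up a thread of its own here, which the Concurrency Runtime would handle more gracefully.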

 

Next time, part 2: std::async and std::future

In the next few days, I’ll post part 2 and show how easy it is to implement this pattern with std::async and std::future from the upcoming C++0x standard.


PDC09: while (true) ++C;

I’m getting ready for my session at PDC 2009, Forever C++, and thought that I would take a moment to outline what I’ll be talking about and demoing.

First and foremost, I’ll be providing an overview of the programming model in the Parallel Pattern Library and Asynchronous Agents Library.  I’ll hit a lot of the APIs, how to use them and how to apply them to your application.   I’ll touch on a few of the algorithms and extras we provided in the sample pack as well.

I’ll also cover our tooling support for debugging and profiling multi-threaded applications in Visual Studio 2010, and I’ll (hopefully) show you how to recognize a couple of common patterns that are particularly useful when profiling PPL code in the new concurrency view of the profiler.

Finally, I’ll talk about a couple of ways you can coordinate your concurrent work, manage shared state across tasks and threads, and integrate this into your existing application. I won’t be able to spend as much time on this as I had originally hoped, but others from the team and I will be at our booths and at ‘ask the experts’, so there will be plenty of opportunities for you to ask your specific questions or share feedback with us.

I also thought I’d share a screenshot of one of our demos running on a high core count box.

[Image: screenshot of the demo running on a high core count box]

Unfortunately this box won’t be at PDC, but I’ll briefly show this app running on my quad-core, and Dana Groff will be showing it live on a high core count server box in his talk on the Concurrency Runtime (SVR10).

Finally, rumour has it that Dana and I will be doing another short talk in the Theatre on Wednesday at 1pm.

Hope to see you there…

-Rick


Building an Asynchronous Future in Visual Studio 2010

A very common use case for concurrency is computing a value asynchronously. This is often called a ‘future’, and folks frequently talk about promises of future values as well. Section 35 of the C++0x draft has a proposal for a future construct, and the Parallel Extensions to .NET also include a future class.

Unfortunately futures aren’t included in the Parallel Pattern Library or the Asynchronous Agents Library in the Visual Studio 2010 CTP. But I’d like to show a couple of ways to build a simple future class with the CTP.

A synopsis for a simple future

Here’s a simple template class synopsis for a future that allows you to construct a future with a functor and then retrieve its value asynchronously:

   template <class T>
   class async_future{
   public:
       template <class Functor>
       async_future(Functor fn);
       ~async_future();
       T GetValue();
   };

Here’s a simple example of creating a future with a lambda and retrieving its value:

   async_future<int> f([](){
       return 2;
   });
   int result = f.GetValue();
   assert(result == 2);

The scenario above isn’t super exciting, and it isn’t really asynchronous, but it’s functional. In practice a lot of concurrency can be realized by chaining and combining futures together into expressions and waiting on the final result.
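
To make the idea of chaining a bit more concrete, here’s a small sketch of two futures feeding a third, where only the final result is waited on. It assumes std::async and std::shared_future as portable stand-ins for the async_future class in this post, and square, sum_of_squares and chaining_demo are made-up names for illustration.

```cpp
#include <cassert>
#include <future>

// Hypothetical piece of work standing in for something expensive.
inline int square(int x) { return x * x; }

// Computes a*a + b*b with each square and the final sum expressed as futures.
inline int sum_of_squares(int a, int b)
{
    // shared_future is copyable, so the combining task can capture both inputs.
    std::shared_future<int> fa = std::async(std::launch::async, square, a).share();
    std::shared_future<int> fb = std::async(std::launch::async, square, b).share();

    // A third future that depends on the first two.
    std::future<int> sum = std::async(std::launch::async, [fa, fb]() {
        return fa.get() + fb.get();
    });

    // Only the final result is waited on by the caller.
    return sum.get();
}

// Usage: 3*3 + 4*4 == 25.
inline void chaining_demo()
{
    assert(sum_of_squares(3, 4) == 25);
}
```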

Implementing it step-by-step

Let’s take a look at one possible way of implementing this template class. There are in fact other strategies which can be more efficient if you’re building lots of very fine grained futures, but this one is very easy to write and in practice is probably quite fine for relatively large expressions. I’ll mention the overheads and tradeoffs as I go along.

The first thing we’ll need in our future class is a place to store that value and a way to compute it asynchronously. From the CTP, I’m going to use both the task constructs in <ppl.h> and the message blocks in <agents.h>. To do this let’s create a couple private member variables starting with an overwrite_buffer:

   overwrite_buffer<T> value_;

The overwrite_buffer is a template class defined in agents.h. It is a “message block” that allows a single value to be “sent” to it (stored), and copies of the most recent value are extracted upon request, typically using a helper function called “receive”. We will use this to store our result. It’s also important to note that ideally this would be a single assignment variable, but that isn’t included in the CTP (so I’m not using it here).
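
If the send / receive behavior is unfamiliar, here’s a rough portable sketch of the semantics described above, built on a mutex and a condition variable. This is not the agents.h class or how it is implemented; latest_value_buffer and overwrite_demo are hypothetical names, and the sketch assumes T is default-constructible and copyable.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in that mimics the described behavior only.
template <class T>
class latest_value_buffer {
    std::mutex m_;
    std::condition_variable cv_;
    T value_{};
    bool has_value_ = false;
public:
    // Store a value, replacing any earlier one, and wake waiting readers.
    void send(const T& v)
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            value_ = v;
            has_value_ = true;
        }
        cv_.notify_all();
    }

    // Block until something has been sent, then return a copy of the latest value.
    T receive()
    {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return has_value_; });
        return value_;
    }
};

// A producer sends twice; readers only ever see the most recent value.
inline int overwrite_demo()
{
    latest_value_buffer<int> buf;
    std::thread producer([&buf] { buf.send(1); buf.send(2); });

    int seen = buf.receive();      // blocks until at least one send has happened
    assert(seen == 1 || seen == 2);

    producer.join();
    return buf.receive();          // both sends are done, so this is the last value
}
```

Unlike a queue, receiving doesn’t consume anything, which is why the same result can be handed out to as many callers as ask for it.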

The next thing we need is a way to compute the value asynchronously and to do this I’m going to use a task_group from the PPL.

   task_group tg_;

task_group is a class defined in <ppl.h> and is used to schedule tasks, wait for them all to complete or cancel any outstanding tasks assigned to it.  In the CTP the task_group is actually thread-safe so I can add tasks to it, call wait or cancel from any thread and get well defined behavior.

Now let’s implement the constructor, which will schedule a task to run the functor passed in and then place its result into our overwrite_buffer. We schedule a task with the task_group::run template member function and assign the result to the overwrite_buffer with the template function send defined in agents.h. Note that in the lambda I’m capturing fn and this by value, so both are copied into the lambda:

   template <class Functor>
   async_future(Functor fn){
       tg_.run([fn,this](){
           send(this->value_,fn());
       });
   }

I mentioned I would call out overheads, and we’ve just created one. The overload of task_group::run that we just used incurs a heap allocation: the ‘fn’ object lives on the stack and would be destroyed once the constructor exits, so a copy has to be allocated on the heap and deleted after it has been executed (or canceled, or faulted). There is another overload of task_group::run that takes a reference to a task_handle<T> and avoids this particular overhead, but it relies on the user to manage the lifetime of that task and ensure it is still valid whenever it is executed.

Next we need to implement GetValue to return the result.  To do this we’re going to use the template method receive, which is the analog of send.  When receive is used on a ‘message block’ it will actually block and wait for that value to be computed.  One of the important things about using the agents APIs to do this is that receive will generate a blocking notification to the Concurrency Runtime and the runtime will use this blocking notification to run additional work if it’s been scheduled.  Here’s how GetValue is implemented; it’s incredibly straightforward:

   T GetValue(){
       return receive(value_);
   }


The last thing we need to do is implement the destructor. Here I’m just calling task_group::wait to ensure that our task is cleaned up.  You may also want to call task_group::cancel if you don’t need that task to run to completion.

   ~async_future(){
       tg_.wait();
   }

Putting it all together

Here’s the complete program. That was a lot of typing for not very much code:

#include <agents.h>
#include <ppl.h>
#include <cassert>
using namespace Concurrency;

template <class T>
class async_future{
    overwrite_buffer<T> value_;
    task_group tg_;
public:
    template <class Functor>
    async_future(Functor fn){
        //run fn as a task and publish its result to value_
        tg_.run([fn,this](){
            send(this->value_,fn());
        });
    }
    ~async_future(){
        //make sure the task has finished before the members are destroyed
        tg_.wait();
    }
    //blocks until the value has been computed
    T GetValue(){
        return receive(value_);
    }
};

int main(){
    async_future<int> f([](){
        return 2;
    });
    int result = f.GetValue();
    assert(result == 2);
    return 0;
}
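
For anyone without the CTP installed, here’s a rough portable approximation of the same interface. It assumes std::async and std::shared_future in place of the task_group and overwrite_buffer, so it’s a different implementation strategy from the one above rather than a translation of it; portable_async_future and portable_demo are names made up for this sketch.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Hypothetical portable approximation of async_future<T>.
template <class T>
class portable_async_future {
    // Copyable handle to the result; get() can be called more than once.
    std::shared_future<T> result_;
public:
    // Start computing fn() on another thread as soon as the future is built.
    template <class Functor>
    explicit portable_async_future(Functor fn)
        : result_(std::async(std::launch::async, std::move(fn)).share()) {}

    // Blocks until the value is ready, then returns a copy of it.
    T GetValue() { return result_.get(); }
};

// Same usage as the example in the post.
inline int portable_demo()
{
    portable_async_future<int> f([]() { return 2; });
    int result = f.GetValue();
    assert(result == 2);
    return result;
}
```

No explicit destructor is needed in this version because the shared state created by std::async waits for the task when the last handle to it goes away, which plays the role of the tg_.wait() call above.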

Wrapping it up

I’m still not doing anything very interesting here, and I could always do something like compute the Fibonacci sequence the slow way. But in practice there are many more uses for a future, like applying a filter to an image or various forms of I/O where there is a combination of CPU work and perhaps a significant amount of latency.

As I alluded to, there are other implementation strategies here. For example, the transform message block in <agents.h> could be used instead of a task_group, and a series of message blocks could be combined to implement richer functionality like that described in the C++0x specification. Most notably, a timer and a choice (wait for one of n inputs to be sent) or a join would likely be required to ease implementation.

And here’s that recursive Fibonacci sequence; it’s late and I can’t resist:

   int fib(int n){
       if(n < 2)
           return n;
       async_future<int> n1([n](){return fib(n-1);});
       async_future<int> n2([n](){return fib(n-2);});
       return n1.GetValue() + n2.GetValue();
   }

-Rick
