A while ago I was chatting with some coworkers about the different ways to schedule tasks in the Concurrency Runtime, the Parallel Patterns Library and the Agents Library, and about how to see those tasks in our parallel debugger windows. So I wrote a simple program that covers a large portion of the surface area of each library and, in particular, shows almost all of the ways that you can schedule a task, with comments.
If you’re curious about how to visualize these tasks in the new parallel debugger windows in Visual Studio 2010, then head over to Daniel Moth’s blog where he has several examples.
Here’s the example code; you can read it top down:
#include <windows.h>
#include <ppl.h>
#include <agents.h>
#include <concrt.h>
#include <concrtrm.h>
#include <math.h>
#include <stdio.h>
#include <functional>
#include <list>
#include <vector>

using namespace ::Concurrency;
using namespace ::std;

//a pointer to function
void PointerToFunction()
{
    printf("hello world from a pointer to function\n");
}

//a C++ functor object
class MyFunctor
{
public:
    void operator()() const
    {
        printf("hello world from a functor\n");
    }
    void MemberFunction()
    {
        printf("here's a member function, you can't use this as a functor without binding it\n");
    }
};

void TaskGroupTasks()
{
    //task groups and structured task groups use
    //a task_handle as the base task
    task_group tasks;

    //you can of course use a lambda as we often show
    tasks.run([](){printf("hello from a lambda\n");});

    //you can also use a classic C++ functor
    tasks.run(MyFunctor());

    //task groups allow you to wait multiple times on them,
    //so you can call wait and then reuse the group;
    //this is not the case with structured_task_group
    tasks.wait();

    //you can also schedule a pointer to function as a task
    tasks.run(&PointerToFunction);

    //and you can use the STL helper functions to bind things
    //to a function object that aren't usually treated that way,
    //like a member function on a class
    MyFunctor f;
    tasks.run(bind(mem_fun(&MyFunctor::MemberFunction),&f));
    tasks.wait();

    //you can also use task_handles;
    //we usually do it with a lambda and
    //the make_task helper function
    auto task = make_task([](){printf("this is the easiest and most performant task_handle\n");});

    //task handles have to be l-values (all the previous examples were temporaries or r-values)
    tasks.run(task);

    //using an r-value task_handle is a compiler error because when the user uses a task_handle
    //they are responsible for managing the lifetime of the task
    //tasks.run(make_task([](){printf("this is a compiler error\n");}));

    //you can use a task_handle explicitly but you need to know the template type;
    //the make_task helper hides this for you...
    //i.e. auto foo = make_task(....) avoids code like this:
    task_handle<function<void(void)>> task1([](){printf("the types of lambdas are anonymous\n");});
    tasks.run(task1);
    tasks.wait();

    //we also added run_and_wait, which is useful for supporting cancellation and is analogous
    //to running a task sequentially.
    tasks.run_and_wait([](){printf("this task will be run on the calling thread\n");});

    {
        //structured task groups are a bit different, but the functionality is a subset of
        //the task_group. Generally the guidance is to use task_group or parallel_invoke as opposed
        //to structured_task_group, unless you are building a parallel algorithm like a loop or
        //a recursive sort.
        //You can also *only* schedule task_handles with structured_task_group, not general functors.
        structured_task_group structured_tasks;
        auto structured_task = make_task([](){printf("our first structured task\n");});
        //schedule the task before waiting (this call was missing from the original listing)
        structured_tasks.run(structured_task);
        structured_tasks.wait();

        //you can't reuse structured task groups after a call to wait
        structured_task_group tasks2;
        auto structured_task2 = make_task([](){printf("our second structured task\n");});
        tasks2.run(structured_task2);

        //run_and_wait is useful for recursion and cancellation composability
        tasks2.run_and_wait([](){printf("this task will be run on the calling thread\n");});
    }
    //that's basically it for task_groups.
}

DWORD WINAPI ThreadFunction(PVOID data)
{
    printf("hello from create thread.\n");
    return S_OK;
}

void LightWeightTaskFunc(void* data)
{
    int* pNum = (int*) data;
    printf("hello from a lightweight task:%d.\n",*pNum);
}

void LightweightTasks()
{
    //the other type of tasks in the runtime are lightweight tasks (LWTs).
    //the most basic version of the lightweight task looks a lot like a
    //thread creation function. Here's one of those just to remind us what they look like:
    CreateThread(NULL,0,&ThreadFunction,NULL,0,NULL);

    //to schedule lightweight tasks, you need a Scheduler or a ScheduleGroup,
    //so let's create one of each.
    //a default-constructed SchedulerPolicy gives the default policy
    Scheduler* sched = Scheduler::Create(SchedulerPolicy());
    ScheduleGroup* scheduleGroup = sched->CreateScheduleGroup();

    //remember with this syntax, all data needs to be
    //cast to a single void* and dereferenced.
    int* num = new int(5);

    //but even this offers some usability improvements over CreateThread:
    //the additional parameters are pushed off to scheduling.
    //here's how to schedule an LWT from a scheduler instance
    sched->ScheduleTask(&LightWeightTaskFunc,(void*)num);

    //you can also do it from the current scheduler, without an instance
    CurrentScheduler::ScheduleTask(&LightWeightTaskFunc,(void*)num);

    //a schedule group's interface is the same
    scheduleGroup->ScheduleTask(&LightWeightTaskFunc,(void*)num);

    //Lightweight tasks are also used in agents. Any time an asend is called we'll schedule an LWT.
    //good luck catching this in the debugger, however
    unbounded_buffer<int> buf1;

    //but here's a trick...
    event e;

    //if you have a filter that blocks you should be able to catch it...
    auto blockingFilter = [&] (int in) -> bool
    {
        //set your breakpoint here
        e.wait();
        return true;
    };

    //note that there are constructor overloads for all message blocks
    //that let you specify a filter method to filter messages based on type
    //and that let you specify which scheduler or schedule group to run
    //the task in.
    unbounded_buffer<int> buf2(*sched,blockingFilter);
    buf1.link_target(&buf2);

    //this will block
    asend(&buf1,1);

    //this unblocks the task
    e.set();
    receive(&buf2);

    //we also have call and transformer, which themselves invoke tasks.
    //they take functors, so all the variations from the task_group tasks apply;
    //the filter method, scheduler and schedule group variations also apply
    transformer<int,double> t([](int in) -> double
    {
        printf("inside a transform LWT\n");
        return sqrt((double)in);
    });
    call<double> c([](double d)
    {
        printf("inside a call\n");
        return;
    });
    t.link_target(&c);

    //now we can asynchronously send messages to the transformer and call and
    //see them as tasks
    asend(t,4);

    //timers also generate messages and potentially tasks...
    timer<double> myTimer(800,2);
    myTimer.start();

    //let's wait for it... (we are now 'blocked')
    receive(&myTimer);
    myTimer.stop();

    //note I didn’t use overwrite_buffer or single_assignment as examples,
    //but they are similar to unbounded_buffer
}

void Agents()
{
    //here's how to implement a simple agent; it also uses an LWT.
    //declared inline to hide it...
    class MyAgent : public agent
    {
    public:
        MyAgent() {}

        //an agent requires that you override 'run';
        //this is its asynchronous start method,
        //which is spawned as a task when agent::start is called
        void run()
        {
            printf("inside an agent's run method\n");
            //this sets the agent as done (the signature was changed since beta1, but this is the current one)
            //it used to look like this I believe: agent::done(agent_done);
            this->done();
        }
    };

    MyAgent a;

    //start the agent
    a.start();

    //wait for it to complete
    agent::wait(&a);
}

void Algorithms()
{
    //we have 3 algorithms in ppl: parallel_for_each, parallel_for, parallel_invoke
    //parallel_invoke has overloads for up to 10 functors...
    //all algorithms may use run_and_wait internally for some or any tasks
    //let's put these all up as a functor...
    auto algorithms = [](){
        //a list and a vector so we have both forward and random
        //access iterators
        std::list<int> l;
        std::vector<int> v;
        for(int i = 0; i < 10; ++i)
        {
            l.push_back(i);
            v.push_back(i);
        }

        auto printInt = [](int in){printf("hello from parallel algorithm iteration: %d\n",in);};

        parallel_invoke([](){printf("invoke 1\n");},
                        [](){printf("invoke 2\n");});

        //we can use the same functor for parallel_for and both versions of parallel_for_each
        parallel_for(0,10,1,printInt);
        parallel_for(0,10,1,printInt);
        parallel_for_each(l.begin(),l.end(),printInt);
        parallel_for_each(v.begin(),v.end(),printInt);

        //and these can be nested in various ways
        parallel_invoke([&](){parallel_for(0,10,1,printInt);},
                        [&](){parallel_for(0,10,1,printInt);},
                        [&](){parallel_for_each(l.begin(),l.end(),printInt);},
                        [&](){parallel_for_each(v.begin(),v.end(),printInt);}
                        );
    };

    //these can all be run sequentially one right after another
    algorithms();

    //but realistically we should expect things to be composed together,
    //like this
    transformer<int,bool> t([&](int i) -> bool
    {
        algorithms();
        return true;
    });

    //now all this work will itself be run in an LWT
    asend(&t,1);

    //and we can block and wait for it
    receive(&t);
}

int main()
{
    TaskGroupTasks();
    LightweightTasks();
    Agents();
    Algorithms();
    return 0;
}
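If you want to play with the different kinds of callables from the task_group section without the Concurrency Runtime at hand, here is a minimal sketch in standard C++11. It is not the PPL: run_inline is a hypothetical stand-in for task_group::run that simply invokes its argument on the calling thread, and g_log, PlainFunction, Functor and run_all_callable_kinds are names made up for this illustration. It also assumes std::bind with a member function pointer in place of the bind(mem_fun(...)) combination used above.

```cpp
#include <cassert>     // for callers that want to check the returned log
#include <functional>
#include <string>
#include <vector>

// Collected output, so the result can be inspected instead of printed.
static std::vector<std::string> g_log;

// Hypothetical stand-in for task_group::run: accepts any callable that takes
// no arguments (by const reference) and invokes it on the calling thread.
template <typename Function>
void run_inline(const Function& func)
{
    func();
}

// A plain function, passed below as a pointer to function.
void PlainFunction() { g_log.push_back("function pointer"); }

// A functor with a call operator plus an ordinary member function.
struct Functor
{
    void operator()() const { g_log.push_back("functor"); }
    void Member() { g_log.push_back("bound member function"); }
};

// Runs one callable of each kind, in order, and returns what they logged.
std::vector<std::string> run_all_callable_kinds()
{
    g_log.clear();

    run_inline(&PlainFunction);                       // pointer to function
    run_inline(Functor());                            // classic functor (temporary)
    run_inline([]() { g_log.push_back("lambda"); });  // lambda

    // A member function is not callable on its own; binding it to an
    // object pointer produces a callable that takes no arguments.
    Functor f;
    run_inline(std::bind(&Functor::Member, &f));

    return g_log;
}
```

Calling run_all_callable_kinds() returns one log entry per callable, in the order they were passed. Taking the callable by const reference is a deliberate choice in this sketch so that temporaries such as Functor() and the result of std::bind can be passed directly.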