
TPL Part 4 -- Task Coordination

Category: PHP tutorials | Published: 2015-06-23 08:11:38

Simple continuations

Task.ContinueWith(Task): runs when the specified Task has finished executing.

    // Needs: using System; using System.Threading.Tasks;
    void Main()
    {
        Task rootTask = new Task(() =>
        {
            Console.WriteLine("root task completed");
        });

        // runs once rootTask has finished
        rootTask.ContinueWith((Task previousTask) =>
        {
            Console.WriteLine("continuation task completed");
        });

        rootTask.Start();

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }


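TaskFactory.ContinueWhenAll(Task[]): runs when all of the specified Tasks have finished executing. Sample code:

    // tasks is a Task<int>[] created elsewhere
    Task continuation = Task.Factory.ContinueWhenAll<int>(tasks, antecedents =>
    {
        foreach (Task<int> t in antecedents)
        {
            // do something
        }
    });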


 

 

TaskFactory.ContinueWhenAny(Task[]): runs when any one of the specified Tasks has finished executing. The code is similar to ContinueWhenAll (the following code prints how long the first Task to finish slept):

    // tasks is a Task<int>[] created elsewhere
    Task continuation = Task.Factory.ContinueWhenAny<int>(tasks, (Task<int> antecedent) =>
    {
        // write out a message using the antecedent result
        Console.WriteLine("The first task slept for {0} milliseconds", antecedent.Result);
    });
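The two snippets above leave the tasks array undefined. The following is a minimal, self-contained sketch of how they might fit together; the class name, the Run method and the delay values are assumptions made for illustration and do not come from the original article.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class WhenAllWhenAnyDemo
{
    // Hypothetical helper: starts three tasks, each returning the time it slept,
    // and returns the sum computed by a ContinueWhenAll continuation.
    public static int Run()
    {
        int[] delays = { 300, 100, 200 }; // illustrative values
        Task<int>[] tasks = new Task<int>[delays.Length];
        for (int i = 0; i < tasks.Length; i++)
        {
            int delay = delays[i]; // copy so each lambda captures its own value
            tasks[i] = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(delay);
                return delay;
            });
        }

        // runs as soon as any one task has finished
        Task any = Task.Factory.ContinueWhenAny<int>(tasks, first =>
            Console.WriteLine("The first task slept for {0} milliseconds", first.Result));

        // runs once every task has finished and sums the results
        Task<int> all = Task.Factory.ContinueWhenAll<int, int>(tasks, done =>
        {
            int total = 0;
            foreach (Task<int> t in done)
            {
                total += t.Result;
            }
            return total;
        });

        any.Wait();
        return all.Result;
    }

    static void Main()
    {
        Console.WriteLine("Total sleep time: {0} ms", Run());
    }
}
```

Which task finishes first depends on scheduling, so only the total (the sum of the delays) is deterministic here.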


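Continuation options (TaskContinuationOptions)

OnlyOnRanToCompletion: only if the antecedent ran to completion

NotOnRanToCompletion: if the antecedent did not run to completion (it was canceled or threw an exception)

OnlyOnFaulted: only if the antecedent threw an exception

NotOnFaulted: if the antecedent did not throw an exception

OnlyOnCanceled: only if the antecedent was canceled

NotOnCanceled: if the antecedent was not canceled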
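As a rough illustration of how these options are passed to ContinueWith, here is a minimal sketch. The class name, the Run method and the exception message are assumptions made for this example. A continuation whose option does not match the antecedent's outcome is not run; it ends up in the canceled state.

```csharp
using System;
using System.Threading.Tasks;

class ContinuationOptionsDemo
{
    // Hypothetical helper: returns true if the OnlyOnRanToCompletion
    // continuation was canceled because the antecedent faulted.
    public static bool Run()
    {
        Task rootTask = new Task(() =>
        {
            throw new InvalidOperationException("root failed");
        });

        // scheduled only when rootTask faults
        Task onFault = rootTask.ContinueWith(
            t => Console.WriteLine("faulted: " + t.Exception.InnerException.Message),
            TaskContinuationOptions.OnlyOnFaulted);

        // scheduled only when rootTask succeeds, so it is skipped here
        Task onSuccess = rootTask.ContinueWith(
            t => Console.WriteLine("ran to completion"),
            TaskContinuationOptions.OnlyOnRanToCompletion);

        rootTask.Start();
        onFault.Wait();

        try
        {
            onSuccess.Wait(); // waiting on a canceled task throws
        }
        catch (AggregateException)
        {
            // contains a TaskCanceledException; nothing to handle in this sketch
        }
        return onSuccess.IsCanceled;
    }

    static void Main()
    {
        Console.WriteLine("success continuation canceled: {0}", Run());
    }
}
```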

Handling exceptions

    // Needs: using System; using System.Threading.Tasks;
    void Main()
    {
        Task rootTask = new Task(() =>
        {
            Console.WriteLine("root task completed");
            throw new Exception("root threw an exception");
        });

        rootTask.ContinueWith((Task previousTask) =>
        {
            Console.WriteLine("even though root threw an exception, I still run");
        });

        rootTask.Start();

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }


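In the code above, the first task throws an exception, yet the continuation Task still runs. However, because nothing observes the exception, it is rethrown when the faulted Task is finalized (this is the .NET 4.0 behaviour; from .NET 4.5 onwards the default is to raise TaskScheduler.UnobservedTaskException without terminating the process).

Solution:

    // Needs: using System; using System.Threading.Tasks;
    void Main()
    {
        Task rootTask = new Task(() =>
        {
            Console.WriteLine("root task completed");
            throw new Exception("root threw an exception");
        });

        var t2 = rootTask.ContinueWith((Task previousTask) =>
        {
            // rethrow the antecedent's exception so that it bubbles up
            if (previousTask.Status == TaskStatus.Faulted)
            {
                throw previousTask.Exception.InnerException;
            }
            Console.WriteLine("even though root threw an exception, I still run");
        });

        rootTask.Start();

        try
        {
            t2.Wait();
        }
        catch (AggregateException ex)
        {
            ex.Handle(inner =>
            {
                Console.WriteLine("exception handled in main thread");
                return true; // mark the exception as handled
            });
        }

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }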


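The continuation rethrows the exception so that it bubbles up; the main thread then waits on the last Task in the chain and handles the resulting AggregateException.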

Creating child Tasks

Create a child Task and attach it to the parent Task:

    // Needs: using System; using System.Threading; using System.Threading.Tasks;
    void Main()
    {
        Task parentTask = new Task(() =>
        {
            Console.WriteLine("parent task started");

            // create the first child task
            Task childTask = new Task(() =>
            {
                // write out a message and wait
                Console.WriteLine("Child task running");
                Thread.Sleep(1000);
                Console.WriteLine("Child task threw an exception");
                throw new Exception();
            }, TaskCreationOptions.AttachedToParent);

            Console.WriteLine("start child task...");
            childTask.Start();
            Console.WriteLine("parent task ended");
        });

        // start the parent task
        parentTask.Start();

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }


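1. The parent Task propagates exceptions thrown in its attached child Tasks.

2. The parent Task's status is affected by the status of its attached child Tasks.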
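Both points can be observed by waiting on the parent. The sketch below is a minimal illustration under assumed names (AttachedChildDemo, Run, the "child failed" message), none of which appear in the original article. Waiting on the parent blocks until the attached child has finished, the child's exception surfaces through the parent's AggregateException, and the parent ends up Faulted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class AttachedChildDemo
{
    // Hypothetical helper: returns the parent's final status.
    public static TaskStatus Run()
    {
        Task parentTask = new Task(() =>
        {
            Task childTask = new Task(() =>
            {
                Thread.Sleep(200);
                throw new InvalidOperationException("child failed");
            }, TaskCreationOptions.AttachedToParent);
            childTask.Start();
        });

        parentTask.Start();
        try
        {
            // blocks until the attached child has finished as well
            parentTask.Wait();
        }
        catch (AggregateException ex)
        {
            // the child's exception is nested inside the parent's AggregateException
            Console.WriteLine(ex.Flatten().InnerException.Message);
        }
        return parentTask.Status;
    }

    static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

The parent is created with new Task on purpose: Task.Run starts tasks with DenyChildAttach, in which case AttachedToParent on the child has no effect.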

Using Barrier

    // Needs: using System; using System.Threading; using System.Threading.Tasks;
    class BankAccount
    {
        public int Balance { get; set; }
    }

    void Main()
    {
        // create the array of bank accounts
        BankAccount[] accounts = new BankAccount[6];
        for (int i = 0; i < accounts.Length; i++)
        {
            accounts[i] = new BankAccount();
        }

        // create the total balance counter
        int totalBalance = 0;

        // create the barrier: 3 participants plus a post-phase action
        Barrier barrier = new Barrier(3, (myBarrier) =>
        {
            // zero the balance
            totalBalance = 0;
            // sum the account totals
            foreach (BankAccount account in accounts)
            {
                totalBalance += account.Balance;
            }
            // write out the balance
            Console.WriteLine("[From barrier :] Total balance: {0}", totalBalance);
        });

        // define the tasks array
        Task[] tasks = new Task[3];

        // loop to create the tasks
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = new Task((stateObj) =>
            {
                // create a typed reference to the account
                BankAccount account = (BankAccount)stateObj;

                // start of phase: add 2 a thousand times (2000 in total)
                for (int j = 0; j < 1000; j++)
                {
                    account.Balance += 2;
                }
                Thread.Sleep(new Random().Next(3000));
                Console.WriteLine("Task {0} waiting, phase {1} ",
                    Task.CurrentId, barrier.CurrentPhaseNumber);

                // signal the barrier
                barrier.SignalAndWait();

                account.Balance -= 1000;
                Console.WriteLine("barrier finished .");

                // end of phase
                Console.WriteLine("Task {0}, phase {1} ended",
                    Task.CurrentId, barrier.CurrentPhaseNumber);

                // signal the barrier
                barrier.SignalAndWait();
            }, accounts[i]);
        }

        // start the tasks
        foreach (Task t in tasks)
        {
            t.Start();
        }

        // wait for all of the tasks to complete
        Task.WaitAll(tasks);

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }


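The code above creates one Barrier with 3 participants and 3 Tasks. Each Task adds 2000 to its account and then signals the barrier. Once the barrier has received 3 signals, its post-phase action sums the account balances and prints the total (6000). When that action has finished, control returns to the Tasks; each one subtracts 1000 from its account and signals again, the balances are summed a second time, and the final result is 3000.

To control the barrier through cancellation, pass tokenSource.Token when signalling: barrier.SignalAndWait(tokenSource.Token); and call tokenSource.Cancel() from a Task.

Calling barrier.RemoveParticipant(); reduces the barrier's participant count.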
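The two techniques just mentioned might look like the following minimal sketch. The class and method names, the participant counts and the 200 ms delay are assumptions chosen for illustration. The first method cancels a wait that could never complete; the second removes a participant that never arrives so that the remaining ones can finish the phase.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class BarrierControlDemo
{
    // A worker waits on a 2-participant barrier whose second participant
    // never arrives; cancelling the token ends the wait.
    public static string CancelWait()
    {
        using (var tokenSource = new CancellationTokenSource())
        using (var barrier = new Barrier(2))
        {
            Task<string> worker = Task.Factory.StartNew(() =>
            {
                try
                {
                    barrier.SignalAndWait(tokenSource.Token);
                    return "phase completed";
                }
                catch (OperationCanceledException)
                {
                    return "wait cancelled";
                }
            });

            Thread.Sleep(200);      // give the worker time to reach the barrier
            tokenSource.Cancel();
            return worker.Result;
        }
    }

    // The barrier expects 3 participants but only 2 workers exist;
    // removing the missing participant lets the phase complete.
    public static long RemoveMissingParticipant()
    {
        using (var barrier = new Barrier(3))
        {
            Task[] workers = new Task[2];
            for (int i = 0; i < workers.Length; i++)
            {
                workers[i] = Task.Factory.StartNew(() => barrier.SignalAndWait());
            }

            barrier.RemoveParticipant();
            Task.WaitAll(workers);
            return barrier.CurrentPhaseNumber; // phase 0 has completed
        }
    }

    static void Main()
    {
        Console.WriteLine(CancelWait());
        Console.WriteLine("current phase: {0}", RemoveMissingParticipant());
    }
}
```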

CountdownEvent

It works much like Barrier: it counts signals, and once the specified number of signals has been received, the event is set.

    // Needs: using System; using System.Threading; using System.Threading.Tasks;
    void Main()
    {
        CountdownEvent cdevent = new CountdownEvent(5);

        // create a Random that we will use to generate sleep intervals
        // (note: Random is not thread-safe; sharing it is tolerable only in a demo)
        Random rnd = new Random();

        // create 5 tasks, each of which will wait for
        // a random period and then signal the event
        Task[] tasks = new Task[6];
        for (int i = 0; i < tasks.Length - 1; i++)
        {
            // create the new task
            tasks[i] = new Task(() =>
            {
                // put the task to sleep for a random period
                // of between half a second and one second
                Thread.Sleep(rnd.Next(500, 1000));
                // signal the event
                Console.WriteLine("Task {0} signalling event", Task.CurrentId);
                cdevent.Signal();
            });
        }

        // create the final task, which will rendezvous with the other 5
        // using the countdown event
        tasks[5] = new Task(() =>
        {
            // wait on the event
            Console.WriteLine("Rendezvous task waiting");
            cdevent.Wait();
            Console.WriteLine("CountDownEvent has been set");
        });

        // start the tasks
        foreach (Task t in tasks)
        {
            t.Start();
        }

        Task.WaitAll(tasks);

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }


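The code above creates 5 signalling Tasks and a CountdownEvent with an initial count of 5. Each Task signals the CountdownEvent when it has finished its work. A sixth Task waits on the event and, once all 5 signals have arrived, prints "CountDownEvent has been set".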

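ManualResetEvent and AutoResetEvent

Anyone familiar with earlier versions of .NET should already know these well; they are used for thread synchronization in multithreaded environments. The difference is that a ManualResetEvent stays signaled until Reset is called explicitly, whereas an AutoResetEvent resets automatically after releasing a single waiting thread. They are not covered further here.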

SemaphoreSlim

    // Needs: using System; using System.Threading; using System.Threading.Tasks;
    void Main()
    {
        SemaphoreSlim semaphore = new SemaphoreSlim(3);

        // create the cancellation token source
        CancellationTokenSource tokenSource = new CancellationTokenSource();

        // create and start the tasks that will wait on the semaphore
        for (int i = 0; i < 10; i++)
        {
            Task.Factory.StartNew((obj) =>
            {
                semaphore.Wait(tokenSource.Token);
                // print out a message when we are released
                Console.WriteLine("Task {0} released", obj);
            }, i, tokenSource.Token);
        }

        // create and start the signalling task
        Task signallingTask = Task.Factory.StartNew(() =>
        {
            // loop while the task has not been cancelled
            while (!tokenSource.Token.IsCancellationRequested)
            {
                // sleep for up to 500 ms (returns early on cancellation)
                tokenSource.Token.WaitHandle.WaitOne(500);
                // signal the semaphore
                semaphore.Release(3);
                Console.WriteLine("Semaphore released");
            }
            // if we reach this point, we know the task has been cancelled
            tokenSource.Token.ThrowIfCancellationRequested();
        }, tokenSource.Token);

        // ask the user to press return before we cancel
        // the token and bring the tasks to an end
        Console.WriteLine("Press enter to cancel tasks");
        Console.ReadLine();

        // cancel the token source
        tokenSource.Cancel();

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }


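The code above creates a SemaphoreSlim with an initial count of 3 and starts 10 Tasks; each one prints that it has been released once it gets through the semaphore. Because the initial count is 3, the first 3 Tasks get through immediately. A separate signalling Task releases 3 more waiting Tasks every 500 milliseconds.

As this shows, the main use of a semaphore is that you can choose how many Tasks to release at a time.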
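To make the "how many at a time" point concrete, here is a minimal sketch that starts the semaphore at 0 and checks that Release(2) lets exactly two waiters through. The class name, the Run method, the waiter count and the 100 ms pause are assumptions made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SemaphoreReleaseDemo
{
    // Hypothetical helper: returns how many waiters had passed the
    // semaphore after the first Release(2) call.
    public static int Run()
    {
        int released = 0;
        using (var semaphore = new SemaphoreSlim(0))
        {
            Task[] waiters = new Task[5];
            for (int i = 0; i < waiters.Length; i++)
            {
                // LongRunning so that each blocked waiter gets its own thread
                waiters[i] = Task.Factory.StartNew(() =>
                {
                    semaphore.Wait();
                    Interlocked.Increment(ref released);
                }, TaskCreationOptions.LongRunning);
            }

            semaphore.Release(2);                                  // let two waiters through
            SpinWait.SpinUntil(() => Volatile.Read(ref released) == 2);
            Thread.Sleep(100);                                     // nobody else may pass meanwhile
            int afterFirstRelease = Volatile.Read(ref released);

            semaphore.Release(3);                                  // let the remaining three through
            Task.WaitAll(waiters);
            return afterFirstRelease;
        }
    }

    static void Main()
    {
        Console.WriteLine("released after Release(2): {0}", Run());
    }
}
```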

 

Producer / Consumer pattern

The following code creates a BlockingCollection of type Deposit and starts 3 producer Tasks, each of which creates 20 Deposit objects with a random Amount below 100 and adds them to the collection. A ContinueWhenAll continuation runs once all producer Tasks have finished and calls blockingCollection.CompleteAdding(). A consumer Task then updates the account object: it loops on the blockingCollection.IsCompleted property (true once the producers are done and the collection is empty), takes deposits from the collection and adds them to the account balance.

Sample code:

    // Needs: using System; using System.Collections.Concurrent;
    // using System.Collections.Generic; using System.Threading; using System.Threading.Tasks;
    class BankAccount
    {
        public int Balance { get; set; }
    }

    class Deposit
    {
        public int Amount { get; set; }
    }

    void Main()
    {
        BlockingCollection<Deposit> blockingCollection = new BlockingCollection<Deposit>();

        var producers = new List<Task>();
        for (int i = 0; i < 3; i++)
        {
            var producer = Task.Factory.StartNew((obj) =>
            {
                // create a series of deposits
                for (int j = 0; j < 20; j++)
                {
                    // create the transfer
                    var randAmount = new Random().Next(100);
                    Deposit deposit = new Deposit { Amount = randAmount };
                    Thread.Sleep(new Random().Next(200));
                    // place the transfer in the collection
                    blockingCollection.Add(deposit);
                    Console.WriteLine(string.Format(
                        "Amount: {0} deposit Processed, index: {1}",
                        randAmount, int.Parse(obj.ToString()) + j));
                }
            }, i * 20);
            producers.Add(producer);
        }

        // create a many-to-one continuation that will signal
        // the end of production to the consumer
        Task.Factory.ContinueWhenAll(producers.ToArray(), antecedents =>
        {
            // signal that production has ended
            Console.WriteLine("Signalling production end");
            blockingCollection.CompleteAdding();
        });

        // create a bank account
        BankAccount account = new BankAccount();

        // create the consumer, which will update
        // the balance based on the deposits
        Task consumer = Task.Factory.StartNew(() =>
        {
            while (!blockingCollection.IsCompleted)
            {
                Deposit deposit;
                // try to take the next item
                if (blockingCollection.TryTake(out deposit))
                {
                    // update the balance with the transfer amount
                    account.Balance += deposit.Amount;
                }
            }
            // print out the final balance
            Console.WriteLine("Final Balance: {0}", account.Balance);
        });

        // wait for the consumer to finish
        consumer.Wait();

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }
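The consumer above polls IsCompleted and TryTake in a tight loop. One possible alternative, not used in the original article, is BlockingCollection.GetConsumingEnumerable(), which blocks until an item is available and ends once CompleteAdding() has been called and the collection is empty. The sketch below is a simplified illustration: the class name, the Run method, the use of plain int amounts instead of Deposit objects, and the fixed amount of 100 are all assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ConsumingEnumerableDemo
{
    // Hypothetical helper: 3 producers add 20 deposits of 100 each;
    // the consumer returns the final balance.
    public static int Run()
    {
        var deposits = new BlockingCollection<int>();

        Task[] producers = new Task[3];
        for (int i = 0; i < producers.Length; i++)
        {
            producers[i] = Task.Factory.StartNew(() =>
            {
                for (int j = 0; j < 20; j++)
                {
                    deposits.Add(100); // fixed amount, assumed for a checkable result
                }
            });
        }

        // signal the end of production once every producer has finished
        Task.Factory.ContinueWhenAll(producers, _ => deposits.CompleteAdding());

        Task<int> consumer = Task.Factory.StartNew(() =>
        {
            int balance = 0;
            // blocks while the collection is empty, ends after CompleteAdding()
            foreach (int amount in deposits.GetConsumingEnumerable())
            {
                balance += amount;
            }
            return balance;
        });

        return consumer.Result;
    }

    static void Main()
    {
        Console.WriteLine("Final Balance: {0}", Run());
    }
}
```

With 3 producers adding 20 deposits of 100 each, the final balance is 6000.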

