多线程编程之计算限制型异步操作(续)
时间:2011-04-15 来源:qiang.xu
1. CLR线程池简介
1.1 CLR为什么支持线程池
1.2 线程池ThreadPool管理线程
2. 线程执行上下文
2.1 线程执行上下文简介
2.2 一个简单示例
3. 线程池常见应用情景示例
3.1 将一个线程添加至线程池中(向线程传递参数)
3.2 协作式取消
4. Task对象
5. Task常见编程情景
5.1 创建Task,并启动该Task
5.2 获取Task任务的结果
5.3 Task任务异常处理
5.4 取消Task
5.5 启动新Task
6. 定时器Timer
<4>. Task对象
通过上一篇的介绍,我们知道通过ThreadPool的QueueUserWorkItem方法能够很简单的将一个工作线程添加到线程池中,但是这其中的最大的问题是无法得到工作线程的返回值,还好在.net 4.0中提供了System.Threading.Tasks命名空间中提供了Task类和泛型Task<TResult>类型,非泛型的Task类型(无法得到返回值)如下:
public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
{
// 构造函数重载
public Task(Action action); // 无参
public Task(Action<object> action, object state); // 向action中传递state参数
public Task(Action action, CancellationToken cancellationToken); // 支持协作式取消
// 使用creationOptions标志创建Task,TaskCreationOptions将在下面介绍
public Task(Action action, TaskCreationOptions creationOptions);
// 其他组合构造函数重载版本...
// get属性:AsyncState ,CreationOptions ,CurrentId,Exception,Factory等
// 该任务完成之后,启动另外的任务方法:
public Task ContinueWith(Action<Task> continuationAction);
// 重载版本
public void Start(); // 调用Task的Start来调度任务运行
public void Start(TaskScheduler scheduler); // 使用某个调度器
// 在Task上调用Wait方法将“等待”指定Task结束
public void Wait(); // t.Wait(),将等待t结束
public static void WaitAll(params Task[] tasks); // 等待tasks全部结束
public static int WaitAny(params Task[] tasks);
// ......
}
泛型的Task<TResult>定义除了上面的方法之外,需要特别注意的是:
public class Task<TResult> : Task
{
// 构造函数
public Task(Func<TResult> function);
// 得到Task运算结果
public TResult Result { get; internal set; }
}
最后需要特别指出的是 TaskCreationOptions类型,该类型定义在System.Threading.Tasks命名空间中:
{
None = 0,
PreferFairness = 1,
LongRunning = 2, // 需要长时间运行
AttachedToParent = 4, // 附加到父Task中
}
<5>. Task使用情景
5.1 创建Task,并启动该Task
通过前面对Task类型的分析可以看出,Task的构造函数和ThreadPoll的QueueUserWorkItem函数的参数很类似,下面是一个生成并启动Task的代码段:
ThreadPool.QueueUserWorkItem Task t = new Task(ComputeBoundOp, 5);t.Start();
5.2 获取Task任务的结果
Task相对于线程池来说最重要的一个特性是能够得到Task的返回值,下面演示了这一特性:
static void Main(string[] args) { Task<Int32> t = new Task<Int32>( (n => SumInt32((Int32)n)), 5); // 启动task t.Start(); // 等待task完成 t.Wait(); // 输出结果 Console.WriteLine(t.Result); Console.WriteLine("Press any key to continue.."); Console.ReadKey(); } private static Int32 SumInt32(Int32 m) { Int32 sum = 0; for (int i = 1; i <= m; ++i) { sum += i; } return sum; }5.3 Task任务异常处理
上面的SumInt32函数如果m的值过大的话,那么sum可能出现溢出的情况,于是SumInt32将抛出异常,那么该Exception该如何处理,Task线程池的做法是允许该Task(SumInt32)返回到线程池中,但是在调用t.Result时将抛出System.AggregateException异常。System.AggregateException类型定义如下:
public class AggregateException : Exception{
// 构造函数
// 通过函数名称Flatten(压平)也能大致了解该函数的功能,英文的解释:If any inner exceptions are themselves instances of AggregateException, // this method will recursively flatten all of them.
public AggregateException Flatten();
// 通过handle函数的predicate函数决定哪些exception是已经处理的,哪些是未处理的。Each invocation of the predicate returns true or false to in // dicate whether the Exception was handled.
public void Handle(Func<Exception, bool> predicate);
// 其他函数
那么一个极端的情况是如果一直不去查询Result属性会怎样?CLR在进行垃圾回收该Task对象时,如果存在未处理的异常,CLR将终止该线程。
5.4 取消Task
static void Main(string[] args) { CancellationTokenSource cts = new CancellationTokenSource(); Task<Int32> t = new Task<Int32> ( () => SumInt32(cts.Token, 1000) ); // 开启任务 t.Start(); // 取消任务,这是可能SumInt32已经完成或者是没有完成 cts.Cancel(); // 得到结果时,可能抛出异常 try { Console.WriteLine("The Result is {0}", t.Result); } catch (System.AggregateException ex) { // 如果异常是因为取消而抛出的,调用handle方法表明该异常是 // 无需处理的,如果还存在未处理的异常,CLR将结束该进程 ex.Handle(e => e is OperationCanceledException); // 如果没有未处理的异常才能打印这句话 Console.WriteLine("Sum is canceled"); } Console.WriteLine("Press any key to continue.."); Console.ReadKey(); } private static Int32 SumInt32(CancellationToken token, Int32 m) { Int32 sum = 0; for (int i = 0; i < m; ++i) { // 抛出异常表明任务没有完成,这里将抛出OperationCanceledException token.ThrowIfCancellationRequested(); sum += i; } return sum;}
5.5 启动新Task
上面的代码中为了得到一个Task的返回值的话,那么需要调用Task的Wait方法,这显然降低了性能。一种更好的形式是但Task结束之后,它自动启动另外的Task去处理这个结果。则可以通过Task类的ContinueWith方法实现,该方法定义如下:
public Task ContinueWith(Action<Task<TResult>> continuationAction);
public Task ContinueWith(Action<Task<TResult>> continuationAction, CancellationToken cancellationToken);// 将cancellationToken附加到新Task中
public Task ContinueWith(Action<Task<TResult>> continuationAction, TaskContinuationOptions continuationOptions);
// 其他重载定义
在上面的最后一个重载版本中continuationOptions类型为TaskContinuationOptions,定义如下:
public enum TaskContinuationOptions{
None = 0,
// ....
AttachedToParent = 4, // 将Task关联到它的父Task中,这将表明除非所有的子任务完成,否则不会认为父任务完成
// 下面的标识指出在什么情况下运行ContinueWith任务
NotOnRanToCompletion = 65536,
NotOnFaulted = 131072,
OnlyOnCanceled = 196608,
// 更为常用的类型
OnlyOnFaulted = 327680,
OnlyOnRanToCompletion = 393216,
// 其他类型
}
下面是一段示例代码:
Task<Int32> t = new Task<Int32> ( ( n => SumInt32((Int32)n) ), 10000 ); // 开始该task,并不去关心该task何时结束 t.Start(); // 如果Task结束,并且完成了 t.ContinueWith ( task => Console.WriteLine("Task result is " + task.Result), TaskContinuationOptions.OnlyOnRanToCompletion ); Console.WriteLine("Press any key..");Console.ReadKey();
<6>. 定时器Timer
在System.Threading命名空间中存在Timer类,能够实现让线程池在每隔一段时间调用一个方法,Timer定义如下:
public sealed class Timer : MarshalByRefObject, IDisposable{
public Timer(TimerCallback callback);
// 回调方法callback,state向callback传递的参数,dueTime多长时间之后首次调用callback方法,period每次调用callback的时间间隔
public Timer(TimerCallback callback, object state, int dueTime, int period);
public Timer(TimerCallback callback, object state, TimeSpan dueTime, TimeSpan period);
// 其他重载版本
// 修改dueTime和period参数
public bool Change(int dueTime, int period);
// 其他重载版本
}
一个简单试用示例:
class Program { private static Timer s_timer; static void Main(string[] args) { Console.WriteLine("Main thread : starting a timer"); // 立即开始执行该timer, using(s_timer = new Timer(ComputeBoundOp, 5, 0, Timeout.Infinite)) { Console.WriteLine("Main thread : do other work .."); Thread.Sleep(10000); // 10s } } private static Int32 SumInt32(CancellationToken token, Int32 m) { Int32 sum = 0; for (int i = 0; i < m; ++i) { // 抛出异常表明任务没有完成,这里将抛出OperationCanceledException token.ThrowIfCancellationRequested(); sum += i; } return sum; } private static Int32 SumInt32(Int32 m) { Int32 sum = 0; for (int i = 1; i <= m; ++i) { sum += i; } return sum; } private static void ComputeBoundOp(Object state) { Console.WriteLine("In compute bound operation, state = {0}", state); // 模拟任务,休眠1s时间 Thread.Sleep(1000); // 两秒之后在调用这个方法 s_timer.Change(2000, Timeout.Infinite); // 这个方法返回到线程池中,等待下一个工作项 } }