阅读本篇前,读者需对.NET4 System.Threading.Tasks 以及 Task Schedulers 有一定的了解。如果不是很了解,请查阅以下相关信息:
Task: http://msdn.microsoft.com/en-us/library/system.threading.tasks.task%28VS.100%29.aspx
Task Schedulers: http://msdn.microsoft.com/en-us/library/dd997402.aspx
首先回顾相关场景:最近工作需要一直在.NET4下编写window service。在WindowsService下使用了多线程相关技术。期间就用了到了线程池。使用线程池的目的:在系统中进行多线程并发也担心并发数量太大影响性能。于是使用线程池进行排队。一批一批执行多线程。当我在使用传统的.NET线程池的过程中碰见了一些问题,请看以下代码:
复制
try { ThreadPool.SetMaxThreads(2, 2); for (int i = 0; i < 5; i++) { ThreadPool.QueueUserWorkItem(new WaitCallback(InvokeThread1), i); } } catch { Console.WriteLine("error"); }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
这里建立一个同时2个线程并发的线程池。在上述代码第7行传入InvokeThread1方法:
复制
static void InvokeThread1(object obj) { throw new NullReferenceException(); }
1.
2.
3.
假设程序发生异常,这个异常却让整个程线程池序崩溃了。主程序并未catch到这个exception。也许您会说这本来就是这样的嘛,有什么好贴出来的。但是在.NET4中我们可以避免掉这个问题。(此时体现出.NET4的异常强大)。还有个问题有必要提到:如果一次有两个线程同时并发(一共要执行5个线程,每次并发2个)。假设其中一个线程执行过程中出现了异常,要让这两个线程以外的三个线程都停止运行,来节省系统资源。传统的线程池也许可以做到,但是控制起来估计不会让你太轻松。但是在.NET4的Task机制中,这些都得到了妥善的解决,现将以上两个问题解决方案给出。如果存在不足的地方,请您指出。
一、自定义TaskScheduler
TaskScheduler代码如下:
自定义TaskScheduler
复制
//自定义TaskScheduler public class CustomTaskScheduler : TaskScheduler, IDisposable { //调用Task的线程 Thread[] _Threads; //Task Collection BlockingCollection<Task> _Tasks = new BlockingCollection<Task>(); int _ConcurrencyLevel; //设置schedule并发 public CustomTaskScheduler(int concurrencyLevel) { _Threads = new Thread[concurrencyLevel]; this._ConcurrencyLevel = concurrencyLevel; for (int i = 0; i < concurrencyLevel; i++) { _Threads[i] = new Thread(() => { foreach (Task task in _Tasks.GetConsumingEnumerable()) this.TryExecuteTask(task);27 }); _Threads[i].Start(); } } protected override void QueueTask(Task task) { _Tasks.Add(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { if (_Threads.Contains(Thread.CurrentThread)) return TryExecuteTask(task); return false; } public override int MaximumConcurrencyLevel50 { get { return _ConcurrencyLevel;54 } } protected override IEnumerable<Task> GetScheduledTasks() { return _Tasks.ToArray(); } public void Dispose() { this._Tasks.CompleteAdding(); foreach (Thread t in _Threads) { t.Join(); } } }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
该scheduler代码很简单,重写相关System.Threading.Tasks.TaskScheduler类下的相关方法即可,代码中已给出相关注释。
二、使用自定义的TaskScheduler
调用TaskScheduler代码:
复制
List<string> listMsg = new List<string>() { "Task1", "Task2", "Task3", "Task4", "Task5", "Task6" }; List<Task> listTask = new List<Task>(); foreach (string msg in listMsg) { Task myTask = new Task(obj => InvokeThread2((string)obj), msg, token); listTask.Add(myTask); myTask.Start(customTaskScheduler); } try { //等待所有线程全部运行结束 Task.WaitAll(listTask.ToArray()); } catch (AggregateException ex) { //.NET4 Task的统一异常处理机制 foreach (Exception inner in ex.InnerExceptions) { Console.WriteLine("Exception type {0} from {1}", inner.GetType(), inner.Source); } } Console.ReadLine();
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
InvokeThread2 相关代码:
复制
static void InvokeThread2(string msg) { try { var x = Convert.ToInt32(msg.Replace("Task", "").Trim()); Console.WriteLine(msg); Thread.Sleep(1000 * 5); Console.WriteLine("{0} ok", msg); } catch (Exception ex) { //如果有异常发生则取消正在排队的所有线程。 tokenSource.Cancel(); Exception exception = new Exception("error"); exception.Source = msg; throw exception; } }
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
以上代码运行效果如下:
接着在TaskScheduler调用代码中如果将第一行代码listMsg值修改成 List<string> listMsg = new List<string>() { "Task1", "Task2", "TaskA", "Task3", "Task4", "Task5", "Task6", "Task7", "Task8", "Task9" };这时候我们将得到以下结果:
这个运行结果重点要强调的地方为:后面这7个exception。聪明的您或许已经看出来前6个exception属于没有执行的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9",而最后一个exception才是真正的发生异常的"TaskA"。这里主要用到了Task的统一异常处理机制AggregateException。可以从运行结果得到:Task1,Task2,Task3执行成功了,但是TaskA发生了异常导致了后面排队的"Task4", "Task5", "Task6", "Task7", "Task8", "Task9"都不会执行了。节省了系统资源,同时也提高了系统性能。
三、小结
本文主要用到了的是.NET4 的Task相关技术,Task让我们在多核并行控制的时候更加简单,功能更加强大。如果需进一步了解相关技术,博客园已经有不少教程。微软的MSDN也提供了很多参考资料。 最后希望本文可以给您的开发带来帮助。
原文链接:http://www.cnblogs.com/ryanding/archive/2011/03/22/1990799.html
【编辑推荐】