C#实现控制线程池最大数并发线程
作者:itshare
1. 实验目的:
使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍。
并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕,
才会有系统线程取出等待队列中的逻辑线程,进行CPU运算。
2. 解决问题:
<a>如果不考虑服务器实际可支持的最大并行线程个数,程序不停往线程池申请新的逻辑线程,这个时候我们可以发现CPU的使用率会不断飙升,并且内存、网络带宽占用也会随着逻辑线程在CPU队列中堆积,而不断增大。
<b>如果我们想在主程序有200个http网络通讯需要执行,如何每次循环用10个线程并发处理10个网络http通讯回话,下一次循环只有在上一次循环的10个线程都执行完毕后才会执行下一次循环,并且主程序监听和等待200个http网络通讯都在CPU线程池中执行完毕后,才会退出主程序。
3. 实现逻辑:
我们通过两个AutoResetEvent和线程监听器Monitor,分别实现:
<a>wait_sync: 任务线程的 并发执行,每次循环只处理最大10个线程分别对网络做http通讯回话。并且当前循环的10个线程都执行完毕后,才会进行下一次循环处理。
<b> wait_main: 主程序线程的监听和等待,只有所有任务线程都执行完毕后,主程序线程才会退出程序。
<c> list_Thread: 负责记录每次循环,CPU实际分配的系统线程的个数。和Monitor配合使用,Monitor.Enter(list_Thread)=占用共享线程资源的占用锁,Monitor.Exit(list_Thread)释放共享线程资源的占用锁。
<d> n_total_thread: 配合wait_main使用,记录全部逻辑线程,已经执行完毕的当前总个数,用来判断主线程是否还需要继续等待,还是可以结束主程序运行。
4. 主要代码:
<a> 线程池控制代码,如下:
/// <summary> /// 多线程调用WCF /// </summary> /// <param name="select">调用WCF的方式,1=Restful,2=Tcp</param> /// <param name="num"></param> static void DoTest_MultiThread(string select, long num) { int n_max_thread = 10; // 设置并行最大为10个线程 int n_total_thread = 0; // 用来控制:主程序的结束执行,当所有任务线程执行完毕 ILog log_add = new LogHelper("Add_Thread"); ILog log_del = new LogHelper("Del_Thread"); ILog log_wait = new LogHelper("Wait_Thread"); ILog log_set = new LogHelper("Set_Thread"); ILog log_for = new LogHelper("For_Thread"); Console.Title = string.Format("调用WCF的方式 => {0}, 调用次数=> {1}" , select == "1" ? "Restful" : "Socket" , num); List<int> list_Thread = new List<int>(); System.Threading.AutoResetEvent wait_sync = new System.Threading.AutoResetEvent(false); // 用来控制:并发最大个数线程=n_max_thread System.Threading.AutoResetEvent wait_main = new System.Threading.AutoResetEvent(false); // 用来控制:主程序的结束执行,当所有任务线程执行完毕 DateTime date_step = DateTime.Now; for (long i = 0; i < num; i++) { Num_Query_Static++; if (i >0 && (i+1-1) % n_max_thread == 0) // -1 表示第max个线程尚未开始 { //log_wait.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1)); wait_sync.WaitOne(); // 每次并发10个线程,等待处理完毕后,在发送下一次并发线程 } log_for.Info(string.Format("thread n= {0},for i= {1}", list_Thread.Count, i + 1)); System.Threading.ThreadPool.QueueUserWorkItem ((data) => { int id = System.Threading.Thread.CurrentThread.ManagedThreadId; System.Threading.Monitor.Enter(list_Thread); list_Thread.Add(id); System.Threading.Monitor.Exit(list_Thread); log_add.Info(string.Format("id={0}, count={1}", id, list_Thread.Count)); // 日志 if (select == "1") // Restful方式调用 { Query_Htty(); } else { Query_Socket(); } n_total_thread += 1; if (list_Thread.Count == (n_max_thread) || n_total_thread == num) { list_Thread.Clear(); //log_set.Info(string.Format("thread n= {0},for i= {1}", dic_Thread.Count, i + 1)); //wait_sync.Set(); if (n_total_thread != num) { wait_sync.Set(); // 任务线程,继续执行 } else { wait_main.Set(); // 主程序线程,继续执行 } } }, list_Thread); } wait_main.WaitOne(); Console.WriteLine(string.Format("总测试{0}次,总耗时{1}, 平均耗时{2}" , num , (DateTime.Now - date_step).ToString() , (DateTime.Now - date_step).TotalMilliseconds / num)); Query_Thread(); }
<b> WCF后台服务代码
private static ILog log = new LogHelper("SeqService"); // 日志 private static Dictionary<int, DateTime> dic_thread = new Dictionary<int, DateTime>(); // 线程列表 private static long Num = 0; // 线程个数 private static object lock_Num = 0; // 共享数据-锁 /// <summary> /// 在线申请流水号 /// </summary> /// <returns></returns> [WebGet(UriTemplate = "GetSeqNum/Json", ResponseFormat = WebMessageFormat.Json)] public string GetSeqNumber() { lock (lock_Num) { Num++; int id_thread = System.Threading.Thread.CurrentThread.ManagedThreadId; DateTime now = DateTime.Now; if (!dic_thread.TryGetValue(id_thread, out now)) { dic_thread.Add(id_thread, DateTime.Now); } } string ret = DateTime.Now.ToString("yyyyMMdd") + Num.ToString(new string('0', 9)); log.Info(string.Format("{0}, Thread={1}/{2}", ret, System.Threading.Thread.CurrentThread.ManagedThreadId, dic_thread.Count)); return ret; }
5. 实验结果
1. 10000个WCF网络http请求,CPU分成每次10个(10可以按需求调整)线程并发执行,并且主程序在所有请求都执行完毕后,才退出主程序。
1. 前端日志:LogFile\Add_Thread\Info
2. WCF日志:LogFile\SeqService\Info