C#多线程系列之资源池限制
作者:痴者工良
Semaphore、SemaphoreSlim 类
两者都可以限制同时访问某一资源或资源池的线程数。
这里先不扯理论,我们从案例入手,通过示例代码,慢慢深入了解。
Semaphore 类
这里,先列出 Semaphore 类常用的 API。
其构造函数如下:
| 构造函数 | 说明 | 
|---|---|
| Semaphore(Int32, Int32) | 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数。 | 
| Semaphore(Int32, Int32, String) | 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数,根据需要指定系统信号灯对象的名称。 | 
| Semaphore(Int32, Int32, String, Boolean) | 初始化 Semaphore 类的新实例,并指定初始入口数和最大并发入口数,还可以选择指定系统信号量对象的名称,以及指定一个变量来接收指示是否创建了新系统信号量的值。 | 
Semaphore 使用纯粹的内核时间(kernel-time)方式(等待时间很短),并且支持在不同的进程间同步线程(像Mutex)。
Semaphore 常用方法如下:
| 方法 | 说明 | 
|---|---|
| Close() | 释放由当前 WaitHandle占用的所有资源。 | 
| OpenExisting(String) | 打开指定名称为信号量(如果已经存在)。 | 
| Release() | 退出信号量并返回前一个计数。 | 
| Release(Int32) | 以指定的次数退出信号量并返回前一个计数。 | 
| TryOpenExisting(String, Semaphore) | 打开指定名称为信号量(如果已经存在),并返回指示操作是否成功的值。 | 
| WaitOne() | 阻止当前线程,直到当前 WaitHandle 收到信号。 | 
| WaitOne(Int32) | 阻止当前线程,直到当前 WaitHandle 收到信号,同时使用 32 位带符号整数指定时间间隔(以毫秒为单位)。 | 
| WaitOne(Int32, Boolean) | 阻止当前线程,直到当前的 WaitHandle 收到信号为止,同时使用 32 位带符号整数指定时间间隔,并指定是否在等待之前退出同步域。 | 
| WaitOne(TimeSpan) | 阻止当前线程,直到当前实例收到信号,同时使用 TimeSpan 指定时间间隔。 | 
| WaitOne(TimeSpan, Boolean) | 阻止当前线程,直到当前实例收到信号为止,同时使用 TimeSpan 指定时间间隔,并指定是否在等待之前退出同步域。 | 
示例
我们来直接写代码,这里使用 《原子操作 Interlocked》 中的示例,现在我们要求,采用多个线程执行计算,但是只允许最多三个线程同时执行运行。
使用 Semaphore ,有四个个步骤:
new 实例化 Semaphore,并设置最大线程数、初始化时可进入线程数;
使用 .WaitOne(); 获取进入权限(在获得进入权限前,线程处于阻塞状态)。
离开时使用 Release() 释放占用。
Close() 释放Semaphore 对象。
《原子操作 Interlocked》 中的示例改进如下:
    class Program
    {
        // 求和
        private static int sum = 0;
        private static Semaphore _pool;
        // 判断十个线程是否结束了。
        private static int isComplete = 0;
        // 第一个程序
        static void Main(string[] args)
        {
            Console.WriteLine("执行程序");
            // 设置允许最大三个线程进入资源池
            // 一开始设置为0,就是初始化时允许几个线程进入
            // 这里设置为0,后面按下按键时,可以放通三个线程
            _pool = new Semaphore(0, 3);
            for (int i = 0; i < 10; i++)
            {
                Thread thread = new Thread(new ParameterizedThreadStart(AddOne));
                thread.Start(i + 1);
            }
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("任意按下键(不要按关机键),可以打开资源池");
            Console.ForegroundColor = ConsoleColor.White;
            Console.ReadKey();
            // 准许三个线程进入
            _pool.Release(3);
            // 这里没有任何意义,就单纯为了演示查看结果。
            // 等待所有线程完成任务
            while (true)
            {
                if (isComplete >= 10)
                    break;
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }
            Console.WriteLine("sum = " + sum);
            // 释放池
            _pool.Close();
            
        }
        public static void AddOne(object n)
        {
            Console.WriteLine($"    线程{(int)n}启动,进入队列");
            // 进入队列等待
            _pool.WaitOne();
            Console.WriteLine($"第{(int)n}个线程进入资源池");
            // 进入资源池
            for (int i = 0; i < 10; i++)
            {
                Interlocked.Add(ref sum, 1);
                Thread.Sleep(TimeSpan.FromMilliseconds(500));
            }
            // 解除占用的资源池
            _pool.Release();
            isComplete += 1;
            Console.WriteLine($"                     第{(int)n}个线程退出资源池");
        }
    }看着代码有点多,快去运行一下,看看结果。
示例说明
实例化 Semaphore 使用了new Semaphore(0,3); ,其构造函数原型为
public Semaphore(int initialCount, int maximumCount);
initialCount 表示一开始允许几个进程进入资源池,如果设置为0,所有线程都不能进入,要一直等资源池放通。
maximumCount 表示最大允许几个线程进入资源池。
Release() 表示退出信号量并返回前一个计数。这个计数指的是资源池还可以进入多少个线程。
可以看一下下面的示例:
        private static Semaphore _pool;
        static void Main(string[] args)
        {
            _pool = new Semaphore(0, 5);
            _pool.Release(5);
            new Thread(AddOne).Start();
            Thread.Sleep(TimeSpan.FromSeconds(10));
            _pool.Close();
        }
        public static void AddOne()
        {
            _pool.WaitOne();
            Thread.Sleep(1000);
            int count = _pool.Release();
            Console.WriteLine("在此线程退出资源池前,资源池还有多少线程可以进入?" + count);
        }信号量
前面我们学习到 Mutex,这个类是全局操作系统起作用的。我们从 Mutex 和 Semphore 中,也看到了 信号量这个东西。
信号量分为两种类型:本地信号量和命名系统信号量。
- 命名系统信号量在整个操作系统中均可见,可用于同步进程的活动。
 - 局部信号量仅存在于进程内。
 
当 name 为 null 或者为空时,Mutex 的信号量时局部信号量,否则 Mutex 的信号量是命名系统信号量。
Semaphore 的话,也是两种情况都有。
如果使用接受名称的构造函数创建 Semaphor 对象,则该对象将与该名称的操作系统信号量关联。
两个构造函数:
Semaphore(Int32, Int32, String) Semaphore(Int32, Int32, String, Boolean)
上面的构造函数可以创建多个表示同一命名系统信号量的 Semaphore 对象,并可以使用 OpenExisting 方法打开现有的已命名系统信号量。
我们上面使用的示例就是局部信号量,进程中引用本地 Semaphore 对象的所有线程都可以使用。 每个 Semaphore 对象都是单独的本地信号量。
SemaphoreSlim类
SemaphoreSlim 跟 Semaphore 有啥关系?
微软文档:
SemaphoreSlim 表示对可同时访问资源或资源池的线程数加以限制的 Semaphore 的轻量替代。
SemaphoreSlim 不使用信号量,不支持进程间同步,只能在进程内使用。
它有两个构造函数:
| 构造函数 | 说明 | 
|---|---|
| SemaphoreSlim(Int32) | 初始化 SemaphoreSlim 类的新实例,以指定可同时授予的请求的初始数量。 | 
| SemaphoreSlim(Int32, Int32) | 初始化 SemaphoreSlim 类的新实例,同时指定可同时授予的请求的初始数量和最大数量。 | 
示例
我们改造一下前面 Semaphore 中的示例:
    class Program
    {
        // 求和
        private static int sum = 0;
        private static SemaphoreSlim _pool;
        // 判断十个线程是否结束了。
        private static int isComplete = 0;
        static void Main(string[] args)
        {
            Console.WriteLine("执行程序");
            // 设置允许最大三个线程进入资源池
            // 一开始设置为0,就是初始化时允许几个线程进入
            // 这里设置为0,后面按下按键时,可以放通三个线程
            _pool = new SemaphoreSlim(0, 3);
            for (int i = 0; i < 10; i++)
            {
                Thread thread = new Thread(new ParameterizedThreadStart(AddOne));
                thread.Start(i + 1);
            }
            Console.WriteLine("任意按下键(不要按关机键),可以打开资源池");
            Console.ReadKey();
            // 
            _pool.Release(3);
            // 这里没有任何意义,就单纯为了演示查看结果。
            // 等待所有线程完成任务
            while (true)
            {
                if (isComplete >= 10)
                    break;
                Thread.Sleep(TimeSpan.FromSeconds(1));
            }
            Console.WriteLine("sum = " + sum);
            // 释放池
        }
        public static void AddOne(object n)
        {
            Console.WriteLine($"    线程{(int)n}启动,进入队列");
            // 进入队列等待
            _pool.Wait();
            Console.WriteLine($"第{(int)n}个线程进入资源池");
            // 进入资源池
            for (int i = 0; i < 10; i++)
            {
                Interlocked.Add(ref sum, 1);
                Thread.Sleep(TimeSpan.FromMilliseconds(200));
            }
            // 解除占用的资源池
            _pool.Release();
            isComplete += 1;
            Console.WriteLine($"                     第{(int)n}个线程退出资源池");
        }
    }SemaphoreSlim 不需要 Close()。
两者在代码上的区别是就这么简单。
区别
如果使用下面的构造函数实例化 Semaphor(参数name不能为空),那么创建的对象在整个操作系统内都有效。
public Semaphore (int initialCount, int maximumCount, string name);
Semaphorslim 则只在进程内内有效。
SemaphoreSlim 类不会对 Wait、WaitAsync 和 Release 方法的调用强制执行线程或任务标识。
而 Semaphor 类,会对此进行严格监控,如果对应调用数量不一致,会出现异常。
此外,如果使用 SemaphoreSlim(Int32 maximumCount) 构造函数来实例化 SemaphoreSlim 对象,获取其 CurrentCount 属性,其值可能会大于 maximumCount。 编程人员应负责确保调用一个 Wait 或 WaitAsync 方法,便调用一个 Release。
这就好像笔筒里面的笔,没有监控,使用这使用完毕后,都应该将笔放进去。如果原先有10支笔,每次使用不放进去,或者将别的地方的笔放进去,那么最后数量就不是10了。

到此这篇关于C#多线程系列之资源池限制的文章就介绍到这了。希望对大家的学习有所帮助,也希望大家多多支持脚本之家。
