C#教程

关注公众号 jb51net

关闭
首页 > 软件编程 > C#教程 > c# blockingcollection

C# BlockingCollection的使用小结

作者:卷纸要用清风的

BlockingCollection是一个线程安全的集合,它可以使用多种底层集合,并提供阻塞和限制大小的功能,本文主要介绍了C# BlockingCollection,感兴趣的可以了解一下

什么是BlockingCollection<T>

BlockingCollection<T> 是一个线程安全的集合,它提供了一种机制,允许一个或多个生产者线程将数据添加到集合中,同时允许一个或多个消费者线程从集合中取出数据。它内部封装了一个线程安全的集合(如 ConcurrentQueue<T>ConcurrentStack<T>ConcurrentBag<T>),并提供了阻塞和限制集合大小的功能。

主要特点

构造函数

BlockingCollection<T> 提供了多种构造方式:

// 使用默认的 ConcurrentQueue<T>,无容量限制
var blockingCollection = new BlockingCollection<int>();

// 使用默认的 ConcurrentQueue<T>,并指定最大容量
var blockingCollection = new BlockingCollection<int>(10);

// 指定底层集合类型
var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());

常用方法

生产者操作

消费者操作

示例代码

以下是一个简单的生产者-消费者示例,使用 BlockingCollection<T> 实现:

using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
    static void Main()
    {
        // 创建一个容量为 5 的 BlockingCollection
        var blockingCollection = new BlockingCollection<int>(5);

        // 启动生产者线程
        Thread producerThread = new Thread(() =>
        {
            for (int i = 1; i <= 10; i++)
            {
                blockingCollection.Add(i); // 添加元素
                Console.WriteLine($"Producer added: {i}");
                Thread.Sleep(500); // 模拟生产时间
            }
            blockingCollection.CompleteAdding(); // 标记不再添加元素
        });

        // 启动消费者线程
        Thread consumerThread = new Thread(() =>
        {
            foreach (var item in blockingCollection.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consumer consumed: {item}");
                Thread.Sleep(1000); // 模拟消费时间
            }
        });

        producerThread.Start();
        consumerThread.Start();

        producerThread.Join();
        consumerThread.Join();
    }
}

输出示例

Producer added: 1
Producer added: 2
Consumer consumed: 1
Producer added: 3
Consumer consumed: 2
Producer added: 4
Consumer consumed: 3
Producer added: 5
Consumer consumed: 4
Producer added: 6
Consumer consumed: 5
Producer added: 7
Consumer consumed: 6
Producer added: 8
Consumer consumed: 7
Producer added: 9
Consumer consumed: 8
Producer added: 10
Consumer consumed: 9
Consumer consumed: 10

注意事项

串口接收

在使用 BlockingCollection<T> 存储串口接收的数据,并在其他线程中取出时,是否能保证数据的顺序,主要取决于以下两个因素:

底层存储的类型

BlockingCollection<T> 允许指定底层存储的类型。默认情况下,它使用 ConcurrentQueue<T> 作为底层存储,而 ConcurrentQueue<T> 是一个先进先出 FIFO的队列。这意味着数据的添加顺序和取出顺序是一致的,因此可以保证顺序。
如果你使用其他类型的底层存储(如 ConcurrentStack<T> 或自定义的线程安全集合),则顺序可能会有所不同。例如:

线程安全和并发访问

BlockingCollection<T> 是线程安全的,因此即使在多线程环境下,数据的添加和取出操作也是安全的。只要底层存储是 FIFO 的(如 ConcurrentQueue<T>),数据的顺序就能得到保证。

串口数据接收的顺序性

串口通信本身是按字节顺序接收数据的,因此只要数据是逐字节接收并立即添加到 BlockingCollection<T> 中,数据的顺序就能得到保证。
示例代码
以下是一个示例,展示如何使用 BlockingCollection<T> 存储串口接收的数据,并在其他线程中按顺序取出:

using System;
using System.Collections.Concurrent;
using System.IO.Ports;
using System.Threading;

class SerialPortExample
{
    private SerialPort _serialPort;
    private BlockingCollection<string> _dataQueue = new BlockingCollection<string>();

    public SerialPortExample(string portName)
    {
        _serialPort = new SerialPort(portName)
        {
            BaudRate = 9600,
            DataBits = 8,
            Parity = Parity.None,
            StopBits = StopBits.One,
            ReadTimeout = 500
        };

        _serialPort.DataReceived += SerialPort_DataReceived;
    }

    private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
    {
        try
        {
            string data = _serialPort.ReadLine(); // 假设数据以换行符分隔
            _dataQueue.Add(data); // 将数据添加到阻塞集合
            Console.WriteLine($"Received and added: {data}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error in DataReceived: {ex.Message}");
        }
    }

    public void Start()
    {
        _serialPort.Open();
        Thread consumerThread = new Thread(ConsumeData);
        consumerThread.Start();
    }

    private void ConsumeData()
    {
        foreach (var data in _dataQueue.GetConsumingEnumerable())
        {
            Console.WriteLine($"Consumed: {data}");
            // 处理数据
        }
    }

    public void Stop()
    {
        _dataQueue.CompleteAdding();
        _serialPort.Close();
    }

    static void Main()
    {
        SerialPortExample example = new SerialPortExample("COM3");
        example.Start();

        Console.WriteLine("Press Enter to exit...");
        Console.ReadLine();

        example.Stop();
    }
}

关键点

因此,只要使用默认的 ConcurrentQueue<T> 作为底层存储,并且正确处理串口数据的接收和添加,BlockingCollection<T> 是可以保证数据顺序的。

到此这篇关于C# BlockingCollection的使用小结的文章就介绍到这了,更多相关c# blockingcollection内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文