C#使用channel实现Plc异步任务之间的通信
作者:潘诺西亚的火山
在C#的并发编程中,Channel是一种非常强大的数据结构,用于在生产者和消费者之间进行通信,本文将给大家介绍C#使用channel实现Plc异步任务之间的通信,文中有相关的代码示例供大家参考,感兴趣的朋友跟着小编一起来看看吧
channel 通信的例子:
using ConsoleApp2; using System.Collections.Concurrent; using System.Threading.Channels; var queue = new BlockingCollection<Message>(new ConcurrentQueue<Message>()); var opt = new BoundedChannelOptions(10) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true, SingleWriter = true, Capacity = 100 //最大容量 }; //有限的 var channelTest = Channel.CreateBounded<Message>(opt); //无限的 var channel = Channel.CreateUnbounded<Message>(); var sender1 = SendMessageThreadAsync(channel.Writer, 1); var sender2 = SendMessageThreadAsync(channel.Writer, 2); var receiver1 = ReceiveMessageThreadAsync(channel.Reader, 3); var receiver2 = ReceiveMessageThreadAsync(channel.Reader, 4); //await sender; // make sure all messages are received await Task.WhenAll(sender1, sender2); channel.Writer.Complete(); await Task.WhenAll(receiver1, receiver2); //await receiver; Console.WriteLine("Press any key to exit..."); Console.ReadKey(); async Task SendMessageThreadAsync(ChannelWriter<Message> writer, int id) { for (int i = 0; i < 20; i++) { await writer.WriteAsync(new Message(id, i.ToString())); Console.WriteLine($"Thread {id} sent {i}"); await Task.Delay(100); } } async Task ReceiveMessageThreadAsync(ChannelReader<Message> reader, int id) { //try //{ // while (!reader.Completion.IsCompleted) // { // var message = await reader.ReadAsync(); // Console.WriteLine($"Thread {id} received {message.Content}"); // } //} //catch (Exception ex) //{ // Console.WriteLine($"Thread {id} channel closed:{ex.Message}"); //} await foreach (var message in reader.ReadAllAsync()) { Console.WriteLine($"Thread {id} received {message.Content}"); } } record Message(int FromId, string Content);
改造为Plc的实例
record PlcDataMessage { public bool IsConnected { get; init; } public DbData DbData { get; init; } // 可以添加其他需要传递的信息 }
// 创建一个无边界的Channel来发送和接收消息 var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>(); // 启动一个新的任务来模拟PLC数据读取 Task.Factory.StartNew(async () => { var cts = new CancellationTokenSource(); // 假设您已经有了取消令牌源 while (!cts.IsCancellationRequested) { try { // ... 省略了连接PLC的代码,这部分逻辑保持不变 ... if (MyIsConnected) { DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0); // 心跳和其他操作... // 构造消息并发送到Channel var message = new PlcDataMessage { IsConnected = MyIsConnected, DbData = dbDataTemp }; await plcDataChannel.Writer.WriteAsync(message, cts.Token); } // ... 其他逻辑保持不变 ... } catch (Exception ex) { // 处理异常并重新连接PLC(如果需要) // ... // 可以通过Channel发送一个特殊的消息来表示连接已断开或发生了错误 // 这里省略了这部分逻辑 // 休眠一段时间后再重试 await Task.Delay(2000, cts.Token); } } // 完成后通知Channel不再发送更多数据 plcDataChannel.Writer.Complete(); }, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); // 在另一个任务或线程中读取Channel中的数据 Task.Run(async () => { await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token)) { if (message.IsConnected) { lock (lockObj) { // 更新dbData,这里假设dbData是一个线程安全的对象或结构 dbData.Str_S = message.DbData.Str_S.Trim(); // ... 更新其他属性 ... } // 处理读取到的数据... } else { // 处理PLC断开连接的情况... } } // 读取完成,Channel已关闭 Console.WriteLine("PLC数据读取完毕。"); }, cts.Token); // ... 其他代码,如等待所有任务完成、处理取消逻辑等 ...
using System; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; // ... 其他必要的引用和类型定义 ... // 创建一个无边界的Channel来发送和接收消息 var plcDataChannel = Channel.CreateUnbounded<PlcDataMessage>(); // 取消令牌源 var cts = new CancellationTokenSource(); // 启动一个新的任务来模拟PLC数据读取 Task.Run(async () => { Plc s7Plc = null; bool MyIsConnected = false; int errorTimes = 0; try { while (!cts.IsCancellationRequested) { if (s7Plc == null || !MyIsConnected) { // 尝试连接PLC(略去具体实现) // ... if (MyIsConnected) { // 连接成功,发送连接成功消息(如果需要) // ... } } else { try { // 读取PLC数据(略去具体实现) DbData dbDataTemp = await s7Plc.ReadClassAsync<DbData>(42, 0, cts.Token); // 心跳和其他操作... // 构造消息并发送到Channel var message = new PlcDataMessage { IsConnected = MyIsConnected, DbData = dbDataTemp }; await plcDataChannel.Writer.WriteAsync(message, cts.Token); errorTimes = 0; // 重置错误计数器 } catch (Exception ex) { errorTimes++; // 处理异常(例如记录日志) // ... // 在达到一定错误次数后,关闭PLC连接并重置 if (errorTimes > someThreshold) { s7Plc?.Close(); s7Plc = null; MyIsConnected = false; // 可以选择发送一个断开连接的消息到Channel } // 休眠一段时间后再重试 await Task.Delay(2000, cts.Token); } } // 可以添加一些延时来减少循环的频率 await Task.Delay(somePollingInterval, cts.Token); } } catch (OperationCanceledException) { // 取消是预期的,不需要额外处理 } finally { // 确保关闭PLC连接和Channel写入器 s7Plc?.Close(); plcDataChannel.Writer.Complete(); } }, cts.Token); // 在另一个任务或线程中读取Channel中的数据 Task.Run(async () => { await foreach (var message in plcDataChannel.Reader.ReadAllAsync(cts.Token)) { if (message.IsConnected) { // 更新dbData(这里假设dbData是一个线程安全的对象或结构) // 根据需要添加适当的同步机制 // ... // 处理读取到的数据... } else { // 处理PLC断开连接的情况... } } // 读取完成,Channel已关闭 Console.WriteLine("PLC数据读取完毕。"); }, cts.Token); // ... 其他代码,如等待所有任务完成、处理取消逻辑等 ... // 在某个适当的时刻取消任务 // cts.Cancel(); // 等待所有任务完成(如果需要
拓展:C# Channel实现线程间通信
C# Channel实现线程间通信
同步方式实现:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace ConsoleApp1 { public class ChannelDemo { static Channel<Message> channel1 = Channel.CreateUnbounded<Message>(); public static void Main2() { sender.Start(1); receive1.Start(2); receive2.Start(3); sender.Join(); Thread.Sleep(3000); receive1.Interrupt(); receive2.Interrupt(); receive1.Join(); receive2.Join(); Console.ReadKey(); } static Thread sender = new Thread(SendMsg); static Thread receive1 = new Thread(ReceiveMsg); static Thread receive2 = new Thread(ReceiveMsg); static void SendMsg(object id) { for (int i = 0; i < 20; i++) { if (channel1.Writer.TryWrite(new Message((int)id, i.ToString()))) { Console.WriteLine($"【线程{id}】发送了【{i}】"); } } } static void ReceiveMsg(object id) { try { while (true) { if (channel1.Reader.TryRead(out Message message)) { Console.WriteLine($"【线程{id}】从【线程{message.id}】接收了【{message.content}】"); } Thread.Sleep(1); } } catch (ThreadInterruptedException ex) { Console.WriteLine($"接收结束"); } } } }
异步方式:
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.Remoting.Channels; using System.Text; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; namespace ConsoleApp1 { public class ChannelDemo2 { static Channel<Message> channel1 = Channel.CreateUnbounded<Message>(); public static async void Main2() { await Task.WhenAll(sender, sender2); channel1.Writer.Complete(); await Task.WhenAll(receive1, receive2); Console.ReadKey(); } static Task sender = SendMsgAsync(channel1.Writer, 1); static Task sender2 = SendMsgAsync(channel1.Writer, 4); static Task receive1 = ReceiveMsgAsync(channel1.Reader, 2); static Task receive2 = ReceiveMsgAsync(channel1.Reader, 3); static async Task SendMsgAsync(ChannelWriter<Message> writer, int id) { for (int i = 0; i < 20; i++) { await writer.WriteAsync(new Message((int)id, i.ToString())); Console.WriteLine($"【线程{id}】发送了【{i}】"); } } static async Task ReceiveMsgAsync(ChannelReader<Message> reader,int id) { try { while (!reader.Completion.IsCompleted) { Message message = await reader.ReadAsync(); Console.WriteLine($"【线程{id}】从【线程{message.id}】接收了【{message.content}】"); } } catch (ChannelClosedException ex) { Console.WriteLine($"ChannelClosed 接收结束"); } } } }
在对Channel进行实例化的时候,也可以传递一个Options,这里面可以对消息容量,是否多个发送者和接受者进行定义。
以上就是C#使用channel实现Plc异步任务之间的通信的详细内容,更多关于C# channel Plc异步通信的资料请关注脚本之家其它相关文章!