C#中System.IO.Pipelines库的使用详解
作者:rjcql
System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易,本文主要为大家详细介绍了System.IO.Pipelines具体使用方法,感兴趣的可以了解下
写在前面
在进一步了解Socket粘包分包的过程中,了解到了.NET 中的 System.IO.Pipelines,可以更优雅高效的解决这个问题;先跟随官方的示例做个初步的认识。
System.IO.Pipelines 是一个库,旨在使在 .NET 中执行高性能 I/O 更加容易。 该库的目标为适用于所有 .NET 实现的 .NET Standard。
System.IO.Pipelines 具有高性能的流数据分析功能,可以减少代码复杂性。
老规矩通过NuGet安装该类库
代码实现
using System.Buffers; using System.IO.Pipelines; using System.Text; class Program { static async Task Main() { using var stream = File.OpenRead("lorem-ipsum.txt"); var reader = PipeReader.Create(stream); var writer = PipeWriter.Create( Console.OpenStandardOutput(), new StreamPipeWriterOptions(leaveOpen: true)); WriteUserCancellationPrompt(); var processMessagesTask = ProcessMessagesAsync(reader, writer); var userCanceled = false; var cancelProcessingTask = Task.Run(() => { while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C') { WriteUserCancellationPrompt(); } userCanceled = true; // No exceptions thrown reader.CancelPendingRead(); writer.CancelPendingFlush(); }); await Task.WhenAny(cancelProcessingTask, processMessagesTask); Console.WriteLine( $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n"); } static void WriteUserCancellationPrompt() => Console.WriteLine("Press 'C' to cancel processing...\n"); static async Task ProcessMessagesAsync( PipeReader reader, PipeWriter writer) { try { while (true) { ReadResult readResult = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = readResult.Buffer; try { if (readResult.IsCanceled) { break; } if (TryParseLines(ref buffer, out string message)) { FlushResult flushResult = await WriteMessagesAsync(writer, message); if (flushResult.IsCanceled || flushResult.IsCompleted) { break; } } if (readResult.IsCompleted) { if (!buffer.IsEmpty) { throw new InvalidDataException("Incomplete message."); } break; } } finally { reader.AdvanceTo(buffer.Start, buffer.End); } } } catch (Exception ex) { Console.Error.WriteLine(ex); } finally { await reader.CompleteAsync(); await writer.CompleteAsync(); } } static bool TryParseLines( ref ReadOnlySequence<byte> buffer, out string message) { SequencePosition? position; StringBuilder outputMessage = new(); while (true) { position = buffer.PositionOf((byte)'\n'); if (!position.HasValue) break; outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value))) .AppendLine(); buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); }; message = outputMessage.ToString(); return message.Length != 0; } static ValueTask<FlushResult> WriteMessagesAsync( PipeWriter writer, string message) => writer.WriteAsync(Encoding.ASCII.GetBytes(message)); }
调用示例
总结
例子中用到的文本文件是一个以\n 换行符作为结尾的多行文本,微软官方示例没有提供,这个是自己建的测试文件,如果没有检测到\n会抛出异常。
从运行的结果可以看到,从传入的流中识别以\n结尾,作为数据块的区分,利用这个特性定义数据报文的尾部,实现分包。
到此这篇关于C#中System.IO.Pipelines库的使用详解的文章就介绍到这了,更多相关C# System.IO.Pipelines内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!