Pipelines - .NET中的新IO API指引(一)

Pipelines - .NET中的新IO API指引(二)

关于System.IO.Pipelines的一篇说明

System.IO.Pipelines: .NET高性能IO

System.IO.Pipelines 是对IO的统一抽象,文件、com口、网络等等,重点在于让调用者注意力集中在读、写缓冲区上,典型的就是 IDuplexPipe中的Input Output。

可以理解为将IO类抽象为读、写两个缓冲区。

目前官方实现还处于preview状态,作者使用Socket和NetworkStream 实现了一个 Pipelines.Sockets.Unofficial

作者在前两篇中提到使用System.IO.Pipelines 改造StackExchange.Redis,在本篇中作者采用了改造现有的SimplSockets库来说明System.IO.Pipelines的使用。

文章中的代码(SimplPipelines,KestrelServer )

## SimplSockets说明

+ 可以单纯的发送(Send),也可以完成请求/响应处理(SendRecieve)
+ 同步Api
+ 提供简单的帧协议封装消息数据
+ 使用byte[]
+ 服务端可以向所有客户端广播消息
+ 有心跳检测等等

属于非常典型的传统Socket库。

## 作者的改造说明

### 对缓冲区数据进行处理的一些方案及选型

1. 使用byte[]拷贝出来,作为独立的数据副本使用,简单易用但成本高(分配和复制)
   2. 使用 ReadOnlySequence<byte> ,零拷贝,快速但有限制。一旦在管道上执行Advance操作,数据将被回收。在有严格控制的服务端处理场景(数据不会逃离请求上下文)下可以使用,言下之意使用要求比较高。
   3. 作为2的扩展方案,将数据载荷的解析处理代码移至类库中(处理ReadOnlySequence<byte>),只需将解构完成的数据发布出来,也许需要一些自定义的structs 映射(map)一下。这里说的应该是直接将内存映射为Struct?
   4. 通过返回Memory<byte> 获取一份数据拷贝,也许需要从ArrayPool<byte>.Share 池中返回一个大数组;但是这样对调用者要求较高,需要返回池。并且从Memory<T> 获取一个T[]属于高级和不安全的操作。不安全,有风险。( not all Memory<T> is based on T[])
   5. 一个妥协方案,返回一个提供Memory<T>(Span<T>)的东西,并且使用一些明确的显而易见的Api给用户,这样用户就知道应该如何处理返回结果。比如IDisposable/using这种,在Dispose()被调用时将资源归还给池。
  
   作者认为,设计一个通用的消息传递Api时,方案5更为合理,调用方可以保存一段时间的数据并且不会干扰到管道的正常工作,也可以很好的利用ArrayPool。如果调用者没有使用using也不会有什么大麻烦,只是效率会降低一些,就像使用方案1一样。
     但是方案的选择需要充分考虑你的实际场景,比如在StackExchange.Redis 客户端中使用的是方案3;在不允许数据离开请求上下文时使用方案2.。
  一旦选定方案,以后基本不可能再更改。

针对效率最高的方案2,作者提出的专业建议是 **使用ref struct** 。
此处选择的是方案5,与方案4的区别就是对Memory<T> 的处理,作者使用 System.Buffers.IMemoryOwner<T>接口

 public interface IMemoryOwner<T> : IDisposable{Memory<T> Memory { get; }}

以下为实现代码,Dispose时归还借出的数组,并考虑线程安全,避免多次归还(very bad)。

private sealed class ArrayPoolOwner<T>:IMemoryOwner<T>{private readonly int _length;private T[] _oversized;internal ArrayPoolOwner(T[] oversized,int length){_length=length;_oversized=oversized;}public Memory<T> Memory=>new Memory<T>(GetArray(),0,_length);private T[] GetArray()=>Interlocked.CompareExchange(ref _oversized,null,null)?? throw new ObjectDisposedException(ToString());public void Dispose(){var arr=Interlocked.Exchange(ref _oversized,null);if(arr!=null) ArrayPool<T>.Shared.Return(arr);}
}

Dispose后如果再次调用Memory将会失败,即 使用时 using,不要再次使用。
**对ArrayPool的一些说明**
+ 从ArrayPool借出的数组比你需要的要大,你给定的大小在ArrayPool看来属于下限(不可小于你给定的大小),见:ArrayPool<T>.Shared.Rent(int minimumLength);
+ 归还时数组默认不清空,因此你借出的数组内可能会有垃圾数据;如果需要清空,在归还时使用 ArrayPool<T>.Shared.Return(arr,true) ;
作者对ArrayPool的一些建议: 
增加 IMemoryOwner<T> RentOwned(int length),T[] Rent(int minimumLength) 及借出时清空数组,归还时清空数组的选项。

 这里的想法是通过IMemoryOwner<T>实现一种所有权的转移,典型调用方法如下

 void DoSomething(IMemoryOwner<byte> data){using(data){// ... other things here ...DoTheThing(data.Memory);}// ... more things here ...}

通过ArrayPool的借、还机制避免频繁分配。

 **作者的警告:**
 + 不要把data.Memory 单独取出乱用,using完了就不要再操作它了(这种错误比较基础)
 + 有人会用MemoryMarshal搞出数组使用,作者认为可以实现一个 MemoryManager<T>(ArrayPoolOwner<T> : MemoryManager<T>, since MemoryManager<T> : IMemoryOwner<T>)让.Span如同.Memory一样失败。
 ---- 作者也挺纠结(周道)的 :)。
使用  ReadOnlySequence<T> 填充ArrayPoolOwner(构造,实例化)

public static IMemoryOwner<T> Lease<T>(this ReadOnlySequence<T> source){if (source.IsEmpty) return Empty<T>();int len = checked((int)source.Length);var arr = ArrayPool<T>.Shared.Rent(len);//借出source.CopyTo(arr);return new ArrayPoolOwner<T>(arr, len);//dispose时归还}

### 基本API

服务端和客户端虽然不同但代码有许多重叠的地方,比如都需要某种线程安全机制的写入,需要某种读循环来处理接收的数据,因此可以共用一个基类。
基类中使用IDuplexPipe(包括input,output两个管道)作为管道。

public abstract class SimplPipeline : IDisposable{private IDuplexPipe _pipe;protected SimplPipeline(IDuplexPipe pipe)=> _pipe = pipe;public void Dispose() => Close();public void Close() {/* burn the pipe*/}}

首先,需要一种线程安全的写入机制并且不会过度阻塞调用方。在原SimplSockets(包括StackExchange.Redis v1)中使用消息队列来处理。调用方Send时同步的将消息入队,在将来的某刻,消息出队并写入到Socket中。此方式存在的问题
+ 有许多移动的部分
+ 与“pipelines”有些重复
管道本身即是队列,本身具备输出(写、发送)缓冲区,没必要再增加一个队列,直接把数据写入管道即可。取消原有队列只有一些小的影响,在StackExchange.Redis v1 中使用队列完成优先级排序处理(队列跳转),作者表示不担心这一点。
**写入Api设计**
 + 不一定时同步的
 + 调用方可以单纯的传入一段内存数据(ReadOnlyMember<byte>),或者是一个(IMemoryOwner<byte>)由Api写入后进行清理。
  + 先假设读、写分开(暂不考虑响应)

protected async ValueTask WriteAsync(IMemoryOwner<byte> payload, int messageId)//调用方不再使用payload,需要我们清理{using (payload){await WriteAsync(payload.Memory, messageId);}}
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId);//调用方自己清理

messageId标识一条消息,写入消息头部, 用于之后处理响应回复信息。
   返回值使用ValueTask因为写入管道通常是同步的,只有管道执行Flush时才可能是异步的(大多数情况下也是同步的,除非在管道被备份时)。

### 写入与错误

首先需要保证单次写操作,lock在此不合适,因为它不能与异步操作很好的协同。考虑flush有可能是异步的,导致后续(continuation )部分可能会在另外的线程上。这里使用与异步兼容的SemaphoreSlim。
下面是一条指南:**一般来说, 应用程序代码应针对可读性进行优化;库代码应针对性能进行优化。**
以下为机翻原文
您可能同意也可能不同意这一点, 但这是我编写代码的一般指南。我的意思是,类库代码往往有一个单一的重点目的, 往往由一个人的经验可能是 "深刻的, 但不一定是    广泛的" 维护;你的大脑专注于那个领域, 用奇怪的长度来优化代码是可以的。相反,应用程序代码往往涉及更多不同概念的管道-"宽但不一定深" (深度隐藏在各种库      中)。应用程序代码通常具有更复杂和不可预知的交互, 因此重点应放在可维护和 "明显正确" 上。
  基本上, 我在这里的观点是, 我倾向于把很多注意力集中在通常不会放入应用程序代码中的优化上, 因为我从经验和广泛的基准测试中知道它们真的很重要。所以。。。我要做一些看起来很奇怪的事情, 我希望你和我一起踏上这段旅程。

“明显正确”的代码
private readonly SemaphoreSlim _singleWriter= new SemaphoreSlim(1);
protected async ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{await _singleWriter.WaitAsync();try{WriteFrameHeader(writer, payload.Length, messageId);await writer.WriteAsync(payload);}finally{_singleWriter.Release();}
}

这段代码没有任何问题,但是即便所有部分都是同步完成的,就会产生多余的状态机-------大概是 不是所有地方都需要异步处理 的意思。
通过两个问题进行重构
- 单次写入是否没有竞争?(无人争用)
- Flush是否为同步
重构,将原WriteAsync 更名为 WriteAsyncSlowPath,增加新的WriteAsync
作者的“一些看起来很奇怪的” 实现
protected ValueTask WriteAsync(ReadOnlyMemory<byte> payload, int messageId)
{// try to get the conch; if not, switch to async
//writer已经被占用,异步if (!_singleWriter.Wait(0))return WriteAsyncSlowPath(payload, messageId);bool release = true;try{WriteFrameHeader(writer, payload.Length, messageId);var write = writer.WriteAsync(payload);if (write.IsCompletedSuccessfully) return default;release = false;return AwaitFlushAndRelease(write);}finally{if (release) _singleWriter.Release();}
}
async ValueTask AwaitFlushAndRelease(ValueTask<FlushResult> flush)
{try { await flush; }finally { _singleWriter.Release(); }
}

三个地方
1. _singleWriter.Wait(0) 意味着writer处于空闲状态,没有其他人在调用
2. write.IsCompletedSuccessfully 意味着writer同步flush
3. 辅助方法 AwaitFlushAndRelease 处理异步flush情况
-------------------------------------------------------------------------------------

### 协议头处理

协议头由两个int组成,小端,第一个是长度,第二个是messageId,共8字节。

void WriteFrameHeader(PipeWriter writer, int length, int messageId)
{var span = writer.GetSpan(8);BinaryPrimitives.WriteInt32LittleEndian(span, length);BinaryPrimitives.WriteInt32LittleEndian(span.Slice(4), messageId);writer.Advance(8);
}

 

### 管道客户端实现 发送

public class SimplPipelineClient : SimplPipeline
{public async Task<IMemoryOwner<byte>> SendReceiveAsync(ReadOnlyMemory<byte> message){var tcs = new TaskCompletionSource<IMemoryOwner<byte>>();int messageId;lock (_awaitingResponses){messageId = ++_nextMessageId;if (messageId == 0) messageId = 1;_awaitingResponses.Add(messageId, tcs);}await WriteAsync(message, messageId);return await tcs.Task;}public async Task<IMemoryOwner<byte>> SendReceiveAsync(IMemoryOwner<byte> message){using (message){return await SendReceiveAsync(message.Memory);}}
}

- _awaitingResponses 是个字典,保存已经发送的消息,用于将来处理对某条(messageId)消息的回复。

### 接收循环
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default)
{try{while (!cancellationToken.IsCancellationRequested){var readResult = await reader.ReadAsync(cancellationToken);if (readResult.IsCanceled) break;var buffer = readResult.Buffer;var makingProgress = false;while (TryParseFrame(ref buffer, out var payload, out var messageId)){makingProgress = true;await OnReceiveAsync(payload, messageId);}reader.AdvanceTo(buffer.Start, buffer.End);if (!makingProgress && readResult.IsCompleted) break;}try { reader.Complete(); } catch { }}catch (Exception ex){try { reader.Complete(ex); } catch { }}
}
protected abstract ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId);

接收缓冲区里什么时间会有什么东西由发送方和系统环境决定,因此延迟是必然的,所以这里全部按异步处理就好。
- TryParseFrame 读出缓冲区数据,根据帧格式解析出实际数据、id等
- OnRecieveAsync 处理数据,比如对于回复/响应的处理等
- reader中的数据读出后尽快Advance一下,通知管道你的读取进度;
解析帧
private bool TryParseFrame(ref ReadOnlySequence<byte> input,out ReadOnlySequence<byte> payload, out int messageId)
{if (input.Length < 8){   // not enough data for the headerpayload = default;messageId = default;return false;}int length;if (input.First.Length >= 8){   // already 8 bytes in the first segmentlength = ParseFrameHeader(input.First.Span, out messageId);}else{   // copy 8 bytes into a local spanSpan<byte> local = stackalloc byte[8];input.Slice(0, 8).CopyTo(local);length = ParseFrameHeader(local, out messageId);}// do we have the "length" bytes?if (input.Length < length + 8){payload = default;return false;}// success!payload = input.Slice(8, length);input = input.Slice(payload.End);return true;
}

缓冲区是不连续的,一段一段的,像链表一样,第一段就是input.First。
代码很简单,主要演示一些用法;
辅助方法

static int ParseFrameHeader(ReadOnlySpan<byte> input, out int messageId)
{var length = BinaryPrimitives.ReadInt32LittleEndian(input);messageId = BinaryPrimitives.ReadInt32LittleEndian(input.Slice(4));return length;
}

OnReceiveAsync

protected override ValueTask OnReceiveAsync(ReadOnlySequence<byte> payload, int messageId)
{if (messageId != 0){   // request/responseTaskCompletionSource<IMemoryOwner<byte>> tcs;lock (_awaitingResponses){if (_awaitingResponses.TryGetValue(messageId, out tcs)){_awaitingResponses.Remove(messageId);}}tcs?.TrySetResult(payload.Lease());}else{   // unsolicitedMessageReceived?.Invoke(payload.Lease());}return default;
}


到此为止,其余部分主要是一些服务端和其他功能实现及benchmark。。。

Pipe——高性能IO(二)相关推荐

  1. select * 映射错误_高性能IO模型分析-浅析Select、Poll、Epoll机制(三)

    本章(第三章)内容其实和第二章内容,都是第一章内容的延伸.第二章内容是第一章内容的延伸,本章内容则是第一章内容再往底层方面的延伸,也是面试中考察网络方面知识时,可能会问到的几个点. select.po ...

  2. spring boot高性能实现二维码扫码登录(中)——Redis版

    前言 本打算用CountDownLatch来实现,但有个问题我没有考虑,就是当用户APP没有扫二维码的时候,线程会阻塞5分钟,这反而造成性能的下降.好吧,现在回归传统方式:前端ajax每隔1秒或2秒发 ...

  3. System.IO.Pipelines: .NET高性能IO

    本文翻译自dotnet团队博客文章:https://blogs.msdn.microsoft.com/dotnet/2018/07/09/system-io-pipelines-high-perfor ...

  4. 【转】5.2高性能IO模型浅析

    服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种: (1)同步阻塞IO(Blocking IO):即传统的IO模型. (2)同步非阻塞IO(Non-blocking IO):默认创建的s ...

  5. 详解 epoll 原理【Redis,Netty,Nginx实现高性能IO的核心原理】

    epoll原理剖析:epoll原理剖析以及reactor模型应用 redis源码分析:redis源码分析及driver现实 Nginx模块开发:Nginx源码从模块开发入手,3个项目弄透nginx模块 ...

  6. 图文详解 epoll 原理【Redis,Netty,Nginx实现高性能IO的核心原理】epoll 详解

    [Redis,Netty,Nginx 等实现高性能IO的核心原理] I/O 输入输出(input/output)的对象可以是文件(file), 网络(socket),进程之间的管道(pipe).在li ...

  7. 高性能IO模型浅析--范志东(Florian)

    代码改变世界 Posts - 74, Articles - 4, Comments - 439 Cnblogs Dashboard Login Home Contact Gallery RSS Flo ...

  8. 【Redis版】spring boot高性能实现二维码扫码登录(中)

    作者: 刘冬 来源:http://www.cnblogs.com/GoodHelper/p/8643071.html 前言 本打算用CountDownLatch来实现,但有个问题我没有考虑,就是当用户 ...

  9. 【订阅与发布机制版】spring boot高性能实现二维码扫码登录(下)

    点击上方[JAVA乐园],选择"置顶公众号",有内涵有价值的文章第一时间送达! 作者: 刘冬 来源:https://www.cnblogs.com/GoodHelper/p/865 ...

最新文章

  1. 比用Pytorch框架快200倍!0.76秒后,笔记本上的CNN就搞定了MNIST | 开源
  2. 微软 CTO 韦青:对微软这样已经走过44年的公司,现在也只是个小小小的开始!!!
  3. Qt Creator定位项
  4. java--用 * 打印出各种图形(新手请进)
  5. 程序员面试金典 - 面试题 02.08. 环路检测(快慢指针)
  6. 2021年中国家装行业数字化转型研究报告
  7. 最佳论文!牛津大学揭示梯度下降复杂度理论
  8. 机器学习入门:多变量线性回归
  9. 华为Mate X 5G再次秒售罄;全新折叠屏手机渲染图曝光:确实不一般!
  10. 苹果6s最大屏幕尺寸_iPhone 6s:经典的小屏旗舰,百元价位也能做苹果党
  11. HTTP协议到底是怎么回事
  12. 对 /etc/rc.d/init.d 目录的一点理解
  13. 2020-12-06 高等数学:常用积分公式
  14. 西安交通大学电子图书站点被黑
  15. java mq编程_MQ java 基础编程
  16. 经典蓝牙和低功耗蓝牙的区别
  17. 20162311 2016-2017-2《程序设计与数据结构》课程总结
  18. 谁动了“支付”的奶酪?移动支付蓝海剖析
  19. 关于遇到PermissionError: [Errno 13] Permission denied:···这个问题
  20. firefox56 版插件 httprequester

热门文章

  1. UIKeyboard键盘相关知识点
  2. link rel=canonical 用法
  3. JavaScript replace string 替换字符
  4. 最大的路径的节点值之和
  5. 真的没办法一心一意麽? php 文件操作
  6. 输入缓冲区对程序的影响及解决方法(多种语言都会出现)
  7. Nginx 简单命令
  8. python遍历获取一个类的所有子类
  9. django 通过路径传参 视图获取get请求
  10. javascript-演练-二级联动下拉框