我们从stream流说起,tream的API 相对来说,大家都很熟悉。
流有些什么问题呢?模糊:在不同场景里使用不同的工作模式:
-
有时时只读,有时只写,有时又是读写同时。
-
即使同一个情况下,都有可能一会只读,一会儿只写(比如:DeflateStream)
-
在一些双重流(NetworkStream,SslStream)情况下,几乎不可能知道数据何时终止传输了,只能不断的使用循环处理。
-
流提供的定位操作也不同,有的是length,有的是Position
-
大量的数据拷贝,内存分配。 有鉴如此,pipelines 在.NET Core 2.1 加入。
pipelines是什么 pipelines可以理解为提供了一组二进制流访问的API,包含内存管理(内存的合并、回收),流控(积压与防溢出),多线程管理。
-
Stream
using (MemoryStream ms = new MemoryStream()) { WriteSomeData(ms); ms.Position = 0; ReadSomeData(ms); } void WriteSomeData(Stream stream) { byte[] bytes = Encoding.ASCII.GetBytes("hello, world!"); stream.Write(bytes, 0, bytes.Length); stream.Flush(); } void ReadSomeData(Stream stream) { int bytesRead; byte[] buffer = new byte[256]; do { bytesRead = stream.Read(buffer, 0, buffer.Length); if (bytesRead > 0) { string s = Encoding.ASCII.GetString(buffer, 0, bytesRead); Console.Write(s); } } while (bytesRead > 0); }
-
Pipelines
async Task ProcessPipe() { Pipe pipe = new Pipe(); await WriteSomeDataAsync(pipe.Writer); pipe.Writer.Complete(); await ReadSomeDataAsync(pipe.Reader); } async ValueTask WriteSomeDataAsync(PipeWriter writer) { Memory<byte> workspace = writer.GetMemory(512); int bytes = Encoding.ASCII.GetBytes( "hello, world!", workspace.Span); writer.Advance(bytes); await writer.FlushAsync(); } async ValueTask ReadSomeDataAsync(PipeReader reader) { while (true) { ReadResult read = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = read.Buffer; if (buffer.IsEmpty && read.IsCompleted) break; if(buffer.IsSingleSegment) { Encoding.ASCII.GetString(buffer.First); }else { foreach (Memory<byte> segment in buffer) { string s = Encoding.ASCII.GetString(segment.Span); Console.Write(s); } } reader.AdvanceTo(buffer.End); } }
pipeline 提供了两个独立的读写API处理读写问题。
pipewriter:
处理写操作,pipeline使用GetMemory获取内存区替代显示分配byte[] 。然后向管道写入数据。这里并不是只能使用512个字节,可以零字节,也可以一万字节,通过Advance(bytes) 完成单次的数据写入,同时告知管道有多少数据可以被reader消费。之后Flush管道。
数据写入完成之后,调用complete方法,通知管道写入操作结束。
pipereader:
read.IsCompleted
表示writer是否执行complete。
buffer.IsEmpty
告诉我们单前迭代种管道是否已经没有数据。
AdvanceTo
告诉管道我们已经消费了多少数据。
另外
上面的列子我们使用AdvanceTo(buffer.End),把一切都消费掉了。更一般的情况可能是自定义的数据的格式或者通过 cr/lf
来处理数据隔断等,即可能一次消费的长度是小于521或者大于512字节。 ReadOnlySequence<byte>
-
Slice()
方法通过切片可以返回我们真实需要处理的数据。 -
GetPosition()
获取相对处理的位置。 -
reader.AdvanceTo(consumedToPosition, buffer.End)
告诉管道我们已经消费了多少位置,同时检查了所有数据,对于消费点到结束点数据我们不能处理,我们需要更多的数据来处理。
回过头在看看PipeWriter.FlushAsync()
和PipeReader.ReadAsync()。
他们之间有微妙的平衡。
-
ReadAsync
表示管道需要数据,会唤醒reader,执行读取循环。 -