Streams一直是一个复杂的话题,但是在一些示例的帮助下,希望我们今天能做到这一点。
从本质上讲,流是随时间可用的一系列数据元素。 就像实际的水流一样,数据流/变得可用,而不是一开始就拥有全部。 尽管这两个最重要的方面是对性能的巨大增强以及数据并不总是立即可用这一事实,但这有很多好处。
如前所述,使用流的主要原因之一是因为有时,数据无法立即获得。 例如,如果您正在收听天气流API,则当前温度是在当前时刻计算的,因此(1)只要您继续收听该服务,数据就将是无限的,并且(2)新数据可用 每分钟,因为它产生了。 不能使用有限数量的数据进行建模,因此我们决定使用无限数据流对其进行建模。
由于流的大小是不确定的,因此它们可能是无限的,重要的是要记住,我们不能像对待列表或数组那样习惯对它们进行整体操作。 这样,应用于流的函数通常返回修改了数据的另一个流。 这些叫做过滤器,并将它们链接起来,形成管道。
提到的另一个好处是性能。 当处理大量数据时,通常会导致对我们的计算机造成很大的内存消耗。 如果您尝试读取包含有关猫的数据的20Gb文件,进行处理,然后将其发送给朋友,这意味着除了应用程序使用的内存量外,我们还将向内存中加载额外的20Gb。 大多数笔记本电脑会死! 但这并不意味着它不可行。
如果不是将文件作为一个整体读取,而是将其建模为一个数据流,则可以一次读取一行,处理这些行并将其作为流发送出去。 这将使我们的应用程序仅使用额外的几个字节,而不是可怕的20Gb。 在考虑流时,我们始终必须将它们视为应用程序可以处理的一系列可管理数据的好序列–一旦处理完一个块,就可以读取并处理下一个数据。
Simplified I/O over a String, in Elixir
我们第一个实用的流方法将是字符串IOElixir中的模块。 一种字符串IO并不是真正的流,而只是一个字符串周围的包装器,它使我们可以对字符串流应用一些标准的I / O操作。 对我们来说这将是完美的,因为我们可以使用它来熟悉操作:
- 打开:获得对该资源的独占访问权。读/写:基本上是从流中读取或写入数据块。关:将资源返回给OS(操作系统)。
首先,我们打开一个字符串IO。 从回应开放/ 1函数,我们可以看到它为流提供了参考。
iex> {:ok, pid} = StringIO.open("content")
{:ok, #PID<0.470.0>}
如果我们要检查流的内容,可以通过内容/ 1通过将先前获得的参考传递给该函数。 在长生不老药内容/ 1函数将始终返回带有输入缓冲区和输出缓冲区的元组,例如{“进出”}。 在这种情况下,输出缓冲区将为空,因为我们还没有写任何东西。
iex(1)> StringIO.contents(pid)
{"content", ""}
以来字符串IO是一个将字符串建模为流的包装器,我们可以使用标准函数从中读取和写入流。IO模块。 在这种情况下,要编写一些内容,我们可以使用写/ 2功能。 请注意,现在我们如何在输入缓冲区和输出缓冲区中都有数据。
iex(2)> IO.write(pid, "written")
:ok
iex(3)> StringIO.contents(pid)
{"content", "written"}
大多数语言的大多数流模块也为我们提供了一种方法齐平内容,这意味着它将强制将流中的所有字节写出。 这适用于输出缓冲区。
iex(4)> StringIO.flush(pid)
"written"
iex(5)> StringIO.contents(pid)
{"content", ""}
最后,如果要读取输入缓冲区,可以使用读/ 2函数,从而清空数据流:
iex(6)> IO.read(pid, :all)
"content"
iex(7)> StringIO.contents(pid)
{"", ""}
请注意,在这种特定情况下,Elixir如何建模字符串IO作为具有输入缓冲区和输出缓冲区的元组,一个我们可以写入的缓冲区和一个我们可以从中读取的缓冲区。
I/O on a File, in C#/.NET
转到一个更实际的示例,我们将检查如何处理流中的文件。 在处理文件时,打开和关闭流变得越来越重要,但是稍后我们将进行讨论。
在.NET领域中,为了创建文件,我们可以使用文件创建功能。 这将为我们提供文件流该模型为我们的文件建模,因此我们可以对其进行写入。 打开流并对其进行写入后,我们必须将其关闭才能坚持这些更改并释放了操作系统为我们提供的资源。 此外,要再次阅读内容,我们将使用File.OpenRead并逐字节读取。 该片段如下所示,归功于MSDN:
using System;
using System.IO;
using System.Text;
namespace StreamTime
{
public class FileTheStream
{
public static void Main()
{
const string path = @"/Users/jgarcia/Desktop/example.txt";
//Create the file.
using (FileStream fs = File.Create(path))
{
AddText(fs, "This is some text");
AddText(fs, "This is some more text,");
AddText(fs, "\r\nand this is on a new line");
AddText(fs, "\r\n\r\nThe following is a subset of characters:\r\n");
for (var i = 1; i < 120; i++)
{
AddText(fs, Convert.ToChar(i).ToString());
}
}
//Open the stream and read it back.
using (FileStream fs = File.OpenRead(path))
{
var b = new byte[1024];
var temp = new UnicodeEncoding();
while (fs.Read(b,0,b.Length) > 0)
{
Console.WriteLine(temp.GetString(b));
}
}
}
private static void AddText(Stream fs, string value)
{
var info = new UnicodeEncoding().GetBytes(value);
fs.Write(info, 0, info.Length);
}
}
}
如您所见,在代码方面,我们只是打开一个流,读取,编写和关闭它。 好吧,您可能想知道关闭发生在哪里。 在C#中,每当我们使用使用 keyword with a resource, once it's finished 使用 it, it closes the resource – Just like Java's 尝试资源。 但是,让我们进一步谈谈流的打开和关闭。
Why do streams have to be opened or closed?
我们提到,每次打开流时,操作系统都需要专用资源,但是我们从未谈论过哪个或如何。 在大多数情况下,这取决于打开的流的性质–打开套接字与打开文件并不相同,等等。暂时,我们可以专注于文件。
每当打开新文件流时,操作系统就会为应用程序分配一个文件描述符(通常称为文件句柄)。 文件描述符基本上是一个数字,用于唯一标识计算机操作系统中打开的文件-它描述了数据源以及如何访问它。 文件描述符指向内核的全局文件表,该表包含诸如流的偏移量和访问限制之类的信息。
可以想象,文件描述符不像内存-如果不将其返回给操作系统,情况可能会变得丑陋。 应用程序在长时间运行的应用程序中崩溃只是时间问题。 这通常称为文件句柄泄漏。 在Windows计算机中,当您尝试删除文件时,通常的后果之一是该文件表明该文件正在被另一个程序使用。 资源未正确释放。
幸运的是,当今大多数语言为我们提供了使我们能够适当释放这些资源的结构。 如前所述,我们有类似的声明使用在C#中尝试在我们的工具箱中使用Java资源。 不过在Elixir中,您必须使用IO.close / 1功能。
I/O over a socket, in Java
使用流的另一种情况是应用程序打开套接字时。 为了使两个进程通信,每个进程都需要打开一个套接字,然后通过该套接字发送消息-套接字通过流发送和接收消息。 接下来显示的是一个片段,其中包含一个用Java开发的非常简单的回显服务器。 请注意,套接字如何既包含输入流又包含输出流,以及我们如何读取通过输入流发送的数据并将其写入输出流的数据。
package com.manzanit0;
import java.io.*;
import java.net.ServerSocket;
public class EchoServer {
public static void main(String[] args) {
int portNumber = 4098;
try (
var serverSocket = new ServerSocket(portNumber);
var clientSocket = serverSocket.accept();
var outputStream = clientSocket.getOutputStream();
var inputStream = clientSocket.getInputStream()
) {
while(true) {
var bytes = readAllBytes(inputStream);
outputStream.write(bytes);
}
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
private static byte[] readAllBytes(InputStream stream) throws IOException {
StringBuilder data = new StringBuilder();
// available only returns a value after reading at least 1 character -> do/while.
do {
data.append((char) stream.read());
} while (stream.available() > 0);
return data.toString().getBytes();
}
}
我想提到的另一件事是,由于流使用不确定数量的数据(可能是无限的)工作,因此如果您尝试执行该程序,您将注意到,直到您将其送入中止命令(Ctrl / Cmd,它才会停止运行) + C)。 只要我们不输入数据,我们就可以继续输入,并且流将继续提供数据。 在这个特定的例子中,我已经详细说明了bytes [] readAllBytes(输入流流)仅读取可用字节并返回它们的函数,但是输入流课为我们提供了readAllBytes()该方法将阻塞直到流关闭,然后返回接收到的所有字节。
您可能想知道,如果我想第二次读取流的数据怎么办? 可能吗?。 确实有可能,但是为了理解它,我们必须引入最后一个概念:寻求。
Seeking a stream – understanding the side-effects of reading
如果您尝试第二次读取流,则可能会发现自己无法读取以前的数据,而只能读取新数据。 某些流不支持查找,但假设支持,则其背后的原因是,流具有指向最后读取的字节的游标。 每次我们读取一个新字节时,该游标都会前进到新位置。 为了读取已处理的字节,我们将需要将光标一直倒退到开头。 这称为寻求。
In Java, the way to do this is using the mark(int)
and reset()
method of the InputStream
class, in the C# example, we would simply set file.InputStream.Position = 0
. These are the side effects of reading a stream. If a stream doesn't support seeking, another solution would be copying our read bytes to another array and maintain a copy. Nonetheless, take into account that sometimes one of the purposes of using streams is to go easy on memory consumption, and we're copying all the read data in a memory array, then we're annulling this completely.
Wrapping up
我们已经讨论了很多事情,但是如果我们要做一个非常快速的TLDR并总结一些关键点,我将继续:
- 流管理的数据量不确定,有时是无限的。流为应用程序带来了巨大的性能提升,因为它们不必在处理数据之前将所有数据加载到内存中。就像必须打开流一样,必须将它们关闭,否则就不会释放OS的资源,这可能是非常昂贵的成本,无论是套接字还是文件句柄。一些语言提供了处理资源处置的结构,例如使用在C#中尝试在Java中。流具有指向最后读取的字节的游标。 在某些情况下,我们可以倒退该游标,以便第二次读取相同的数据。 这称为寻求。
Following up, in case you want to delve a little more into sockets, as a concept, feel free to check out this other post I have about them: URL.
Øriginally posted at: https://manzanit0.github.io/