2021SC@SDUSC
CSP (stands for Communicating Sequential Process) provides sequential I/O communication between asynchronous data suppliers and consumers. It is used for asynchronous streaming of data. CSP was inspired by the Go language channels.
CSP(代表通信顺序过程)在异步数据供应商和消费者之间提供顺序的I/O通信。 它用于数据的异步流。 CSP的灵感来自Go语言通道。
- 高性能和高吞吐速度
- 为处理中等大小的对象进行了优化(如 ByteBufs)。
- CSP已经达到DSL,它提供了一个简单的编程模型
- 有一个异步的背压管理
CSP 通信通过 ChannelSupplier 和 ChannelConsumer 进行,它们分别提供和接受一些数据。只有在上一个请求完成之后,才应调用对这些通道的每个连续请求。CSP 使用promises来管理它。
ChannelSupplier 有一个 get() 方法,该方法返回所提供值的Promises。在完成此 Promise 并显示结果或异常之前,不应再次调用 get() 方法。另外,如果 get() 返回 Promise 为 null,则表示流的结束,不应从此供应商请求其他数据。
ChannelConsumer 有一个 accept()方法,该方法返回一个空的 promise 作为接受完成的标志。在此Promise完成之前,不应再次调用 accept() 方法。通过与通道供应者类比,如果接受空值,则表示流的结束。
以下是 ChannelSupplier 和 ChannelConsumer 之间通信的示例:
protected void doProcess() {
input.get()
.whenResult(data -> {
if (data == null) {
output.acceptEndOfStream()
.whenResult(this::completeProcess);
} else {
data = data.toUpperCase() + '(' + data.length() + ')';
output.accept(data)
.whenResult(this::doProcess);
}
})
.whenException(Throwable::printStackTrace);
}
CSP的另一个重要概念是ChannelQueue接口及其实现:ChannelBuffer和ChannelZeroBuffer。它们提供消费者和供应商之间的通信,并允许他们在需要时创建这些管道的链。
基本上,一旦队列获得可用空间,这些缓冲区就会将对象从 ChannelConsumer 传递到ChannelSupplier。此过程由Promise控制。可以手动设置通道缓冲区的大小。ChannelZeroBuffer不存储任何值,而只是将它们逐个从ChannelConsumer传递到ChannelSupplier。
下面是使用项目缓冲区的简单示例:
public void accept(T item) {
buffer.add(item);
if (buffer.isSaturated()) {
getSupplier().suspend();
}
}
void produce() {
while (!buffer.isEmpty()) {
T item = buffer.poll();
if (item != null) {
send(item);
} else {
sendEndOfStream();
}
}
}