当前位置: 首页 > 工具软件 > ActiveJ > 使用案例 >

ActiveJ框架学习——Async I/O之CSP

周意智
2023-12-01

2021SC@SDUSC

概述:

CSP (stands for Communicating Sequential Process) provides sequential I/O communication between asynchronous data suppliers and consumers. It is used for asynchronous streaming of data. CSP was inspired by the Go language channels.

 CSP(代表通信顺序过程)在异步数据供应商和消费者之间提供顺序的I/O通信。 它用于数据的异步流。 CSP的灵感来自Go语言通道。

特点:

  • 高性能和高吞吐速度
  • 为处理中等大小的对象进行了优化(如 ByteBufs)。
  • CSP已经达到DSL,它提供了一个简单的编程模型
  • 有一个异步的背压管理

CSP 通信通过 ChannelSupplier 和 ChannelConsumer 进行,它们分别提供和接受一些数据。只有在上一个请求完成之后,才应调用对这些通道的每个连续请求。CSP 使用promises来管理它。

ChannelSupplier 有一个 get() 方法,该方法返回所提供值的Promises。在完成此 Promise 并显示结果或异常之前,不应再次调用 get() 方法。另外,如果 get() 返回 Promise 为 null,则表示流的结束,不应从此供应商请求其他数据。

ChannelConsumer 有一个 accept()方法,该方法返回一个空的 promise 作为接受完成的标志。在此Promise完成之前,不应再次调用 accept() 方法。通过与通道供应者类比,如果接受空值,则表示流的结束。

以下是 ChannelSupplier 和 ChannelConsumer 之间通信的示例:

protected void doProcess() {
  input.get()
      .whenResult(data -> {
        if (data == null) {
          output.acceptEndOfStream()
              .whenResult(this::completeProcess);
        } else {
          data = data.toUpperCase() + '(' + data.length() + ')';

          output.accept(data)
              .whenResult(this::doProcess);
        }
      })
      .whenException(Throwable::printStackTrace);
}

通道队列

CSP的另一个重要概念是ChannelQueue接口及其实现:ChannelBuffer和ChannelZeroBuffer。它们提供消费者和供应商之间的通信,并允许他们在需要时创建这些管道的链。

基本上,一旦队列获得可用空间,这些缓冲区就会将对象从 ChannelConsumer 传递到ChannelSupplier。此过程由Promise控制。可以手动设置通道缓冲区的大小。ChannelZeroBuffer不存储任何值,而只是将它们逐个从ChannelConsumer传递到ChannelSupplier。

下面是使用项目缓冲区的简单示例:

public void accept(T item) {
    buffer.add(item);
    if (buffer.isSaturated()) {
        getSupplier().suspend();
    }
}

void produce() {
    while (!buffer.isEmpty()) {
        T item = buffer.poll();
        if (item != null) {
            send(item);
        } else {
            sendEndOfStream();
        }
    }
}

 类似资料: