当前位置: 首页 > 知识库问答 >
问题:

Spring WebFlux(Flux):如何动态发布

元昊苍
2023-03-14

我是反应编程和Spring WebFlux的新手。我想让我的应用程序1发布服务器发送的事件通过Flux和我的应用程序2不断地监听它。

我希望Flux按需发布(例如,当某些事情发生时)。我发现的所有示例都是使用Flux.Interval周期性地发布event,而且似乎没有办法在创建完Flux之后追加/修改其中的内容。

我怎样才能达到我的目标?或者我在概念上完全错了。

共有1个答案

乜明朗
2023-03-14

手动向Flux提供数据的技术之一是使用FluxProcessor#Sink方法,如下例所示

@SpringBootApplication
@RestController
public class DemoApplication {

    final FluxProcessor processor;
    final FluxSink sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.processor = DirectProcessor.create().serialize();
        this.sink = processor.sink();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        sink.next("Hello World #" + counter.getAndIncrement());
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return processor.map(e -> ServerSentEvent.builder(e).build());
    }
}

在这里,我创建了directprocessor以支持多个订阅服务器,这些订阅服务器将侦听数据流。此外,我还提供了额外的fluxprocessor#serialize,它为多生产者提供了安全的支持(从不同线程调用而不违反Reactive Streams规范规则,特别是规则1.3)。最后,通过调用“http://localhost:8080/send”,我们将看到消息hello World#1(当然,只有在您之前连接到“http://localhost:8080”的情况下才会出现)

在Reactor 3.4中,您有一个名为Reactor.core.publisher.sinks的新API。sinksAPI为手动数据发送提供了一个流畅的构建器,允许您指定数据流中的元素数量和背压行为、支持的订阅者数量和重播功能:

@SpringBootApplication
@RestController
public class DemoApplication {

    final Sinks.Many sink;
    final AtomicLong counter;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);

    }

    public DemoApplication() {
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
        this.counter = new AtomicLong();
    }

    @GetMapping("/send")
    public void test() {
        EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());

        if (result.isFailure()) {
          // do something here, since emission failed
        }
    }

    @RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent> sse() {
        return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
    }
}

注意,通过sinksAPI发送消息引入了emission及其结果的新概念。这种API的原因是反应器延伸反应物流,并且必须遵循背压控制。也就是说,如果发出的信号多于请求的信号,并且底层实现不支持缓冲,则不会传递消息。因此,TryEmitNext的结果返回EmitResult,该结果指示消息是否已发送。

另外,请注意,默认情况下sinskAPI给出了sink的序列化版本,这意味着您不必关心并发性。但是,如果事先知道消息的发射是串行的,则可以构建不序列化给定消息的sinks.unsafe()版本

 类似资料:
  • 我正在使用SpringWebSockets,我想从存储库返回一个项目列表 我怎么能在WebSocket会话中发送此列表 这是如何将项目列表发送到websocket的处理程序方法

  • 我试图让服务器发送的事件与Mozilla Firefox一起工作。给定一个Spring Boot的网络服务 使用Chrome浏览器或Edge(始终是最新版本)可以正常工作。我可以在网络分析器选项卡中看到未完成的请求,并且每秒都会显示一个新的时间戳。 然而,当我使用Firefox(84.0.2或更早版本)时,请求也会显示在网络选项卡中,但不会显示响应头或流数据。当我终止Spring后端时,Firef

  • POST /feeds Input Name Type Description feed_content string 分享内容。如果存在附件,则为可选,否则必须存在 feed_from integer 客户端标识,1-PC、2-Wap、3-iOS、4-android、5-其他 feed_mark mixed 客户端请求唯一标识 feed_latitude string 纬度,当经度, GeoH

  • 问题内容: 我在XML中定义了以下布局: 如何使用LayoutInflater来获取ListView和ProgressBar并在代码中分配它? 问题答案: 通过这种方式:

  • 问题内容: 我们想动态地触发詹金斯中不同下游版本中的集成测试。我们有一个参数化的集成测试项目,该项目将测试名称作为参数。我们从git repo动态确定测试名称。 我们有一个父项目,该项目使用jenkins-cli为源代码中发现的每个测试启动集成项目的构建。父项目和集成项目通过匹配的指纹相关联。 这种方法的问题是汇总测试结果不起作用。我认为问题在于“下游”集成测试是通过jenkins-cli启动的,

  • 更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如: