我是反应编程和Spring WebFlux的新手。我想让我的应用程序1发布服务器发送的事件通过Flux和我的应用程序2不断地监听它。
我希望Flux按需发布(例如,当某些事情发生时)。我发现的所有示例都是使用Flux.Interval周期性地发布event,而且似乎没有办法在创建完Flux之后追加/修改其中的内容。
我怎样才能达到我的目标?或者我在概念上完全错了。
手动向Flux
提供数据的技术之一是使用FluxProcessor#Sink
方法,如下例所示
@SpringBootApplication
@RestController
public class DemoApplication {
final FluxProcessor processor;
final FluxSink sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.processor = DirectProcessor.create().serialize();
this.sink = processor.sink();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
sink.next("Hello World #" + counter.getAndIncrement());
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return processor.map(e -> ServerSentEvent.builder(e).build());
}
}
在这里,我创建了directprocessor
以支持多个订阅服务器,这些订阅服务器将侦听数据流。此外,我还提供了额外的fluxprocessor#serialize
,它为多生产者提供了安全的支持(从不同线程调用而不违反Reactive Streams规范规则,特别是规则1.3)。最后,通过调用“http://localhost:8080/send”,我们将看到消息hello World#1
(当然,只有在您之前连接到“http://localhost:8080”的情况下才会出现)
在Reactor 3.4中,您有一个名为Reactor.core.publisher.sinks
的新API。sinks
API为手动数据发送提供了一个流畅的构建器,允许您指定数据流中的元素数量和背压行为、支持的订阅者数量和重播功能:
@SpringBootApplication
@RestController
public class DemoApplication {
final Sinks.Many sink;
final AtomicLong counter;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
public DemoApplication() {
this.sink = Sinks.many().multicast().onBackpressureBuffer();
this.counter = new AtomicLong();
}
@GetMapping("/send")
public void test() {
EmitResult result = sink.tryEmitNext("Hello World #" + counter.getAndIncrement());
if (result.isFailure()) {
// do something here, since emission failed
}
}
@RequestMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent> sse() {
return sink.asFlux().map(e -> ServerSentEvent.builder(e).build());
}
}
注意,通过sinks
API发送消息引入了emission
及其结果的新概念。这种API的原因是反应器延伸反应物流,并且必须遵循背压控制。也就是说,如果发出
的信号多于请求的信号,并且底层实现不支持缓冲,则不会传递消息。因此,TryEmitNext
的结果返回EmitResult
,该结果指示消息是否已发送。
另外,请注意,默认情况下sinsk
API给出了sink
的序列化版本,这意味着您不必关心并发性。但是,如果事先知道消息的发射是串行的,则可以构建不序列化给定消息的sinks.unsafe()
版本
我正在使用SpringWebSockets,我想从存储库返回一个项目列表 我怎么能在WebSocket会话中发送此列表 这是如何将项目列表发送到websocket的处理程序方法
我试图让服务器发送的事件与Mozilla Firefox一起工作。给定一个Spring Boot的网络服务 使用Chrome浏览器或Edge(始终是最新版本)可以正常工作。我可以在网络分析器选项卡中看到未完成的请求,并且每秒都会显示一个新的时间戳。 然而,当我使用Firefox(84.0.2或更早版本)时,请求也会显示在网络选项卡中,但不会显示响应头或流数据。当我终止Spring后端时,Firef
POST /feeds Input Name Type Description feed_content string 分享内容。如果存在附件,则为可选,否则必须存在 feed_from integer 客户端标识,1-PC、2-Wap、3-iOS、4-android、5-其他 feed_mark mixed 客户端请求唯一标识 feed_latitude string 纬度,当经度, GeoH
问题内容: 我在XML中定义了以下布局: 如何使用LayoutInflater来获取ListView和ProgressBar并在代码中分配它? 问题答案: 通过这种方式:
问题内容: 我们想动态地触发詹金斯中不同下游版本中的集成测试。我们有一个参数化的集成测试项目,该项目将测试名称作为参数。我们从git repo动态确定测试名称。 我们有一个父项目,该项目使用jenkins-cli为源代码中发现的每个测试启动集成项目的构建。父项目和集成项目通过匹配的指纹相关联。 这种方法的问题是汇总测试结果不起作用。我认为问题在于“下游”集成测试是通过jenkins-cli启动的,
更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如: