当前位置: 首页 > 知识库问答 >
问题:

Project Reactor将一个出版商分为两个,至少有两个订阅者

漆雕嘉平
2023-03-14

如何在Reactor中将一个发布者拆分为两个,这样就存在两个相同的数据流,可以在不同的流中处理下游的数据?

因此,我可以映射每个流并单独订阅每个流。

我在 API 中看不到任何表明这是在 API 上的内容。

在发布之前,我需要等到两个订阅者都启动并准备好。

共有1个答案

姬高扬
2023-03-14

感谢您的输入,我没有想清楚,当然只是有多个订阅者:

  val flux = Flux.just("MyData1", "MyData2", "MyData3");

  flux.doOnNext { println("Subscribing one$it") }.subscribe()

  flux.doOnNext { println("Subscribing Two$it") }.subscribe()

将输出:

Subscribing oneMyData1
Subscribing oneMyData2
Subscribing oneMyData3
Subscribing TwoMyData1
Subscribing TwoMyData2
Subscribing TwoMyData3

如上所述,有Share,但此API不允许设置最小订阅者数量,因此最好调用下面的函数,就我而言,我想等到我们有两个订阅者。文档状态

第一次订阅时导致源Flux订阅一次的Flux,因此延迟订阅的用户可能会错过项目。

val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)

这将生成以下输出,以确保在启动第二个订阅服务器时出现延迟时不会丢失消息。

Subscribing oneMyData1
Subscribing TwoMyData1
Subscribing oneMyData2
Subscribing TwoMyData2
Subscribing oneMyData3
Subscribing TwoMyData3
 类似资料:
  • 我有值,我希望它们至少有两个十进制数字,但我不想修剪其余的数字。 是否有构建方法或更优雅的方法来实现这一点? 我不能使用,因为我希望在存在其他小数时保留它们。

  • 我有两个枚举,它们实现了如下相同的接口:- 名为SystemA和SystemB的两个枚举正在实现此接口。 我想要像这样应用getConfigFunction:- 但它会抛出编译时错误,因为形成的流的类型为:Config[]。所以我尝试了以下修改:- 但这也失败了,因为我还需要修改forEach部分。有人能帮忙吗?我该如何修改forEach部分,或者我该如何使用map()/flatmap()函数,这

  • 问题内容: 我有一个表字段,其中包含用户的姓氏和名字。是否有可能分裂成那些2场,? 所有记录的格式均为“名字的姓氏”(不带引号,中间还有空格)。 问题答案: 不幸的是,MySQL没有分割字符串功能。但是,您可以为此创建一个用户定义的函数,例如以下文章中描述的函数: Federico Cargnelutti撰写的MySQL Split String Function 使用该功能: 您将可以按照以下方

  • 问题内容: 一位访问员最近问我这个问题:给定三个布尔变量a,b和c,如果三个变量中至少有两个是true,则返回true。 我的解决方案如下: 他说,这可以进一步改善,但是如何呢? 问题答案: 而不是写: 写: 至于表达式本身,是这样的: 或此(无论您觉得更容易掌握): 它测试和准确一次,最多一次。 参考文献 JLS 15.25条件运算符?:

  • 问题内容: 如何将这两个JToken合并为一个JToken。听起来应该很简单,但无法解决。 谢谢您的帮助! 到目前为止,这是我尝试过的: 我首先将第一个对象分配给变量,然后尝试将其连接到第二个变量。我有一个循环,可以带回具有三个字段的多个页面。最终目标是抓取每个页面并创建一个包含所有页面的大J。 像这样的东西: 问题答案: 您可以用来将一个合并到另一个。请注意,可以控制数组的合并方式。从Enume

  • 问题内容: 在Java中将列表拆分为两个子列表的最简单,最标准和/或最有效的方法是什么?可以更改原始列表,因此无需复制。方法签名可以是 [EDIT] 返回原始列表上的视图,如果修改了原始视图,该视图将无效。因此,除非它也放弃了原始参考文献,否则无法使用(或者,如Marc Novakowski的回答所述,使用但立即复制结果)。 问题答案: 快速半伪代码: 它使用标准的List实现方法,并避免了所有循