当前位置: 首页 > 知识库问答 >
问题:

如何连锁Reactor订户

王修为
2023-03-14

我有一个现有的接口链,我想作为一个反应器运行,而不是管理我自己的线程和队列

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...

// Works well to lookup users in parallel. 
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

我如何链接该结果,使其调用UsersHandler与批量的User

共有1个答案

洪哲彦
2023-03-14

订阅某些东西会触发链,所以你通常不能“链”订阅者,他们是链中的最后一件事。

想想看,如果这样,你设置了你的反应式管道,当你订阅时,你触发管道启动,链就会产生一个结果。

web服务器中,< code>subscriber通常是调用客户端,当客户端< code > subscribe 时,他会触发将发布数据的服务器中的事件链。

通量有点像1到n个单声道的列表。可以说,单声道/通量中的每个对象都有许多“状态”。这些是成功错误取消下一步已完成等。

Mono/Flux内部进入成功状态时,它将发出其中的值。当mono中的某些问题得到解决时,Mono通常会进入成功

当你声明Flux.just("userA","userB","userC")时,你基本上是在要求通量来解析你输入到它的输入。放置一个字符串会立即解析,所以通量会进入成功状态,一旦有东西订阅就开始发出字符串。所以你所要做的就是在某人订阅后声明你想要发生的链。

这可以通过几种不同的方式完成,当你想做某事并更改值时,就像你想从字符串到我们通常使用map的用户一样

如果我们只想对每个对象做一些事情而不返回任何东西,我们可以使用doOnNext

Flux.just("userA", "userB", "userC")
            .parallel(2)
            .runOn(Schedulers.parallel())
            .map(userString -> {
                return lookupService.lookup(userString);
            })
            .doOnNext(user -> {
                // if you want to do something on each user
                // will return void so if you want to log something
                // or handle each user
            }).subscribe();

订阅应该是链条中的最后一环。

 类似资料:
  • 我已经阅读了整个反应堆的文件,但我无法找到正确的模式,以解决以下问题。我有一个方法可以异步地执行某些操作。我以Flux的形式返回结果响应,消费者可以订阅它。 该方法有以下定义: 返回的通量是一个热通量,结果可以在任何给定的时间异步来。 其中,是通过此通道的所有消息的。这个实现的一个问题是,消费者是在结果消息到来之后订阅的,它可能会错过其中的一些消息。 所以,我正在寻找的是一个解决方案,将允许消费者

  • 我正在尝试关闭nettyreactor.ipc.netty.tcp.TcpClient的传输控制协议,但我找不到简单的方法,没有“断开连接”、“停止”或“关闭”方法。有人能帮我吗?我正在使用reamer-net.0.7.9。RELEASE库。 我的课程结构如下: 我感谢你的帮助,提前非常感谢。

  • 现在,我有三个函数:updateFieldFromCollection1()、

  • 当我使用带有固定连接提供者的TCP客户端时,我不能重用连接。 他们使连接超过maxConnection变量。 这是我写的。 即使我使用10个maxConnection,比如“ConnectionProvider”。固定(“测试”,10)”,有22个空闲连接。天啊 //NettyClient.java //NettyClientTest。Java语言 14:55:27.397[reactor-tcp

  • 我想链一个完整的未来,使它在处理过程中扇出。我的意思是,我对一个列表有一个开放的、可完成的未来,我想对列表中的每一项进行计算。 第一步是调用m_myApi.get响应(请求,执行器)发出异步调用。 该异步调用的结果有一个getCandidates方法。我想并行分析所有这些候选者。 目前,我的代码以串行方式解析它们 我想要这样的东西:

  • 我使用publishOn和subscribeOn的流量相同,如下所示: 虽然,当我使用两者时,日志中不会打印任何内容。但是当我只使用publishOn时,我得到了以下信息日志: publishOn比Subscribeon更受推荐吗?或者它比subscribeon有更多的偏好?两者之间的区别是什么,什么时候使用哪个?