当前位置: 首页 > 知识库问答 >
问题:

Reactor网络--如何用延迟通量发送

权烨磊
2023-03-14

在Reactor Netty中,当通过out.send(publisher)向TCP通道发送数据时,任何发布服务器都可以工作。但是,如果不是简单的即时flux而是使用带有延迟元素的更复杂的flux,那么它就会停止正常工作。例如,如果我们使用这个hello world TCP echo服务器,它将按照预期工作:

import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.time.Duration;

public class Reactor1 {
    public static void main(String[] args) throws Exception {
        DisposableServer server = TcpServer.create()
            .port(3344)
            .handle((in, out) -> in
                .receive()
                .asString()
                .flatMap(s ->
                    out.sendString(Flux.just(s.toUpperCase()))
                ))
            .bind()
            .block();
        server.channel().closeFuture().sync();
    }
}

但是,如果我们将out.sendstring更改为

out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))

那么我们将期望对于每一个接收到的项目,都将产生一个延迟一秒的输出。

但是,html" target="_blank">服务器的行为方式是,如果它在间隔期间接收到多个项目,它将只针对第一个项目产生输出。例如,下面我们在第一秒内键入aabb,但只有aa作为输出(一秒后):

$ nc localhost 3344
aa
bb
AA <after one second>

然后,如果我们稍后输入附加行,我们得到输出(一秒钟后),但来自前面的输入:

cc
BB <after one second>

如何使send()在延迟的flux情况下按预期工作?

共有2个答案

越胤
2023-03-14

尝试使用concatmap。这是可行的:

DisposableServer server = TcpServer.create()
        .port(3344)
        .handle((in, out) -> in
                .receive()
                .asString()
                .concatMap(s ->
                        out.sendString(Flux.just(s.toUpperCase())
                                           .delayElements(Duration.ofSeconds(1)))
                ))
            .bind()
            .block();
server.channel().closeFuture().sync();

延迟来话流量

DisposableServer server = TcpServer.create()
        .port(3344)
        .handle((in, out) -> in
                .receive()
                .asString()
                .timestamp()
                .delayElements(Duration.ofSeconds(1))
                .concatMap(tuple2 ->
                        out.sendString(
                                Flux.just(tuple2.getT2().toUpperCase() +
                                        " " +
                                        (System.currentTimeMillis() - tuple2.getT1())
                                ))
                ))
        .bind()
        .block();
柯清野
2023-03-14

我认为您不应该为out.sendString(...)重新创建publisher,这是可行的:

DisposableServer server = TcpServer.create()
        .port(3344)
        .handle((in, out) -> out
                .options(NettyPipeline.SendOptions::flushOnEach)
                .sendString(in.receive()
                        .asString()
                        .map(String::toUpperCase)
                        .delayElements(Duration.ofSeconds(1))))
        .bind()
        .block();
server.channel().closeFuture().sync();
 类似资料:
  • 所以我想使用Flux来运行每个Mono任务,并等待每个任务2秒钟。这是我试过的 结果如下 正如您在时间戳delayElement上看到的,即使它在同一个线程上运行(测试工作人员),它似乎也不起作用。我在这里做错了什么? 编辑:我真的不明白它是如何工作的,但是我没有在平面图中添加delayElement,而是在它上面添加delayElements,它工作正常。 结果

  • 本文向大家介绍Python检测网络延迟的代码,包括了Python检测网络延迟的代码的使用技巧和注意事项,需要的朋友参考一下 本文讲述了Python检测网络延迟的代码。分享给大家供大家参考,具体如下: 以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对呐喊教程的支持。

  • 我用的是SpringMVC 4 Tomcat。我的应用程序用@ Autowired SimpMessagingTemplate得到了2个类(A和B)。每个类都有一个调用“convertAndSend”的线程。 A类发送一条由8个双字段和3个长字段组成的消息。它向大约500个主题发送消息,例如“/主题/价格. X”(其中X-一些随机字符串)。频率-在单个循环中每秒最多4次(每个主题)。 B 类发送一

  • 可以在系统不繁忙或者临时下线前检测客户端和server或者proxy 的带宽: 1)使用 iperf -s 命令将 Iperf 启动为 server 模式: iperf –s ———————————————————— Server listening on TCP port 5001 TCP window size: 8.00 KByte (default) ———————————————————

  • 如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?

  • 我需要从Reactor项目中解释这两种方法 文档链接 发布() 准备一个可连接的通量,该通量共享该通量序列,并以背压感知的方式向订户发送值。 和 发布(整型预取) 准备一个可连接的通量,该通量共享该通量序列,并以背压感知的方式向订户发送值。 大理石图对两者都是一样的。什么是参数? 我运行了这些例子,结果是一样的