在Reactor Netty中,当通过out.send(publisher)
向TCP通道发送数据时,任何发布服务器都可以工作。但是,如果不是简单的即时flux
而是使用带有延迟元素的更复杂的flux,那么它就会停止正常工作。例如,如果我们使用这个hello world TCP echo服务器,它将按照预期工作:
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.time.Duration;
public class Reactor1 {
public static void main(String[] args) throws Exception {
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.flatMap(s ->
out.sendString(Flux.just(s.toUpperCase()))
))
.bind()
.block();
server.channel().closeFuture().sync();
}
}
但是,如果我们将out.sendstring
更改为
out.sendString(Flux.just(s.toUpperCase()).delayElements(Duration.ofSeconds(1)))
那么我们将期望对于每一个接收到的项目,都将产生一个延迟一秒的输出。
但是,html" target="_blank">服务器的行为方式是,如果它在间隔期间接收到多个项目,它将只针对第一个项目产生输出。例如,下面我们在第一秒内键入aa
和bb
,但只有aa
作为输出(一秒后):
$ nc localhost 3344
aa
bb
AA <after one second>
然后,如果我们稍后输入附加行,我们得到输出(一秒钟后),但来自前面的输入:
cc
BB <after one second>
如何使send()
在延迟的flux
情况下按预期工作?
尝试使用concatmap。这是可行的:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.concatMap(s ->
out.sendString(Flux.just(s.toUpperCase())
.delayElements(Duration.ofSeconds(1)))
))
.bind()
.block();
server.channel().closeFuture().sync();
延迟来话流量
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> in
.receive()
.asString()
.timestamp()
.delayElements(Duration.ofSeconds(1))
.concatMap(tuple2 ->
out.sendString(
Flux.just(tuple2.getT2().toUpperCase() +
" " +
(System.currentTimeMillis() - tuple2.getT1())
))
))
.bind()
.block();
我认为您不应该为out.sendString(...)
重新创建publisher,这是可行的:
DisposableServer server = TcpServer.create()
.port(3344)
.handle((in, out) -> out
.options(NettyPipeline.SendOptions::flushOnEach)
.sendString(in.receive()
.asString()
.map(String::toUpperCase)
.delayElements(Duration.ofSeconds(1))))
.bind()
.block();
server.channel().closeFuture().sync();
所以我想使用Flux来运行每个Mono任务,并等待每个任务2秒钟。这是我试过的 结果如下 正如您在时间戳delayElement上看到的,即使它在同一个线程上运行(测试工作人员),它似乎也不起作用。我在这里做错了什么? 编辑:我真的不明白它是如何工作的,但是我没有在平面图中添加delayElement,而是在它上面添加delayElements,它工作正常。 结果
本文向大家介绍Python检测网络延迟的代码,包括了Python检测网络延迟的代码的使用技巧和注意事项,需要的朋友参考一下 本文讲述了Python检测网络延迟的代码。分享给大家供大家参考,具体如下: 以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对呐喊教程的支持。
我用的是SpringMVC 4 Tomcat。我的应用程序用@ Autowired SimpMessagingTemplate得到了2个类(A和B)。每个类都有一个调用“convertAndSend”的线程。 A类发送一条由8个双字段和3个长字段组成的消息。它向大约500个主题发送消息,例如“/主题/价格. X”(其中X-一些随机字符串)。频率-在单个循环中每秒最多4次(每个主题)。 B 类发送一
可以在系统不繁忙或者临时下线前检测客户端和server或者proxy 的带宽: 1)使用 iperf -s 命令将 Iperf 启动为 server 模式: iperf –s ———————————————————— Server listening on TCP port 5001 TCP window size: 8.00 KByte (default) ———————————————————
如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?
我需要从Reactor项目中解释这两种方法 文档链接 发布() 准备一个可连接的通量,该通量共享该通量序列,并以背压感知的方式向订户发送值。 和 发布(整型预取) 准备一个可连接的通量,该通量共享该通量序列,并以背压感知的方式向订户发送值。 大理石图对两者都是一样的。什么是参数? 我运行了这些例子,结果是一样的