我正在尝试做一些项目前的经验,对最新的反应堆网络版本,缺乏文档;我使用的是0.8.0.m3版本。
我用这个tcp服务器开发了一个简单的spring boot应用程序,它可以正确启动,并且看起来可以正常工作:
@PostConstruct
public void startServer() throws InterruptedException {
TcpServer.create().
host("localhost").
port(1235).
handle((in, out) -> {
Flux<String> fluxString = in.receive().asString().log().
map(text -> {
return "Hi server have received "+text;});
return out.sendString(fluxString).then();
}
).
wiretap().bindNow();
}
如果我尝试使用客户端进行测试,交互似乎是正确的,但我无法收到任何响应:
int counter = 10;
CountDownLatch latch = new CountDownLatch(counter);
Flux<String> input = Flux.range(0, counter).map(i->""+i);
TcpClient.create().
host("localhost").
port(1235).
handle((in, out) -> {
in.receive().subscribe(receiv -> {System.out.println(receiv);latch.countDown();});
return out.sendString(input).neverComplete();
}
).
wiretap().connectNow();
System.out.println("waiting closure");
boolean result = latch.await(5, TimeUnit.SECONDS);
查看wiretap日志,客户机将每个int作为字符串单独发送,而服务器只接收一个聚合字符串“0123456789”并只发送一个响应。客户端没有接收到任何东西,锁存器也没有递减1,保持在10(我希望至少接收到一个聚合响应)。
谁能解释一下客户机有什么问题,以及服务器如何单独接收每一个整数?
Thx G
您可能需要修复一些问题。我想说这对学习来说有点复杂。
对于服务器:
TcpServer.create()
.host("localhost")
.port(1235)
.doOnConnection(c ->
//The origin input are 0,1,2,3,4,5,6,7,8,9.
//So you need a decoder split every 1 byte as a ByteBuf.
c.addHandler(
"1ByteStringDecoder",
new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(1));
}
}
)
)
.handle((in, out) -> {
Flux<String> fluxString = in.receive()
.asString()
.log()
.map(text -> {
return "Hi server have received " + text;
});
//Since the output is quite small, you need flush it
return out.options(o -> o.flushOnEach())
.sendString(fluxString)
.neverComplete();
}
)
.wiretap()
.bindNow();
对于客户端:
int counter = 10;
CountDownLatch latch = new CountDownLatch(counter);
startServer();
Flux<String> input = Flux.range(0, counter)
.map(i -> "" + i);
TcpClient.create()
.host("localhost")
.port(1235)
.doOnConnected(c ->
c.addHandler(
//The covert input are "Hi server have received " + (0,1,2,3,4,5,6,7,8,9).
//So you need a decoder split every 25 byte as a ByteBuf.
"25ByteStringDecoder",
new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(25));
}
}
)
)
.handle((in, out) -> {
in.receive()
.asString()//You need convert ByteBuf to String.
.subscribe(receiv -> {
System.out.println(receiv);
latch.countDown();
});
out.options(o -> o.flushOnEach())
.sendString(input)
.then()
.subscribe(); //You need to ask your client to send the data by subscribe
return Mono.never();
}
)
.wiretap()
.connectNow();
System.out.println("waiting closure");
boolean result = latch.await(5, TimeUnit.SECONDS);
我已经在互联网上搜索了一两个星期,想找到一个UDP客户端程序,它可以同时发送和接收数据,但是对于c#来说,没有关于这个主题的内容。在过去的几天里,我尝试创建一个UDP客户端,其中包含一个接收数据的线程。 发送UDP数据包效果很好,但程序无法接收我发送到的服务器,我相信服务器正在将所有数据包发送到不同的端口。 我如何修复这个程序? 有没有一种更简单的方法来进行UDP编程,比如用于TCP的Stream
我有一个Angular 4和Java/Spring MVC/Jhipster网站,我正试图使用本教程创建一个websocket(使用sock js和stomp.js)。我可以连接到websocket,但是当我尝试从Java代码向客户端发送数据时,什么也没有发生。请帮帮我。 下面,我只是试图访问我的浏览器中的http://localhost:9001/api/websocket/testSend,并
如果我创建上面的类并尝试在tomcat7上部署war,我会看到以下错误。
我正在尝试构建一个简单的应用程序,可以从FCM(Firebase云消息传递系统)接收通知。我能够生成令牌并将其显示在文本视图中。另外,我还编写了在“FirebaseMessagingService”类“中使用log.d()在LOG上打印通知消息的逻辑,该类从”FirebaseMessagingService“扩展而来。但是,一旦我从FCM控制台发送消息,应用程序就会崩溃,并且在Android显示器
我正在开发一个Web应用程序与Java后端在下面提到的配置与Loadbalancer(禅Loadbalancer)环境 APP1=Centos 7、Apache 2.4.6、Tomcat 7.0.53、Mod_Ajp连接器、, APP2=Centos 7、Apache 2.4.6、Tomcat 7.0.53、Mod_Ajp连接器、, 在一段时间后,tomcat JVM没有响应apache请求的问题
我在试着让我的弹性搜索下沉并运行。然而,我得到了以下错误,并正在耗尽如何修复它的想法。任何帮助都很感激。以下是错误: 我运行的是CDH 5.3和elasticsearch 1.4.2