在reactor Netty之前,我将创建Netty Tcp服务器的方式是创建服务器引导并添加我的自定义管道类。Reactor-Netty有tcpServer.create(),但似乎我必须创建一个新的函数接口,它接受NettyInbound和NettyOutbound并返回一个Mono。但是,如果我想添加一个构建管道的ChannelInitializer,我必须阻塞以获得NettyContext。传入的消息由功能接口接收,我可以发送响应,但没有东西通过我的管道。
有没有一种方法使我们的反应堆网状,并使消息通过一个定制的管道流动?
使用neverComplete()返回mono.just(“hi”)可以在建立连接和接收到消息时成功地将“hi”发送到客户端,但我需要将其卸载到管道,然后将结果反馈给客户端。
public void startServer() throws InterruptedException{
EventLoopGroup group = new NioEventLoopGroup(1);
try {
final TcpServer server = TcpServer.create(opts -> opts
.eventLoopGroup(group)
.listen(tcpSocketAddress));
server
.newHandler((in, out) -> {
in.receive()
.take(1)
.log(ApolloApplicationTests.class.getName())
.subscribe(data -> {
log.info("Server Received: {}", data.toString(CharsetUtil.UTF_8));
latch.countDown();
});
return out.sendString(Mono.just("Hi")).neverComplete();
})
.block().addHandler(clientEndPoint)
.channel()
.closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToMessageDecoder;
import reactor.util.Logger;
import reactor.util.Loggers;
@Configurable
@Component
public class ClientEndPoint extends ChannelInitializer<Channel> {
final Logger log = Loggers.getLogger(ApolloApplication.class);
private ChannelPipeline pipeline;
@Autowired
private ChannelHandlerAdapter messageInterchange;
@Autowired
private LengthFieldBasedFrameDecoder lowOrderVliDecoder;
@Autowired
private MessageToMessageDecoder<ByteBuf> messageDecoder;
@Autowired
private LengthFieldPrepender vliEncoder;
@Autowired
@Qualifier("inBound")
List<ChannelHandler> inBoundHandlers;
@Autowired
@Qualifier("outBound")
List<ChannelHandler> outBoundHandlers;
@Override
protected void initChannel(Channel sc) throws Exception {
this.pipeline = sc.pipeline();
this.pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder);
this.pipeline.addLast("messageDecoder", this.messageDecoder);
this.pipeline.addLast("vliEncoder", this.vliEncoder);
for (ChannelHandler handler : this.inBoundHandlers) {
this.pipeline.addLast(handler);
}
this.pipeline.addLast("messageInterchange", this.messageInterchange);
for (ChannelHandler handler : this.outBoundHandlers) {
this.pipeline.addLast(handler);
}
}
public void accept(Channel sc) {
this.pipeline = sc.pipeline();
this.pipeline.addLast("lowOrderVliDecoder", this.lowOrderVliDecoder);
this.pipeline.addLast("messageDecoder", this.messageDecoder);
this.pipeline.addLast("vliEncoder", this.vliEncoder);
for (ChannelHandler handler : this.inBoundHandlers) {
this.pipeline.addLast(handler);
}
this.pipeline.addLast("messageInterchange", this.messageInterchange);
for (ChannelHandler handler : this.outBoundHandlers) {
this.pipeline.addLast(handler);
}
}
}
所以我想出来了
public Mono<? extends NettyContext> initializeServer() throws InterruptedException {
this.log.debug("Server Initializing");
BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> serverHandler = (in,
out) -> {
in.receive().asString().subscribe(data -> {
this.log.debug("Received " + data + " on " + in);
});
return Flux.never();
};
TcpServer server = TcpServer.create(opts -> opts.afterChannelInit(pipeline).listen(tcpSocketAddress));
return server.newHandler(serverHandler);
}
其中pipeline是实现Consumer的类,并将accept方法中的pipeline构建为典型的netty pipeline。
然后我启动服务器
private void startServer(Mono<? extends NettyContext> connected) {
ChannelFuture f = connected.block(Duration.ofSeconds(5)).channel()
.closeFuture();
final CountDownLatch channelLatch = new CountDownLatch(1);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
log.debug("Channel Disconnected");
}
});
f.awaitUninterruptibly();
// Now we are sure the future is completed.
assert f.isDone();
if (f.isCancelled()) {
this.log.warn("Connection Cancelled");
} else if (!f.isSuccess()) {
if (f.cause() != null) {
f.cause().printStackTrace();
} else {
this.log.warn("Connection not successful");
}
} else {
channelLatch.countDown();
this.log.info("Server Start Successful");
}
try {
channelLatch.await();
} catch (InterruptedException ex) {
throw new CancellationException("Interrupted while waiting for streaming " + "connection to arrive.");
}
}
我正在尝试编写android应用程序,将照片发送到FTP服务器。我使用Appache commons网络库来实现这一点。当设备使用Wi-Fi时,一切正常,照片被上传到FTP服务器。但我的客户希望,当应用中的Wi-Fi关闭,设备通过USB电缆从个人电脑(Windows)接入网络时,这个应用也能做到这一点。我能够使用“gnirehtet”连接设备: https://medium.com/genymob
问题内容: 这是我第一次使用axios,遇到错误。 使用正确的url和参数,当我检查网络请求时,确实可以从服务器中获得正确的答案,但是当我打开控制台时,我看到它没有调用回调,而是捕获了错误。 错误:网络错误堆栈跟踪:createError @ http:// localhost:3000 / static / js / bundle.js:2188:15 handleError @ http://
制表符组件
我一直在使用Spring Boot 2.0.1及其Webflux库开发一个示例reactive web api。我一直在看网上的例子,试图建立它,但我被两件事难倒了。下面是我的两个问题。 1)如何返回响应实体流,当我尝试时,我得到一个错误,说只能返回单个响应实体。下面是我当前的代码。 2)当我更新一个对象时,我使用一个flatMap来更新保存在Mongo中的对象,然后使用一个Map来将其转换为响应
我正在处理一个反应式quarkus后端服务,它执行以下操作。 使用与postgres交互的Hibernate反应式Panache从后端获取记录列表 使用记录的标识符,从另一个远程服务获取数据 我正在使用Mutiny来执行反应式管道。远程服务和数据库集成都以非阻塞方式单独工作。我只是需要帮助写一个连接这些的管道。例如:下面的例子 我被困在处理统一包装列表,然后试图处理列表中的单个项目。要么是Uni
本文向大家介绍Docker容器的网络管理和网络隔离的实现,包括了Docker容器的网络管理和网络隔离的实现的使用技巧和注意事项,需要的朋友参考一下 一、Docker网络的管理 1、Docker容器的方式 1)Docker访问外网 Docker容器连接到宿主机的Docker0网桥访问外网;默认自动将docker0网桥添加到docker容器中。 2)容器和容器之间通信 需要管理员创建网桥;将不同的容器