我想配置TCP服务器以接收和回复来自多个客户端的数据。我搜索了许多其他线程,但找不到确切的方法。我第一次使用Spring集成,没有经验。
服务器要求
我尝试了以下代码,在这些代码中,我可以向客户发送数据,但可以达到我的上述要求
TCP服务器配置:
@Configuration
public class TcpServerConfig {
private List<TcpConnectionOpenEvent> clientList = new ArrayList<>();
public List<TcpConnectionOpenEvent> getClientList() {
return clientList;
}
@Bean
public TcpReceivingChannelAdapter server(TcpNetServerConnectionFactory cf) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(cf);
adapter.setOutputChannel(inputChannel());
return adapter;
}
@Bean
public MessageChannel inputChannel() {
return new QueueChannel();
}
@Bean
public MessageChannel outputChannel() {
return new DirectChannel();
}
@Bean
public TcpNetServerConnectionFactory cf() {
return new TcpNetServerConnectionFactory(1001);
}
@Bean
public IntegrationFlow outbound() {
return IntegrationFlows.from(outputChannel())
.handle(sender())
.get();
}
@Bean
public MessageHandler sender() {
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(cf());
return tcpSendingMessageHandler;
}
@Bean
public ApplicationListener<TcpConnectionOpenEvent> listener() {
return new ApplicationListener<TcpConnectionOpenEvent>() {
@Override
public void onApplicationEvent(TcpConnectionOpenEvent event) {
outputChannel().send(MessageBuilder.withPayload("foo")
.setHeader(IpHeaders.CONNECTION_ID, event.getConnectionId())
.build());
clientList.add(event);
}
};
}
}
测试代码:
@Service
public class Test {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);
@Autowired
TcpServerConfig tcpServerConfig;
@Autowired
private MessageChannel outputChannel;
@Autowired
private MessageChannel inputChannel;
@Scheduled(fixedRate = 1000)
void task() {
LOGGER.info("Client count: " + tcpServerConfig.getClientList().size());
for (TcpConnectionOpenEvent client : tcpServerConfig.getClientList()) {
outputChannel.send(MessageBuilder.withPayload("foo")
.setHeader(IpHeaders.CONNECTION_ID, client.getConnectionId())
.build());
}
}
}
任何帮助都将不胜感激。
这里有一个解决方案:
@SpringBootApplication
@EnableScheduling
public class So62877512ServerApplication {
public static void main(String[] args) {
SpringApplication.run(So62877512ServerApplication.class, args);
}
@Bean
public IntegrationFlow serverIn(Handler handler) {
return IntegrationFlows.from(Tcp.inboundAdapter(server()))
.transform(Transformers.objectToString())
.filter(handler, "existingConnection", spec -> spec
.discardFlow(f -> f
.handle(handler, "sendInitialReply")))
.handle(handler, "reply")
.get();
}
@Bean
public IntegrationFlow serverOut() {
return f -> f.handle(Tcp.outboundAdapter(server()));
}
@Bean
public TcpServerConnectionFactorySpec server() {
return Tcp.netServer(1234)
.serializer(TcpCodecs.lf())
.deserializer(TcpCodecs.lf()); // compatible with netcat
}
}
@Component
@DependsOn("serverOut")
class Handler {
private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
private final ConcurrentMap<String, BlockingQueue<Message<?>>> clients = new ConcurrentHashMap<>();
private final MessageChannel out;
private final TcpNetServerConnectionFactory server;
public Handler(@Qualifier("serverOut.input") MessageChannel out, TcpNetServerConnectionFactory server) {
this.out = out;
this.server = server;
}
public boolean existingConnection(Message<?> message) {
String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
boolean containsKey = this.clients.containsKey(connectionId);
if (!containsKey) {
this.clients.put(connectionId, new LinkedBlockingQueue<Message<?>>());
}
return containsKey;
}
public void sendInitialReply(Message<String> message) {
LOG.info("Replying to " + message.getPayload());
this.out.send(MessageBuilder.withPayload(message.getPayload().toUpperCase())
.copyHeaders(message.getHeaders()).build());
}
@Scheduled(fixedDelay = 5000)
public void sender() {
this.clients.forEach((key, queue) -> {
try {
this.out.send(MessageBuilder.withPayload("foo")
.setHeader(IpHeaders.CONNECTION_ID, key).build());
Message<?> reply = queue.poll(10, TimeUnit.SECONDS);
if (reply == null) {
LOG.error("Timeout waiting for " + key);
this.server.closeConnection(key);
}
else {
LOG.info("Reply " + reply.getPayload() + " from " + key);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted");
}
catch (Exception e) {
LOG.error("Failed to send to " + key, e);
}
});
}
public void reply(Message<String> in) {
BlockingQueue<Message<?>> queue = this.clients.get(in.getHeaders().get(IpHeaders.CONNECTION_ID, String.class));
if (queue != null) {
queue.add(in);
}
}
@EventListener
public void closed(TcpConnectionCloseEvent event) {
this.clients.remove(event.getConnectionId());
LOG.info(event.getConnectionId() + " closed");
}
}
$ nc localhost 1234
foo <- typed
FOO
foo
bar <- typed
foo
bar <- typed
foo
$ <- closed by server - timeout
2020-07-14 14:41:04.906 INFO 64763 --- [pool-1-thread-2] com.example.demo.Handler : Replying to foo
2020-07-14 14:41:13.841 INFO 64763 --- [ scheduling-1] com.example.demo.Handler : Reply bar from localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
2020-07-14 14:41:21.465 INFO 64763 --- [ scheduling-1] com.example.demo.Handler : Reply bar from localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
2020-07-14 14:41:36.473 ERROR 64763 --- [ scheduling-1] com.example.demo.Handler : Timeout waiting for localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153
2020-07-14 14:41:36.474 INFO 64763 --- [ scheduling-1] com.example.demo.Handler : localhost:65115:1234:a9fc7e3d-4dda-4627-b765-4f0bb0835153 closed
问题内容: 我要进行最简单的解释。我的Java TCP项目有一个服务器和三个客户端。 服务器具有一个ClientThread。每个客户端都有一个ServerThread和一个UserThread。 工作流程为: 1.客户端(例如,client_0)的UserThread获取用户输入,然后将消息发送到服务器。 2.服务器的ClientThread捕获来自client_0的消息,并将另一条消息发送到另
问题内容: 我要进行最简单的解释。我的Java TCP项目有一个服务器和三个客户端。 服务器具有一个ClientThread。每个客户端都有一个ServerThread和一个UserThread。 工作流程为: 1.客户端(例如,client_0)的UserThread获取用户输入,然后将消息发送到服务器。 2.服务器的ClientThread捕获来自client_0的消息,并将另一条消息发送到另
我有一个客户端测试功能,它将测试对象发送到socket.io服务器,该服务器的工作是等待客户端事件,然后将数据发送到原始客户端发出事件的房间中的每个客户端。但是,只有原始客户端从服务器接收事件。房间中的其他客户端没有接收到它。 我已经尝试了以及服务器端。两个客户端肯定加入同一个房间。服务器也从原始客户端接收事件。 Server.js: 父组件: 子组件: 我希望在两个连接的客户端上读取服务器消息,
我想做的是: 为了学习游戏编程,我使用专用服务器和UDP创建了一个回声设置。专用服务器在不同的城市(也就是不在我的本地网络中)。 在我的本地计算机上,我有一个udp客户端和服务器(两个不同的程序)。当我第一次启动python服务器时,windows防火墙立即询问我是否要添加异常。在允许我的python服务器联网之后,我仍然没有收到任何消息。(客户- 我的问题是: 游戏如何解决这个问题?我没有为我想
使用套接字列表编辑: 按照EJP的建议,这样做我的问题就解决了。
从spring文档中,我看到我们可以一起拥有对等eureka服务器,因此对于Application.yml中的Eureka1,我可以拥有: 在Eureka服务器2中,我可以: