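Edit 2: I switched to TcpNetServerConnectionFactory and wrapped the client factory with ThreadAffinityClientConnectionFactory, so the device can now reach the back end without problems. But when the back end sends something back, I get the error "Unable to find outbound socket for GenericMessage" and the client socket dies. I think this is because the message coming from the back end lacks the header needed to route it correctly. How can I capture this information? My configuration class is as follows: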
@Configuration
@EnableIntegration
@IntegrationComponentScan
public class ServerConfiguration {

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(8000);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        return factory;
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory factory = new TcpNioClientConnectionFactory("localhost", 3333);
        factory.setSerializer(new MapJsonSerializer());
        factory.setDeserializer(new MapJsonSerializer());
        factory.setSingleUse(true);
        return new ThreadAffinityClientConnectionFactory(factory);
    }

    @Bean
    public TcpReceivingChannelAdapter inboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundDeviceAdapter(AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public TcpReceivingChannelAdapter inboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        return inbound;
    }

    @Bean
    public TcpSendingMessageHandler outboundBackendAdapter(AbstractClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        return outbound;
    }

    @Bean
    public IntegrationFlow backendIntegrationFlow() {
        return IntegrationFlows.from(inboundBackendAdapter(clientFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundDeviceAdapter(serverFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow deviceIntegrationFlow() {
        return IntegrationFlows.from(inboundDeviceAdapter(serverFactory()))
                .log(LoggingHandler.Level.INFO)
                .handle(outboundBackendAdapter(clientFactory()))
                .get();
    }

}
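It is not entirely clear what you are asking, so I am going to assume you want a Spring Integration proxy between a client and a server. Something like: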
iot-device -> spring server -> message-transformation -> spring client -> back-end-server
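If that is the case, you can implement a ClientConnectionIdAware client connection factory that wraps a standard factory.
In the integration flow, bind the incoming ip_connectionId header of each message to the thread (in a ThreadLocal).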
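To make the ThreadLocal idea concrete, here is a minimal plain-Java sketch of binding a connection id to the current thread. ConnectionIdHolder and ConnectionIdHolderDemo are hypothetical names invented for illustration; they are not Spring Integration classes, and the ids "device-1" and "device-2" are made up.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: holds the inbound connection id for the current thread. */
final class ConnectionIdHolder {

    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    private ConnectionIdHolder() {
    }

    /** Associates the given connection id with the calling thread. */
    static void bind(String connectionId) {
        CURRENT.set(connectionId);
    }

    /** Returns the id bound to the calling thread, or null if none is bound. */
    static String current() {
        return CURRENT.get();
    }

    /** Removes the binding so a pooled thread does not leak a stale id. */
    static void clear() {
        CURRENT.remove();
    }
}

class ConnectionIdHolderDemo {

    /** Binds the id on a worker thread and reports what that thread sees. */
    static String bindAndRead(ExecutorService worker, String connectionId) throws Exception {
        Future<String> result = worker.submit(() -> {
            ConnectionIdHolder.bind(connectionId);
            try {
                return ConnectionIdHolder.current();
            }
            finally {
                ConnectionIdHolder.clear();
            }
        });
        return result.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService first = Executors.newSingleThreadExecutor();
        ExecutorService second = Executors.newSingleThreadExecutor();
        try {
            System.out.println(bindAndRead(first, "device-1"));
            System.out.println(bindAndRead(second, "device-2"));
            // The main thread never bound an id, so it sees none.
            System.out.println(ConnectionIdHolder.current());
        }
        finally {
            first.shutdown();
            second.shutdown();
        }
    }
}
```

Each thread only ever sees the id it bound itself, which is what allows a wrapping connection factory to pick the outbound connection that belongs to the inbound one being serviced on that thread.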
@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("foo\r\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public IntegrationFlow proxyInboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyOutboundFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .transform(Transformers.objectToString())
                .<String, String>transform(s -> s.toUpperCase())
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234).get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235).get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow serverFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)))
                .transform(Transformers.objectToString())
                .<String, String>transform(p -> p + p)
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}
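The two-map bookkeeping in the code above (fromToConnectionMappings and toFromConnectionMappings, cleaned up by the closer listener) can be looked at in isolation. The following is a minimal plain-Java sketch of that idea; ConnectionIdMappings, its method names, and the ids "device-1" and "backend-7" are hypothetical and only illustrate the pattern.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the two mapping beans: keeps both directions in sync. */
final class ConnectionIdMappings {

    private final Map<String, String> deviceToBackend = new ConcurrentHashMap<>();
    private final Map<String, String> backendToDevice = new ConcurrentHashMap<>();

    /** Records that traffic from deviceId is forwarded on backendId. */
    void link(String deviceId, String backendId) {
        deviceToBackend.put(deviceId, backendId);
        backendToDevice.put(backendId, deviceId);
    }

    /** Returns the device connection a back-end reply belongs to, or null. */
    String deviceFor(String backendId) {
        return backendToDevice.get(backendId);
    }

    /** Drops both entries when the device connection closes; returns the back-end id or null. */
    String unlinkDevice(String deviceId) {
        String backendId = deviceToBackend.remove(deviceId);
        if (backendId != null) {
            backendToDevice.remove(backendId);
        }
        return backendId;
    }
}

class ConnectionIdMappingsDemo {

    public static void main(String[] args) {
        ConnectionIdMappings mappings = new ConnectionIdMappings();
        mappings.link("device-1", "backend-7");
        System.out.println(mappings.deviceFor("backend-7"));   // device-1
        System.out.println(mappings.unlinkDevice("device-1")); // backend-7
        System.out.println(mappings.deviceFor("backend-7"));   // null
    }
}
```

The reverse lookup (deviceFor) plays the role of the header enrichment in the reply flow: a reply arrives tagged with the back-end connection id and has to be re-tagged with the device connection id before it is sent out through the server factory. Note that the two maps are updated one after the other, so the pair is not changed atomically.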
Edit 3
Using MapJsonSerializer works fine for me.
@SpringBootApplication
public class So51200675Application {

    public static void main(String[] args) {
        SpringApplication.run(So51200675Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
            socket.getOutputStream().write("{\"foo\":\"bar\"}\n".getBytes());
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println(reader.readLine());
            socket.close();
        };
    }

    @Bean
    public Map<String, String> fromToConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public Map<String, String> toFromConnectionMappings() {
        return new ConcurrentHashMap<>();
    }

    @Bean
    public MapJsonSerializer serializer() {
        return new MapJsonSerializer();
    }

    @Bean
    public IntegrationFlow proxyRequestFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(serverFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toUpperCase());
                    return m;
                })
                .handle((p, h) -> {
                    mapConnectionIds(h);
                    return p;
                })
                .handle(Tcp.outboundAdapter(threadConnectionFactory()))
                .get();
    }

    @Bean
    public IntegrationFlow proxyReplyFlow() {
        return IntegrationFlows.from(Tcp.inboundAdapter(threadConnectionFactory()))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
                    return m;
                })
                .enrichHeaders(e -> e
                        .headerExpression(IpHeaders.CONNECTION_ID, "@toFromConnectionMappings.get(headers['"
                                + IpHeaders.CONNECTION_ID + "'])").defaultOverwrite(true))
                .handle(Tcp.outboundAdapter(serverFactory()))
                .get();
    }

    private void mapConnectionIds(Map<String, Object> h) {
        try {
            TcpConnection connection = threadConnectionFactory().getConnection();
            String mapping = toFromConnectionMappings().get(connection.getConnectionId());
            String incomingCID = (String) h.get(IpHeaders.CONNECTION_ID);
            if (mapping == null || !(mapping.equals(incomingCID))) {
                System.out.println("Adding new mapping " + incomingCID + " to " + connection.getConnectionId());
                toFromConnectionMappings().put(connection.getConnectionId(), incomingCID);
                fromToConnectionMappings().put(incomingCID, connection.getConnectionId());
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Bean
    public ThreadAffinityClientConnectionFactory threadConnectionFactory() {
        return new ThreadAffinityClientConnectionFactory(clientFactory()) {

            @Override
            public boolean isSingleUse() {
                return false;
            }

        };
    }

    @Bean
    public AbstractServerConnectionFactory serverFactory() {
        return Tcp.netServer(1234)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
    }

    @Bean
    public AbstractClientConnectionFactory clientFactory() {
        AbstractClientConnectionFactory clientFactory = Tcp.netClient("localhost", 1235)
                .serializer(serializer())
                .deserializer(serializer())
                .get();
        clientFactory.setSingleUse(true);
        return clientFactory;
    }

    @Bean
    public IntegrationFlow backEndEmulatorFlow() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1235)
                        .serializer(serializer())
                        .deserializer(serializer())))
                .<Map<String, String>, Map<String, String>>transform(m -> {
                    m.put("foo", m.get("foo") + m.get("foo"));
                    return m;
                })
                .get();
    }

    @Bean
    public ApplicationListener<TcpConnectionCloseEvent> closer() {
        return e -> {
            if (fromToConnectionMappings().containsKey(e.getConnectionId())) {
                String key = fromToConnectionMappings().remove(e.getConnectionId());
                toFromConnectionMappings().remove(key);
                System.out.println("Removed mapping " + e.getConnectionId() + " to " + key);
                threadConnectionFactory().releaseConnection();
            }
        };
    }

}
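To see what the test client should get back, the three map transformations in the code above can be traced without any networking. This is a plain-Java sketch only: PayloadTrace and its method names are hypothetical, and each method simply repeats the lambda of the flow it is named after.

```java
import java.util.HashMap;
import java.util.Map;

class PayloadTrace {

    /** Mirrors proxyRequestFlow: upper-cases the "foo" value. */
    static Map<String, String> proxyRequest(Map<String, String> m) {
        m.put("foo", m.get("foo").toUpperCase());
        return m;
    }

    /** Mirrors backEndEmulatorFlow: doubles the "foo" value. */
    static Map<String, String> backEnd(Map<String, String> m) {
        m.put("foo", m.get("foo") + m.get("foo"));
        return m;
    }

    /** Mirrors proxyReplyFlow: prefixes the value with its lower-case form. */
    static Map<String, String> proxyReply(Map<String, String> m) {
        m.put("foo", m.get("foo").toLowerCase() + m.get("foo"));
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> payload = new HashMap<>();
        payload.put("foo", "bar");
        // bar -> BAR -> BARBAR -> barbarBARBAR
        System.out.println(proxyReply(backEnd(proxyRequest(payload)))); // {foo=barbarBARBAR}
    }
}
```

So for the request {"foo":"bar"} sent by the runner, the value of foo in the reply read from the socket should be barbarBARBAR, provided the reply is routed back to the originating device connection.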