当前位置: 首页 > 知识库问答 >
问题:

如何使用spring集成在TCP连接上实现keep alive连接?

须巴英
2023-03-14
private static final int SERIALIZER_HEADER_SIZE = 2;

/**
 * Serializer used by connection factory to send and receive messages
 */
@Bean
public ByteArrayLengthHeaderSerializer byteArrayLengthHeaderSerializer() {
    return new ByteArrayLengthHeaderSerializer(SERIALIZER_HEADER_SIZE);
}

@Bean
public AbstractClientConnectionFactory tcpClientConnectionFactory() {
    TcpNetClientConnectionFactory connFactory =
        new TcpNetClientConnectionFactory(props.getUrl(), props.getPort());
    connFactory.setSerializer(byteArrayLengthHeaderSerializer());
    connFactory.setDeserializer(byteArrayLengthHeaderSerializer());
    connFactory.setSoTimeout(props.getSoTimeout());
    if (props.isUseSSL()) {
        connFactory.setTcpSocketFactorySupport(new DefaultTcpNetSSLSocketFactorySupport(() -> {
            return SSLContext.getDefault();
        }));
    }

    return connFactory;
}

/**
 * Connection factory used to create TCP client socket connections
 */
@Bean
public AbstractClientConnectionFactory tcpCachedClientConnectionFactory() {
    CachingClientConnectionFactory cachingConnFactory =
        new CachingClientConnectionFactory(tcpClientConnectionFactory(), props.getMaxPoolSize());
    cachingConnFactory.setConnectionWaitTimeout(props.getMaxPoolWait());
    return cachingConnFactory;
}

基于此,有人对如何使用spring Integration实现此功能有什么建议吗?

共有1个答案

任文乐
2023-03-14

当使用简单的客户端连接工厂时,使用@InboundChannelAdapter设置应用程序级心跳消息非常容易。

简单的例子:

@SpringBootApplication
public class So46918267Application {

    public static void main(String[] args) throws IOException {
        // Simulated Server
        final ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(1234);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                Socket socket = server.accept();
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                    if (line.equals("keep_alive")) {
                        socket.getOutputStream().write("OK\r\n".getBytes());
                    }
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        });
        ConfigurableApplicationContext context = SpringApplication.run(So46918267Application.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        executor.shutdownNow();
        context.close();
        server.close();
    }

    @Bean
    public TcpNetClientConnectionFactory client() {
        return new TcpNetClientConnectionFactory("localhost", 1234);
    }

    @ServiceActivator(inputChannel = "toTcp")
    @Bean
    public TcpOutboundGateway gateway() {
        TcpOutboundGateway gateway = new TcpOutboundGateway();
        gateway.setConnectionFactory(client());
        return gateway;
    }

    // HEARTBEATS

    private final Message<?> heartbeatMessage = MessageBuilder.withPayload("keep_alive")
            .setReplyChannelName("heartbeatReplies")
            .build();

    @InboundChannelAdapter(channel = "toTcp", poller = @Poller(fixedDelay = "25000"))
    public Message<?> heartbeat() {
        return this.heartbeatMessage;
    }

    @ServiceActivator(inputChannel = "heartbeatReplies")
    public void reply(byte[] reply) {
        System.out.println(new String(reply));
    }

}

但是,在使用CachingClientConnectionFactory时,不清楚为什么要保持空闲连接池处于打开状态。但是,池的工作方式是将空闲连接保留在队列中,这样每个请求都将转到最早的连接,连接将返回到队列的末尾。

@InboundChannelAdapter(channel = "toTcp", 
    poller = @Poller(fixedDelay = "25000", maxMessagesPerPoll = "5"))
 类似资料:
  • 我试图创建一个接受入站连接的Tcp服务器,并异步地向连接的客户端发送消息。有一个Tcp服务器的示例,但它使用的是网关,是请求/响应,不支持异步。 null 异常org.springframework.web.util.NestedServletException:请求处理失败;嵌套异常是org.springframework.messaging.messagehandlingException:无

  • 我已经配置了HAProxy(1.5.4,但我也尝试了1.5.14),以便在TCP模式下平衡5672端口上公开AMQP协议(WSO2 Message Broker)的两台服务器。客户端通过Haproxy创建并使用到AMQP服务器的永久连接。 我更改了客户端和服务器TCP keepalive超时,设置net.ipv4.tcp_keepAlive_time=120(CentOS 7)。 在HAProxy

  • 下面是我到目前为止的代码: 有人能解释一下在和情况下我该做什么吗?以及如何知道网络已断开并重新连接?

  • <罢工> 错误: 没有类型为'org.springframework.test.web.servlet.mockMVC'的合格bean可用:至少需要1个符合autowire候选的bean。依赖项注释:{@org.springframework.beans.factory.annotation.autowire(required=true)}位于org.springframework.beans.f

  • null 如何在同一个连接上实现异步请求/响应? Spring TcpOutboundGateway javadoc提到:为该用例使用一对出站/入站适配器。 因此,除上述声明外:

  • 刚开始使用Spring Webflux Webclient,只是想知道Http连接的默认KeepAlive时间是多少?有没有办法增加keepAlive的时间?在我们的Rest服务中,我们可能每五分钟就会收到一个请求,这个请求需要很长的时间来处理,它需要500秒-10秒之间的时间。然而,在负载测试中,如果频繁发送请求,处理时间将小于250ms。