当前位置: 首页 > 知识库问答 >
问题:

如何在Spring集成中创建和保持多个连接

柴晔
2023-03-14

我有一个服务器和多个客户端,服务器将发送响应并等待确认,此外,我想永远保留该连接以获取下一条消息和确认,我应该如何在Spring集成中创建这些连接。我读到了Spring集成,我找不到保持连接的解决方案。

    public class ClientCall {
    public static void main(String[] args) {
        @SuppressWarnings("resource")
        ApplicationContext ctx = new AnnotationConfigApplicationContext(GatewayConfig.class);
        GatewayService gatewayService = ctx.getBean(GatewayService.class);
        //int i=0;
        Message message = new Message();        
        /*while(i<4)
        {*/
            message.setPayload("It's working");
            gatewayService.sendMessage(message);
        /*  i++;            
        }*/

    }
}


    @Service
    public interface GatewayService<T> {

    public void sendMessage(final T payload);

    public void receiveMessage(String response);

}


@EnableIntegration
@IntegrationComponentScan
@Configuration
@ComponentScan(basePackages = "com.gateway.service")
public class GatewayConfig {

    // @Value("${listen.port:6788}")
    private int port = 6785;

    @Autowired
    private GatewayService<Message> gatewayService;

    @MessagingGateway(defaultRequestChannel = "sendMessageChannel")
    public interface Gateway {
        void viaTcp(String payload);
    }

    @Bean
    public AbstractClientConnectionFactory clientCF() {
        TcpNetClientConnectionFactory clientConnectionFactory = new TcpNetClientConnectionFactory("localhost",this.port);
        clientConnectionFactory.setSingleUse(true);
        return clientConnectionFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "sendMessageChannel")
    public MessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {
        TcpOutboundGateway outGateway = new TcpOutboundGateway();
        outGateway.setConnectionFactory(connectionFactory);
        outGateway.setAsync(true);
        outGateway.setOutputChannel(receiveMessageChannel());
        outGateway.setRequiresReply(true);
        outGateway.setReplyChannel(receiveMessageChannel());
        return outGateway;
    }

    @Bean
    public MessageChannel sendMessageChannel() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }


    @Bean
    public MessageChannel receiveMessageChannel() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    @Transformer(inputChannel = "receiveMessageChannel", outputChannel = "processMessageChannel")
    public String convert(byte[] bytes) {
        return new String(bytes);
    }

    @ServiceActivator(inputChannel = "processMessageChannel")
    public void upCase(String response) {
        gatewayService.receiveMessage(response);
    }

    @Transformer(inputChannel = "errorChannel", outputChannel = "processMessageChannel")
    public void convertError(byte[] bytes) {
        String str = new String(bytes);
        System.out.println("Error: " + str);
    }

}

public class Message {

    private String payload;
  // getter setter
}


@Service
public class GatewayServiceImpl implements GatewayService<Message> {

    @Autowired
    private Gateway gateway;

    @Autowired
    private GatewayContextManger<String, Object> gatewayContextManger;

    @Override
    public void sendMessage(final Message message) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                gateway.viaTcp(message.getPayload());
            }
        }).start();
    }

    @Override
    public void receiveMessage(final String response) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                Message message = new Message();
                message.setPayload(response);
                Object obj = message;
                //Object obj = gatewayContextManger.get(message.getPayload());
                synchronized (message) {
                    obj.notify();
                    System.out.println("Message Received : "+message.getPayload());
                }
            }
        }).start();
    }

}

共有1个答案

任宾鸿
2023-03-14

您有:clientConnectionFactory。设置SingleUse(真) 这意味着请求后连接将关闭;将其保留为false(默认)以保持连接打开。

 类似资料:
  • 问题内容: 我正在使用Redis通过Redis-py客户端库存储两个数据库:0和1 。我想为每个数据库创建两个连接。目前,我正在这样做: 但是,我似乎找不到从连接创建Redis对象的方法。 我在这里犯一个菜鸟错误吗? 问题答案: 您真的不应该那样创建连接。让我引用redis-py文档。 在后台,redis- py使用连接池来管理与Redis服务器的连接。默认情况下,您创建的每个Redis实例将依次

  • 如何在我的项目中动态创建以轮询和检索服务器中的文件?

  • <罢工> 错误: 没有类型为'org.springframework.test.web.servlet.mockMVC'的合格bean可用:至少需要1个符合autowire候选的bean。依赖项注释:{@org.springframework.beans.factory.annotation.autowire(required=true)}位于org.springframework.beans.f

  • 问题内容: 我一直在处理我公司的CI扩展问题,同时试图弄清楚在CI和多个分支机构中采用哪种方法。在stackoverflow,多个功能分支和持续集成上也存在类似的问题。我开始了新的话题,因为我想进行更多的讨论并提供有关问题的一些分析。 到目前为止,我发现我可以采用2种主要方法(或者可能采取其他一些方法?)。 每个分支有多套工作(在这里谈论詹金斯/哈德森) 编写工具来管理额外的工作 批量创建/修改/

  • 我的流在数据库中配置,我的程序不断创建和销毁流。 因此,流配置(例如cron配置)可以随时更改。 这些流是用方法IntegrationFlowContext注册的。使用IntegrationFlowRegistration方法注册并销毁。销毁。 流的运行从第0秒开始,可以在任何一分钟开始。销毁和创建新流从每分钟1秒开始。 这是一个好方法吗?当我测试这个时,它起作用了。但我在想,这是一种很好的方法吗

  • 当使用嵌入式HSQLDB进行单元测试时,似乎无法正确处理hibernate实体映射文件中定义的模式和/或目录。hibernate映射如下所示: 我不能更改hibernate实体映射,我不想使用其他数据库引擎(我知道H2DB可以处理这个)。有人能说明如何使HSQLDB在这个单元测试上下文中工作吗?