当前位置: 首页 > 知识库问答 >
问题:

Spring Boot Integration-在连接初始化时发送问候

澹台星剑
2023-03-14
    null
@SpringBootApplication
@EnableIntegration
public class ExampleApp {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApp.class, args);
    }

    // Create listener on port 1234
    @Bean
    public AbstractServerConnectionFactory serverConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
        return tcpNetServerConnectionFactory;
    }

    // Inbound channel
    @Bean
    public MessageChannel requestChannel() {
        return new DirectChannel();
    }

    // Outbound channel
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    // Inbound gateway
    @Bean
    public TcpInboundGateway tcpInboundGateway(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel, MessageChannel replyChannel) {
        TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
        tcpInboundGateway.setConnectionFactory(serverConnectionFactory);
        tcpInboundGateway.setRequestChannel(requestChannel);
        tcpInboundGateway.setReplyChannel(replyChannel);
        return tcpInboundGateway;
    }

    // Send reply for incoming message -> working
    @ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
    public Message<String> processMessage(Message<String> message) {
        Message reply = MessageBuilder
                .withPayload("OK")
                .setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
                .build();
        return reply;
    }

    // Send greeting -> not working
    @Bean
    public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
        return tcpConnectionEvent -> {
            if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
                Message<String> message = MessageBuilder
                        .withPayload("Hello client")
                        .setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
                        .build();
                replyChannel.send(message);
            }
        };
    }
}
nc -C localhost 1234

连接已建立,但日志中出现以下错误:

未能发布TcpConnectionOpenEvent[Source=TCPNetConnection:LocalHost:37656:1234:187CFBC2-7E5D-4F4E-97DE-1A3B55A4E264],[Factory=ServerConnectionFactory,ConnectionId=LocalHost:37656:1234:187CFBC2-7E5D-4F4E-97DE-1A3B55A4E264]打开:Dispatcher没有通道“Application.ReplyChannel”的订阅服务器

如果我向服务器发送一个字符串,他会按预期回复“OK”。

多亏了加里·罗素的评论,我找到了解决办法。入站网关必须“拆分为”入站/出站通道适配器对。以下是完整的工作示例:

@SpringBootApplication
@EnableIntegration
public class ExampleApp {

    public static void main(String[] args) {
        SpringApplication.run(ExampleApp.class, args);
    }

    // Create listener on port 1234
    @Bean
    public AbstractServerConnectionFactory serverConnectionFactory() {
        TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
        return tcpNetServerConnectionFactory;
    }

    // Inbound channel
    @Bean
    public MessageChannel requestChannel() {
        return new DirectChannel();
    }

    // Outbound channel
    @Bean
    public MessageChannel replyChannel() {
        return new DirectChannel();
    }

    // Inbound channel adapter
    @Bean
    public TcpReceivingChannelAdapter receivingChannelAdapter(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel) {
        TcpReceivingChannelAdapter tcpReceivingChannelAdapter = new TcpReceivingChannelAdapter();
        tcpReceivingChannelAdapter.setConnectionFactory(serverConnectionFactory);
        tcpReceivingChannelAdapter.setOutputChannel(requestChannel);
        return tcpReceivingChannelAdapter;
    }

    // Outbound channel adapter
    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public TcpSendingMessageHandler tcpSendingMessageHandler(AbstractServerConnectionFactory serverConnectionFactory) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(serverConnectionFactory);
        return tcpSendingMessageHandler;
    }

    // Send reply for incoming message -> working
    @ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
    public Message<String> processMessage(Message<String> message) {
        Message<String> reply = MessageBuilder
                .withPayload("OK")
                .setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
                .build();
        return reply;
    }

    // Send greeting -> now working
    @Bean
    public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
        return tcpConnectionEvent -> {
            if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
                Message<String> message = MessageBuilder
                        .withPayload("Hello client")
                        .setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
                        .build();
                replyChannel.send(message);
            }
        };
    }
}

现在,客户端在建立连接时得到问候语“Hello client”,在每个发送消息时得到答复“OK”。

共有1个答案

许远航
2023-03-14

ReplyChannel.Send(消息);

你不能那样做;当第一个请求进来时,应答通道被连接起来。

在任何情况下,您都不能使用这样的网关,应答通道是用于对请求的应答,而不是用于发送任意消息。

 类似资料:
  • 如何使用dataTables实例化不加载数据的表(服务器模式),然后在单击按钮时加载数据。如果serverSide在初始化时设置为true,则表将自动发送ajax请求,然后呈现数据,这不是我想要的(

  • 我是气流新手,正在尝试启动MySQL db到气流。我已经创建了一个虚拟环境并安装了相应的软件包。我得到以下错误: 气流初始数据库 回溯(最近一次调用):文件“/Users//Documents/Test/venv/bin/airflow”,第23行,在导入argcomplete模块NotFoundError:没有名为“argcomplete”的模块 我已经安装了必要的软件包,特别是pip3 ins

  • 我的项目的设置是- 用于持久化的Spring JDBC 下面是我的应用程序的日志,它捕获了与数据库的交互。 日志中有两件事是清楚的- 连接池仅在收到执行查询的第一个请求时才开始创建连接 一个包含4个连接的池需要将近30秒的时间来初始化 我的问题是- 如何配置DBCP在启动时自动初始化? 创建连接真的需要那么长时间吗? 注意:请不要建议切换到C3P0或Tomcat连接池。我知道这些解决方案。我更感兴

  • 在 Swift 3 中,dispatch_once函数被删除,迁移指南建议使用初始化闭包: 让myGlobal = { … global包含对闭包调用的初始化…}() _ = myGlobal //使用myGlobal只会在首次使用时调用初始化代码。 我想从初始化闭包中访问“Self”实例变量,如下所示: 为什么“自我”在闭包中是不可接近的,如何才能使它成为现实?

  • HTTPProxyHandler总是发送连接请求是正确的吗?这不是只在你想做隧道的时候才适用吗?

  • 问题内容: 在当前的问题中(我将文件打印到Java中的物理打印机),我一直在疯狂地遍历代码,试图从所使用的每个类的javadoc中吞噬所有有用的丢失信息。 现在,我从以前的问题中抽出了很多代码,所以有相当一部分我不是自己写的。我注意到的问题是,我抓取的代码正在初始化一个对象,例如实现接口(Doc)的“SimpleDoc”并将其分配给该接口? 小代码段: 现在,据我所知,我们创建了对象。我熟悉继承,