当前位置: 首页 > 知识库问答 >
问题:

Spring Integration--如何在同一个连接上实现异步TCP套接字请求/响应?

归泽宇
2023-03-14
    null
@MessagingGateway(defaultRequestChannel = REQUEST_CHANNEL, errorChannel = ERROR_CHANNEL)
public interface ClientGtw {
    Future<Response> send(Request request);
}

@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public MessageHandler outboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setRemoteTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(AppConfig config) {    
    Host host = getHost(config);

    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
    factory.setSingleUse(false);
    factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));

    SerializerDeserializer sd = new SerializerDeserializer();
    factory.setDeserializer(sd);
    factory.setSerializer(sd);
    return factory;
}

如何在同一个连接上实现异步请求/响应?

Spring TcpOutboundGateway javadoc提到:为该用例使用一对出站/入站适配器。

因此,除上述声明外:

@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public AbstractServerConnectionFactory serverFactory(AppConfig config) {
    Host host = getHost(config);
    AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(host.port);
    connectionFactory.setSingleUse(true);
    connectionFactory.setSoTimeout(timeout);
    return connectionFactory;
}
@Bean
public TcpInboundGateway inboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setClientMode(true);
    return gateway;
}

共有1个答案

杨超
2023-03-14

使用一对通道适配器而不是出站网关。不使用MessagingGateway,您可以自己在应用程序中进行关联,也可以使用tcp-client-server-multiplex示例应用程序中使用的相同技术。它使用聚合器将出站消息的副本与入站消息聚合,以回复网关。

它是旧的,使用XML配置,但同样的技术也适用。

<publish-subscribe-channel id="input" />

<ip:tcp-outbound-channel-adapter id="outAdapter.client"
    order="2"
    channel="input"
    connection-factory="client" /> <!-- Collaborator -->

<!-- Also send a copy to the custom aggregator for correlation and
     so this message's replyChannel will be transferred to the
     aggregated message.
     The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
        order="1"/>

<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
    channel="toAggregator.client"
    connection-factory="client" /> <!-- Collaborator -->

<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />

<aggregator input-channel="toAggregator.client"
    output-channel="toTransformer.client"
    expire-groups-upon-completion="true"
    expire-groups-upon-timeout="true"
    discard-channel="noResponseChannel"
    group-timeout="1000"
    correlation-strategy-expression="payload.substring(0,3)"
    release-strategy-expression="size() == 2" />

<channel id="noResponseChannel" />

<service-activator input-channel="noResponseChannel" ref="echoService" method="noResponse" />

<transformer input-channel="toTransformer.client"
    expression="payload.get(1)"/> <!-- The response is always second -->

(这个简单的示例与前3个字节相关)。

 类似资料:
  • 我在详细学习http协议时,偶然发现了这个教程。我认为当发出http请求时,会通过客户端(浏览器)和服务器建立TCP连接。在TCP连接上发送http请求数据,在服务器端解析http请求,并在同一TCP连接上发送http响应。发送响应后,TCP连接终止。对于下一个http请求,将建立一个新的TCP连接。但下面的教程说了些别的。我是对还是错?

  • 在一个简单的客户端-服务器TCP套接字中,当我试图在Android Studio中创建客户端套接字时,遇到了一个问题。导致错误的指令是<code>Sockets=new Socket(主机名,端口) 我使用的端口是 1993,我尝试了变量的一些值: 在每种情况下,应用程序都会因为这个错误而停止,下面的错误属于第一点,其他的得到类似的错误。 Eclipse中的相同指令工作,我在命令行上使用服务器“n

  • 我目前正在自定义camel组件中使用netty监听套接字(使用具有netty通道处理程序的使用者),将消息从套接字传递到多个camelendpoint,并最终将最终结果写入平面文件。 这很好,但显然仅限于“仅限”功能。 我想做的是,不是将最终结果写入平面文件,而是通过套接字将最终结果发送回请求者。 像这样的东西不工作...伪代码: 我怀疑发生的是我对 导致NettyProducer尝试建立到loc

  • 在套接字最终接受另一端消失的情况下,什么指定了这个超时?是操作系统(Ubuntu 11.04),还是来自TCP/IP规范,还是套接字配置选项?

  • 基于此,有人对如何使用spring Integration实现此功能有什么建议吗?