当前位置: 首页 > 知识库问答 >
问题:

spring amqp出站网关从不同的thead(如jms出站网关)生成应答

洪黎昕
2023-03-14

问题陈述:

Spring amqp outbound gateway从不同的线程生成应答(如jms outbound gateway,具有不同的队列,使用相关键关联请求/响应)。

无法将消息与此示例关联。

Spring集成

    <int:gateway id="outboundGateway" service-interface="com.amqp.outbound.gateway.OutboundGateway"     
                        default-reply-channel="defaultReplyChannel" >
        <int:method name="process"   request-channel="inboundRequestChannel"/>
    </int:gateway>

    <int:channel id="defaultReplyChannel"/>
    <int:channel id="inboundRequestChannel"/>
    <int:channel id="enrichedInboundRequestChannel"/>
    <int:channel id="processAuthRequestChannel"/>
    <int:channel id="postProcessorChannel"/>

    <int:chain input-channel="inboundRequestChannel" output-channel="enrichedInboundRequestChannel">
        <int:service-activator id="serviceActivator"
                       ref="ouboundService"  method="createRequest"/>
    </int:chain>

    <int-amqp:outbound-gateway id="outboundGtwyId" header-mapper="headerMapper"
                        request-channel="enrichedInboundRequestChannel"
                        reply-channel="defaultReplyChannel"
                        amqp-template="template" 
                        reply-timeout="30000" 
                        exchange-name="request_exchange" 
                        routing-key="request_exchange_queue"/>

    <int-amqp:inbound-channel-adapter id="amqpMessageDriven"  queue-names="request_queue" 
                                 connection-factory="rabbitConnectionFactory"  channel="processAuthRequestChannel"/>

    <int:service-activator id="serviceActivator"
                       ref="ouboundService" input-channel="processAuthRequestChannel" output-channel="postProcessorChannel"
                       method="processRequest"/>

    <int-amqp:outbound-channel-adapter amqp-template="template" channel="postProcessorChannel" 
            header-mapper="headerMapper" exchange-name="reply_exchange" routing-key="reply_exchange_queue"/>

    <bean id="headerMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>

配置

@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setQueue("reply_queue");
    return template;
}



@Bean
public Binding binding(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue");
}

@Bean
public DirectExchange exchange(){
    return new DirectExchange("request_exchange");
}

@Bean
public Queue queue(){
    return new Queue("request_queue", true, false, true);
}

@Bean
public Binding bindingReply(){
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue");
}

@Bean
public DirectExchange exchangeReply(){
    return new DirectExchange("reply_exchange");
}


@Bean
public Queue replyQueue(){
    return new Queue("reply_queue", true, false, true);
}

服务

@Service
public final class OuboundService {


    public Message createRequest(String message){
        System.out.println("Inside createRequest : "+ message);
        final String transactionId = UUID.randomUUID().toString();
        final Message builtMessage = MessageBuilder.withBody(message.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                .setHeader(AmqpHeaders.CORRELATION_ID, transactionId)
                .build();
        return builtMessage;
    }


    public Message processRequest(Message message){
        System.out.println("Inside process Request : "+ new String(message.getBody()));
        System.out.println("Header values : "+message.getMessageProperties().getHeaders());
        final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties())
                                .copyHeaders(message.getMessageProperties().getHeaders()).build();
        return result;
    }

}

错误:

组织。springframework。整合。处理程序。ReplyRequiredException:处理程序“outboundGtwyId”未生成回复,其“RequiredReply”属性设置为true。

GitHub源代码(解决方案)

https://github.com/kingkongprab/spring-amqp-outbound-gateway

共有1个答案

赵渊
2023-03-14

相关也在SpringAMQP中完成。有关更多信息,请参阅其rabbitmplate#sendanderecevie()。参考手册中也有关于这一问题的良好文档。

Spring与其AbstractAMQOutboundEndpointAMQBinboundGateway实现的集成提供了现成的请求-应答相关解决方案。如果无法在服务器端使用AmqpInboundGateway,则应确保correlationId从接收到的请求传输到要发送回的回复。是的,您可以使用专用的exchange进行回复,这是rabbitmplate#setQueue()支持的在客户端、出站端等待回复的功能。但如果没有适当的关联传输,这仍然是行不通的。也看到https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-信息头的消息头(包括correlationId)如何在Spring集成中映射。

使现代化

谢谢你分享你的申请。

嗯,现在我看到了几个问题:

>

@Bean
public Binding bindingReply(){
    return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue");
}

rabbitmplate必须使用setReplyAddress()。您必须为reply_队列配置MessageListenerContainer,并将rabbitemplate作为侦听器:

@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    template.setReplyAddress(replyQueue().getName());
    return template;
}

@Bean
public MessageListenerContainer replyContainer(RabbitTemplate template) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(template.getConnectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(template);
    return container;
}

使用org.springframework.amqp.core.Message操作的Ou界限Service是无用的。通道适配器不知道这种类型的有效负载,您的自定义Message只是成为另一个Message的序列化主体org.springframework.amqp.core.。我把它改成了这个,一切都很好:

public String createRequest(String message){
    System.out.println("Inside createRequest : "+ message);
    return message;
}


public Message processRequest(Message message){
    System.out.println("Inside process Request : " + message);
    return message;
}

不管怎样,我建议你重新考虑你的设计,回到AmqpIn网关。

顺便说一句,在最终的解决方案中,您不需要关心任何相关性。框架会自动为您实现这一点。

 类似资料:
  • 我正在尝试将spring集成配置为向队列发送消息,然后接收消息,即非常简单的事情: 我认为解耦所必需的是在流程的两端都有一个消息网关。因此,我的第一次尝试(有效)如下所示: 其中MessageReceiverHandler()是扩展AbstractMessageHandler的bean。 所以上面我们有一个用于出站消息的消息网关。我假设我们也应该有一个用于入站消息的网关,允许我们将传入消息处理与应

  • 我可以看到请求消息被加入和出列,响应消息被加入和出列。但我有个例外, 我应该如何检索响应?

  • 我有一个 FileUpload 事件,应该将其发送到 http:outbound upload URL。为此,我必须首先对登录 URL 进行身份验证并获取响应,并设置要执行的出站上传 URL 的会话 ID。在我的情况下,我有一个事件侦听器,它侦听应用程序以发布文件上传事件。发布后,我的侦听器可以拾取并执行流。我正在尝试了解如何实现这一点,因为文件上传对象需要保留,直到登录响应返回。谢谢!

  • 我正在开发一个关于Spring集成的POC,使用如下。 从远程JMS队列订阅输入消息(A) 将输入消息(A)转换为(B) 使用(B)调用远程Web服务并接收响应 我的spring int-config-xml有以下内容 在我的Spring集成proj工作区中拥有所有jaxb生成的源代码。 在STS 3.8中执行此操作时。3,将抛出以下错误。 不确定我的代码中有什么错误。任何解决这一问题的帮助都是高

  • 我正在开发一个Spring集成应用程序,我有一个地图列表,我需要将其插入到表格中。 我使用了jdbc: Outsport-网关或适配器将记录插入到表中。 但是如何使用jdbc:出站网关从我的地图列表中插入所有记录。

  • 在带有Java配置的ftp出站网关的Spring集成文档示例(16.8.1)中,如何将应答通道的有效负载记录到控制台?