问题陈述:
Spring amqp outbound gateway从不同的线程生成应答(如jms outbound gateway,具有不同的队列,使用相关键关联请求/响应)。
无法将消息与此示例关联。
Spring集成
<int:gateway id="outboundGateway" service-interface="com.amqp.outbound.gateway.OutboundGateway"
default-reply-channel="defaultReplyChannel" >
<int:method name="process" request-channel="inboundRequestChannel"/>
</int:gateway>
<int:channel id="defaultReplyChannel"/>
<int:channel id="inboundRequestChannel"/>
<int:channel id="enrichedInboundRequestChannel"/>
<int:channel id="processAuthRequestChannel"/>
<int:channel id="postProcessorChannel"/>
<int:chain input-channel="inboundRequestChannel" output-channel="enrichedInboundRequestChannel">
<int:service-activator id="serviceActivator"
ref="ouboundService" method="createRequest"/>
</int:chain>
<int-amqp:outbound-gateway id="outboundGtwyId" header-mapper="headerMapper"
request-channel="enrichedInboundRequestChannel"
reply-channel="defaultReplyChannel"
amqp-template="template"
reply-timeout="30000"
exchange-name="request_exchange"
routing-key="request_exchange_queue"/>
<int-amqp:inbound-channel-adapter id="amqpMessageDriven" queue-names="request_queue"
connection-factory="rabbitConnectionFactory" channel="processAuthRequestChannel"/>
<int:service-activator id="serviceActivator"
ref="ouboundService" input-channel="processAuthRequestChannel" output-channel="postProcessorChannel"
method="processRequest"/>
<int-amqp:outbound-channel-adapter amqp-template="template" channel="postProcessorChannel"
header-mapper="headerMapper" exchange-name="reply_exchange" routing-key="reply_exchange_queue"/>
<bean id="headerMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/>
配置
@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
template.setQueue("reply_queue");
return template;
}
@Bean
public Binding binding(){
return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue");
}
@Bean
public DirectExchange exchange(){
return new DirectExchange("request_exchange");
}
@Bean
public Queue queue(){
return new Queue("request_queue", true, false, true);
}
@Bean
public Binding bindingReply(){
return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue");
}
@Bean
public DirectExchange exchangeReply(){
return new DirectExchange("reply_exchange");
}
@Bean
public Queue replyQueue(){
return new Queue("reply_queue", true, false, true);
}
服务
@Service
public final class OuboundService {
public Message createRequest(String message){
System.out.println("Inside createRequest : "+ message);
final String transactionId = UUID.randomUUID().toString();
final Message builtMessage = MessageBuilder.withBody(message.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setHeader(AmqpHeaders.CORRELATION_ID, transactionId)
.build();
return builtMessage;
}
public Message processRequest(Message message){
System.out.println("Inside process Request : "+ new String(message.getBody()));
System.out.println("Header values : "+message.getMessageProperties().getHeaders());
final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties())
.copyHeaders(message.getMessageProperties().getHeaders()).build();
return result;
}
}
错误:
组织。springframework。整合。处理程序。ReplyRequiredException:处理程序“outboundGtwyId”未生成回复,其“RequiredReply”属性设置为true。
GitHub源代码(解决方案)
https://github.com/kingkongprab/spring-amqp-outbound-gateway
相关也在SpringAMQP中完成。有关更多信息,请参阅其rabbitmplate#sendanderecevie()
。参考手册中也有关于这一问题的良好文档。
Spring与其AbstractAMQOutboundEndpoint
和AMQBinboundGateway
实现的集成提供了现成的请求-应答相关解决方案。如果无法在服务器端使用AmqpInboundGateway
,则应确保correlationId
从接收到的请求传输到要发送回的回复。是的,您可以使用专用的exchange进行回复,这是rabbitmplate#setQueue()
支持的在客户端、出站端等待回复的功能。但如果没有适当的关联
传输,这仍然是行不通的。也看到https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-信息头的消息头(包括correlationId
)如何在Spring集成中映射。
使现代化
谢谢你分享你的申请。
嗯,现在我看到了几个问题:
>
@Bean
public Binding bindingReply(){
return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue");
}
rabbitmplate
必须使用setReplyAddress()
。您必须为reply_队列
配置MessageListenerContainer
,并将rabbitemplate
作为侦听器:
@Bean
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){
final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
template.setReplyAddress(replyQueue().getName());
return template;
}
@Bean
public MessageListenerContainer replyContainer(RabbitTemplate template) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(template.getConnectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(template);
return container;
}
使用org.springframework.amqp.core.Message操作的Ou界限Service
是无用的。通道适配器不知道这种类型的有效负载
,您的自定义Message
只是成为另一个Message
的序列化主体
org.springframework.amqp.core.。我把它改成了这个,一切都很好:
public String createRequest(String message){
System.out.println("Inside createRequest : "+ message);
return message;
}
public Message processRequest(Message message){
System.out.println("Inside process Request : " + message);
return message;
}
不管怎样,我建议你重新考虑你的设计,回到AmqpIn网关。
顺便说一句,在最终的解决方案中,您不需要关心任何相关性
。框架会自动为您实现这一点。
我正在尝试将spring集成配置为向队列发送消息,然后接收消息,即非常简单的事情: 我认为解耦所必需的是在流程的两端都有一个消息网关。因此,我的第一次尝试(有效)如下所示: 其中MessageReceiverHandler()是扩展AbstractMessageHandler的bean。 所以上面我们有一个用于出站消息的消息网关。我假设我们也应该有一个用于入站消息的网关,允许我们将传入消息处理与应
我可以看到请求消息被加入和出列,响应消息被加入和出列。但我有个例外, 我应该如何检索响应?
我有一个 FileUpload 事件,应该将其发送到 http:outbound upload URL。为此,我必须首先对登录 URL 进行身份验证并获取响应,并设置要执行的出站上传 URL 的会话 ID。在我的情况下,我有一个事件侦听器,它侦听应用程序以发布文件上传事件。发布后,我的侦听器可以拾取并执行流。我正在尝试了解如何实现这一点,因为文件上传对象需要保留,直到登录响应返回。谢谢!
我正在开发一个关于Spring集成的POC,使用如下。 从远程JMS队列订阅输入消息(A) 将输入消息(A)转换为(B) 使用(B)调用远程Web服务并接收响应 我的spring int-config-xml有以下内容 在我的Spring集成proj工作区中拥有所有jaxb生成的源代码。 在STS 3.8中执行此操作时。3,将抛出以下错误。 不确定我的代码中有什么错误。任何解决这一问题的帮助都是高
我正在开发一个Spring集成应用程序,我有一个地图列表,我需要将其插入到表格中。 我使用了jdbc: Outsport-网关或适配器将记录插入到表中。 但是如何使用jdbc:出站网关从我的地图列表中插入所有记录。
在带有Java配置的ftp出站网关的Spring集成文档示例(16.8.1)中,如何将应答通道的有效负载记录到控制台?