当前位置: 首页 > 知识库问答 >
问题:

Spring 集成和 RabbitMQ:接收消息时没有可用的输出通道或回复通道标头

牛经赋
2023-03-14

我写了一个带有请求和回复的简单消息流。我必须使用两个独立的队列,所以我声明AmqpOutboundAdapter发送消息,声明Amqp入站Adapter接收回复。

@Bean
@FindADUsers
public AmqpOutboundEndpoint newFindADUsersOutboundAdapter() {
    return Amqp.outboundAdapter(amqpTemplate())
            .routingKeyExpression("headers[" + ADUsersFindConfig.ROUTING_KEY_HEADER + "]")
            .exchangeName(getExchange())
            .headerMapper(amqpHeaderMapper())
            .get();
}

@Bean
public AmqpInboundChannelAdapter newFindADUsersResponseInboundChannelAdapter(
        ADUsersFindResponseConfig config) {
    return Amqp.inboundAdapter(rabbitConnectionFactory(), findADUsersResponseQueue)
            .headerMapper(amqpHeaderMapper())
            .outputChannel(config.newADUsersFindResponseOutputChannel())
            .get();
}

它应该适用于@MessagingGateway:

@MessagingGateway
public interface ADUsersFindService {

     String FIND_AD_USERS_CHANNEL = "adUsersFindChannel";

     String FIND_AD_USERS_REPLY_OUTPUT_CHANNEL = "adUsersFindReplyOutputChannel";

     String FIND_AD_USERS_REPLY_CHANNEL = "adUsersFindReplyChannel";

     String CORRELATION_ID_REQUEST_HEADER = "correlation_id";

     String ROUTING_KEY_HEADER = "replyRoutingKey";

     String OBJECT_TYPE_HEADER = "object.type";

     @Gateway(requestChannel = FIND_AD_USERS_CHANNEL, replyChannel = FIND_AD_USERS_REPLY_CHANNEL)
ADResponse find(ADRequest adRequest, @Header(ROUTING_KEY_HEADER) String routingKey, @Header(OBJECT_TYPE_HEADER) String objectType);
}

ADUsersFindResponseConfig类类似于:

 @Configuration
 @Import(JsonConfig.class)
 public class ADUsersFindResponseConfig {

     @Autowired
     public NullChannel nullChannel;

     @Autowired
     private JsonObjectMapper<?, ?> mapper;

     /**
      * @return The output channel for the flow
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_OUTPUT_CHANNEL)
     public MessageChannel newADUsersFindResponseOutputChannel() {
         return MessageChannels.direct().get();
     }

     /**
      * @return The output channel for gateway
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_CHANNEL)
     public MessageChannel newADUsersFindResponseChannel() {
         return MessageChannels.direct().get();
     }

     @Bean
     public IntegrationFlow findADUsersResponseFlow() {
         return IntegrationFlows
                 .from(newADUsersFindResponseOutputChannel())
                 .transform(new JsonToObjectTransformer(ADResponse.class, mapper))
                 .channel(newADUsersFindResponseChannel())
                 .get();
     }
 }

发送消息正常工作,但我在接收消息时遇到问题。我希望收到的消息将被传递到名为FIND_AD_USERS_REPLY_OUTPUT_channel的通道,然后消息将使用findADUsersResponseFlow反序列化为ADResponse对象,下一个ADRespons对象将被传递给网关replyChannel-FIND_ADUSERS_REPLY-channel。最后,“find”方法返回此对象。不幸的是,org.springframework.integration.handler。BridgeHandler收到一条消息,我收到异常:

 org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

消息日志如下:

 11:51:35.697 [SimpleAsyncTaskExecutor-1] INFO  New message - GenericMessage [payload={...somepayload...}, headers={correlation_id=7cbd958e-4b09-4e4c-ba8e-5ba574f3309a, replyRoutingKey=findADUsersResponse.ad, amqp_consumerQueue=findADUsersResponseQueue, history=newFindADUsersResponseInboundChannelAdapter,adUsersFindReplyOutputChannel,adUsersFindReplyChannel,infoLog,infoLoggerChain.channel#0,infoLoggerChain.channel#1, id=37a4735d-6983-d1ad-e0a1-b37dc17e48ef, amqp_consumerTag=amq.ctag-8Qs5YEun1jXYRf85Hu1URA, object.type=USER, timestamp=1469094695697}]

所以我很确定该消息已传递给adUsersFindReplyChannel。此外(如果很重要)请求消息和回复消息都将“replyTo”标头设置为 null。我做错了什么?

共有1个答案

陆浩博
2023-03-14

回复通道标头是一个活动对象,不能通过 AMQP 序列化。

您可以使用出站网关而不是一对适配器,框架将处理头。

如果您出于某种原因必须使用适配器,您需要做两件事:

>

  • 使用标头通道注册表将通道对象转换为在注册表中注册的字符串。

    确保标头映射器配置为发送/接收replyChannel标头,并且接收系统在回复中返回标头。

  •  类似资料:
    • 我正在使用Spring Boot以及Spring集成版本。我正在尝试为调用远程超文本传输协议服务的Spring集成流之一正确设置重试建议。流程的简化草案如下所示:

    • 我在应用程序中收到一个错误,我没有得到解决方案。应用程序请求REST/JSON并尝试与另一个应用程序通信。有一个轮询器可以异步调用每个请求。当一个do a请求总是分派错误“没有可用的输出通道或replyChannel头”,它被重定向到errorChannel句柄。 日志打印: 后发送(发送=true)在通道'错误通道',消息:错误消息[有效载荷=org.springframework.messag

    • 我需要在我的Spring集成上下文中动态地将消息分配给MessageChannel。当我知道我想要的MessageChannel的名称时,我可以通过从上下文中获取MessageChannel bean来做到这一点。 我需要做的是通过编程查找在ChannelAdapter/服务中设置的消息通道的名称/id。 但是,MessageChannel API没有与之关联的getName()或getId()方

    • 我正在处理Spring集成和文件。我正在使用提供的DSL来处理我的文件。在我的集成流程结束时,我使用文件将结果输出到一个新文件。outboundGateway(…) 。但是,我不断收到以下错误消息:没有可用的输出通道或replyChannel标头。根据这篇文章底部的帖子,解决方案是将预期回复设置为false,但是,我如何使用DSL做到这一点? 下面显示了我在集成流程的最后一部分中写入文件的操作。

    • 这是使用注释代码示例的Spring集成执行器通道的后续问题。 我试图通过在“公共频道”中发布一条消息并阅读味精中设置的REPLY_CHANNEL来测试用红色突出显示的框。 “公共通道”是发布-订阅通道。REPLY_通道是一个队列通道。 由于这是一个JUnit测试,我已经模拟了jdbcTemboard、数据源和Impl以忽略任何DB调用。 我的问题是:当我在“公共频道”上发布消息时,我在REPLY\

    • 我正在通过绑定到不同Webshpere MQ的入站和出站原子和JMS使用带有JTA支持的Spring集成。流程如下: JMS入站通道适配器收到消息 一些转变 输出队列的JMS出站通道适配器 发生错误时,收到消息 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将处理的错误路由到接收者列表路由器,该路由器将它们发送到2个错误队列 我的问题是,即使消息到达errorChannel下游(在已处理