当前位置: 首页 > 知识库问答 >
问题:

Spring cloud stream RabbitMQ动态路由消息

公羊喜
2023-03-14

我已经实现了如下所示的示例Spring Dynamic Destination

在rabbitmq中,它动态地创建一个交换,但没有提供绑定或路由密钥的选项。我的要求是用路由密钥向这个动态创建的exchange发送消息。我需要如何实现这一点来设置路由密钥?

@Component
public class DDProducerBean {

    @Autowired
    private BinderAwareChannelResolver poChannelResolver = null;

    public void publish(DDSocketVO ddSocketVO) throws Exception {
        this.poChannelResolver.resolveDestination(ddSocketVO.getDestination()).send(MessageBuilder.withPayload(new ObjectMapper().
                setVisibility(PropertyAccessor.FIELD, Visibility.ANY).
                writeValueAsString(ddSocketVO)).build());
    }

}

共有1个答案

凌宏大
2023-03-14

以下是建议的解决办法

基本上,使用BinderAwareChannelResolver创建一个带有动态目的地的MessageChannel,然后使用RabbitAdmin API连接到RabbitMQ,并在发送消息之前使用路由密钥将新创建的exchange绑定到另一个队列或exchange。

@Autowired
private BinderAwareChannelResolver poChannelResolver;

public void publish(WebSocketVO webSocketVO) throws Exception {

    MessageChannel channel = this.poChannelResolver.resolveDestination(webSocketVO.getDestination());

    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.username"));
    connectionFactory.setPassword(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.password"));
    connectionFactory.setAddresses(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.addresses"));
    connectionFactory.setVirtualHost(System.getProperty("spring.cloud.stream.binders.corerabbit.environment.spring.rabbitmq.virtual-host"));

    AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);

    TopicExchange sourceExchange = new TopicExchange(webSocketVO.getDestination(), false, true);
    TopicExchange destExchange = new TopicExchange("amq.topic");

    amqpAdmin.declareBinding(BindingBuilder.bind(destExchange).to(sourceExchange).with(webSocketVO.getRoutingKeyExpression()));




    channel.send(MessageBuilder.withPayload(new ObjectMapper().
            setVisibility(PropertyAccessor.FIELD, Visibility.ANY).
            writeValueAsString(webSocketVO)).build());


    amqpAdmin.deleteExchange(webSocketVO.getDestination());

    connectionFactory.destroy();

}
 类似资料:
  • 我想创建一个公共项目(使用spring cloud stream),根据消息内容动态地将消息路由到不同的(消费者)项目。(rabbitmq作为消息代理) spring cloud stream支持吗?如果没有,有什么建议的方法来实现这一点?thx公司

  • 问题内容: 我目前有一个内置路由的AngularJS应用程序。它可以正常工作,并且一切正常。 我的app.js文件如下所示: 我的应用程序内置了CMS,您可以在其中复制 / pages 目录中的新html文件并添加新的html文件。 即使对于新动态添加的文件,我仍然希望通过路由提供程序。 在理想的情况下,路由模式为: $ routeProvider.when(’/ pagename ‘,{temp

  • 本文向大家介绍详解vue路由篇(动态路由、路由嵌套),包括了详解vue路由篇(动态路由、路由嵌套)的使用技巧和注意事项,需要的朋友参考一下 什么是路由?网络原理中,路由指的是根据上一接口的数据包中的IP地址,查询路由表转发到另一个接口,它决定的是一个端到端的网络路径。 web中,路由的概念也是类似,根据URL来将请求分配到指定的一个'端'。(即根据网址找到能处理这个URL的程序或模块) 使用vue

  • 本文向大家介绍静态路由和动态路由之间的区别,包括了静态路由和动态路由之间的区别的使用技巧和注意事项,需要的朋友参考一下 静态路由 遵循用户定义的路由的静态路由或非自适应路由,并且直到网络管理员更改路由表后,路由表才会更改。静态路由使用简单的路由算法,并且比动态路由提供更高的安全性。 动态路由 顾名思义,动态路由或自适应路由会在网络发生任何更改或网络拓扑发生更改时更改路由表。在网络更改期间,动态路由

  • 问题内容: 对不起,我的英语不好。我为AbstractRoutingDataSource编写了实现: 我创建了用于在数据库之间进行切换的新类: 其中DatabaseType为: 在我的beans.xml中: 现在,当我尝试更改DAO中的数据源时: 首次执行getJdbcTemplate()时,一次调用一次defineCurrentLookupKey(),并且数据源不会切换。 问题答案: Sprin