当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ+Spring集成:从队列绑定到主题交换的消息转换

韦知
2023-03-14

生产者可以发送两种不同的消息类型;foobar(每个消息的内容无关,但假设它们都有一个id字段)。每个消息使用的路由密钥分别是msg.foomsg.bar。生产者不依赖默认的Java序列化,而是使用Jackson2JSONMessageConverter

使用者有一个队列,该队列使用msg.#的路由键绑定到同一个交换。一旦使用,使用者所要做的就是在日志文件中打印每个消息的ID。为了检索id字段的值,需要将JSON有效负载转换为某种对象。

这两个组件之间不共享消息类(foobar)。使用者的对象在其消息表示中可能有或多或少的字段。这很好,在这种情况下,任何空字段都可以设置为NULL。

@Bean
public IntegrationFlow inbound(ConnectionFactory connectionFactory, Queue queue) {
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queue))
        .handle(message -> {
            final byte[] payload = (byte[]) message.getPayload();
            final String payloadStr = new String(payload, Charset.defaultCharset());

            final String routingKey = (String) message.getHeaders().get("amqp_receivedRoutingKey");

            try {
                if (routingKey.equals("msg.foo")) {
                    final Foo foo = new Jackson2JsonObjectMapper().fromJson(payloadStr, Foo.class);

                    System.out.println("FOO id: " + foo.getId());
                } else if (routingKey.equals("msg.bar")) {
                    final Bar bar = new Jackson2JsonObjectMapper().fromJson(payloadStr, Bar.class);

                    System.out.println("BAR id: " + bar.getId());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        })
        .get();
}

不幸的是,由于重复的if/else子句,它非常古怪,也非常难看。我是否遗漏了任何使代码更易读的Spring集成技巧?

我设法找到了一个类似的问题(spring boot rabbitmq MappingJackson2MessageConverter自定义对象转换),但除了解决方案不起作用之外,它非常特定于rabbitmq。我宁愿是RabbitMQ不可知论者,并在可能的情况下使用标准AMQP类/导入。

共有1个答案

宫弘亮
2023-03-14

您似乎忽略了Jackson2JSONMessageConverter在AMQP消息中填充适当的头这一事实。这个可以依靠这些头在消费者端恢复POJO。

因此,考虑用Jackson2JSONMessageConverter配置AMQP.InboundAdapter(connectionFactory,queue)

有关converter的更多信息请参见文档:https://docs.spring.io/spring-amqp/docs/2.2.0.rc1/reference/html/#json-message-converter

 类似资料:
  • RabbitMQ在下列情况下会循环分发消息吗? RabbitMQ配置: 交换类型-主题 路由密钥-通知# 制片人正在将消息推送到上面的交流中,并遵循以下不同的主题 - notify.log.# , notify.status.#, notify.priceChange.# 有4个消费者在不同的服务器上运行。 > 3个消费者在负载均衡器下执行相同的处理并在同一应用程序的不同实例上运行。(他们想消费生

  • 在队列选项卡的rabbitMQ web界面上,我看到了“概述”面板,我在其中找到了以下内容: 排队消息: 准备好了 未确认 总数 我猜“总数”是多少。但什么是“准备就绪”和“未确认”?“准备好了”——传递给消费者的信息?“未确认”-? 消息费率: 发表 交付 重新交付 承认 这些信息是什么?尤其是“重新交付”和“确认”?这是什么意思?

  • 是否有可能为带有“直接”类型的Rabbitmq交换设置一些“默认”队列? 比如,我有一个exchange A,队列Q1、Q2、Q3和QDef。因此,如果使用路由密钥Q1发布某个消息,它将转到Q1。但如果消息使用路由密钥Q4,则它应该转到QDef。若路由密钥不是现有队列的名称,则消息应转到QDef。 有可能做兔子吗?也许交换不应该是“直接”类型,而应该是其他类型? 换句话说。如果某个消费者为某个路由

  • 我有两个独立实例(p1、p2)的生产者应用程序和两个独立实例(c1、c2)的消费者应用程序。 生产者p1连接到exchange,主题为t1,队列名称为name1。 使用者c1连接到exchange,主题为t1,队列名称为name1。 生产者p2连接到exchange,主题为t2,队列名称为name1。 使用者c2连接到exchange,主题为t2,队列名称为name1。 我在RabbitMQ GU

  • 交换:1个类型为“direct”的交换 队列:1个队列 绑定:队列绑定到Exchange 每当消息被发送到exchange时,它就会被传递到队列,辅助进程就会得到它们的任务。 每件东西都要经久耐用。 null 交换:1个具有“扇出”类型的交换 队列:n个队列,每个使用者一个 绑定:每个队列都需要绑定到Exchange 那么是谁安排了什么?在我看来: 生产者创建交换 使用者创建队列(因为它是自己的队

  • 简单的消息发布器如下所示: ...其配置如下: 问题是,此发布服务器将消息发送到准备好的默认目标,但我需要将消息发送到不同的队列。在我的情况下,我的应用程序中的每个任务都需要一个消息队列。原因是,如果要中止任务,必须从代理中删除消息。如果一个任务有数千条消息,那么使用选择器接收所有消息不是一个好的做法。我需要从消息代理中删除中止任务的所有消息,而不接收客户端。JMS仅支持消息的发送和接收。我必须使