当前位置: 首页 > 知识库问答 >
问题:

从一个通道接收RabbitMQ消息,在转换器中设置messageId,并使用Spring Integration将其发送到另一个通道

邹玮
2023-03-14

我是RabbitMQ和Spring Integration的新手。

我有一个使用来自通道的JSON消息的用例,将它转换成一个对象。我需要在对象中设置的一个字段是消息Id(delivery.getEnvelope()。getDeliveryTag()),我们从rabbitMQ接收消息,在所有业务逻辑之后,我们需要它来进行ack处理。

如何使用Spring集成来实现?这是我的xml配置。

       <bean id="devRabbitmqConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
            <property name="brokerURL" value="#{props[rabbitmq_inputjms_url]}" />
            <property name="redeliveryPolicy" ref="redeliveryPolicy" />
      </bean>   

     <bean id="devJMSCachingConnectionFactory"
            class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="devRabbitmqConnectionFactory" />
            <property name="sessionCacheSize" value="10" />
            <property name="cacheProducers" value="false" />
      </bean>

      <int-jms:channel id="devJMSChannel" acknowledge="transacted"
            connection-factory="devJMSCachingConnectionFactory" message-driven="false"
            queue-name="devJMSChannel">

      </int-jms:channel> 
      <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <property name="initialRedeliveryDelay" value="5000" />
        <property name="maximumRedeliveries" value="5" />
      </bean>

    <int:transformer id="devObjectTransformer" input-channel="devJMSChannel"  ref="devService" method="readEventFromRabbitMQ"
                output-channel="devPacketChannel">
        <int:poller fixed-rate="10" task-executor="devObjectTransformerExecutor" />
    </int:transformer>

transformer方法“readEventFromRabbitMQ”从msg.getPayload()获取消息字符串,将其转换为object,并发送到输出通道。但是不确定如何在transformer类中获得消息Id。有人能帮我吗?

public List<DevEventRecord> readEventFromRabbitMQ(Message<EventsDetail> msg){
         DevEventRecord[] eventRecords=null;
         EventsDetail expEvent = null;
         long receivedTime =System.currentTimeMillis();
         int packetId = -1;
         try{
             monitorBean.incrementDeviceExceptionPacketCount();
             expEvent = msg.getPayload();
             LogUtil.debug("readExceptionEvent :: consumed JMS Q "+expEvent);
             eventRecords = dispatchPacket(expEvent);
         }
         catch(ProcessingException pe){
             notifyAck(expEvent.getUniqueId(),,,,);
         }
         catch(Exception ex){
             notifyAck(expEvent.getUniqueId(),,,,);
             LogUtil.error("Exception occured while reading object in readEvent , "+ex.toString());
         }
         return getEventRecordList(eventRecords);

    }

共有1个答案

韩禄
2023-03-14

<code>deliveryTag</code>在<code>之后显示为消息头

我不明白为什么要混合使用AMQPJMS,但无论如何,这些通道实现不会从收到的消息中填充标头。这是出于他们的责任。

请使用<代码>

 类似资料:
  • 我是Spring集成的新手 我的目标是将信息从一个渠道传递到另一个渠道(链式过程) 通道1--- 1. 尝试: 1.当我尝试使用@transformer无法与“erroeChannel”通信时。 问题: 找到答案 } 其他通道代码也是如此

  • 我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。

  • 如何发送buf然后接收msg 方法 我正在尝试通过从连接出站发送msg并从入站接收msg然后返回消息Mono来实现此方法。但我只能在that(Publisher)方法中接收消息。它似乎无法返回数据Mono 我试过这个。 但它会一直阻塞,直到连接超时 我尝试了另一个代码。我添加了一个handle方法,并将响应放到map中。然后我可以得到单声道。fromSupply(),在映射处有一个while循环中

  • 我曾经能够做到这一点,但我正在努力应对0.3的期货。 下面是我从WebSocket获得的一个sink和stream: 我创建了一个在异步tokio任务之间通信的无界通道: 这是我被卡住的部分。我生成了一个异步任务,它应该连接无界接收器和接收器;我的想法是通过< code>unbounded_sender发送消息: 对于<code>send_all</code>,错误消息显示: 而且 而且 查看文档

  • 我正在为我的Discord服务器创建一个bot。 我的服务器中有一个新的用户通道。当新用户加入时,我想向这个频道发送欢迎消息。但是,我不知道如何才能访问新用户的渠道。