我是RabbitMQ和Spring Integration的新手。
我有一个使用来自通道的JSON消息的用例,将它转换成一个对象。我需要在对象中设置的一个字段是消息Id(delivery.getEnvelope()。getDeliveryTag()),我们从rabbitMQ接收消息,在所有业务逻辑之后,我们需要它来进行ack处理。
如何使用Spring集成来实现?这是我的xml配置。
<bean id="devRabbitmqConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="brokerURL" value="#{props[rabbitmq_inputjms_url]}" />
<property name="redeliveryPolicy" ref="redeliveryPolicy" />
</bean>
<bean id="devJMSCachingConnectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="devRabbitmqConnectionFactory" />
<property name="sessionCacheSize" value="10" />
<property name="cacheProducers" value="false" />
</bean>
<int-jms:channel id="devJMSChannel" acknowledge="transacted"
connection-factory="devJMSCachingConnectionFactory" message-driven="false"
queue-name="devJMSChannel">
</int-jms:channel>
<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="5000" />
<property name="maximumRedeliveries" value="5" />
</bean>
<int:transformer id="devObjectTransformer" input-channel="devJMSChannel" ref="devService" method="readEventFromRabbitMQ"
output-channel="devPacketChannel">
<int:poller fixed-rate="10" task-executor="devObjectTransformerExecutor" />
</int:transformer>
transformer方法“readEventFromRabbitMQ”从msg.getPayload()获取消息字符串,将其转换为object,并发送到输出通道。但是不确定如何在transformer类中获得消息Id。有人能帮我吗?
public List<DevEventRecord> readEventFromRabbitMQ(Message<EventsDetail> msg){
DevEventRecord[] eventRecords=null;
EventsDetail expEvent = null;
long receivedTime =System.currentTimeMillis();
int packetId = -1;
try{
monitorBean.incrementDeviceExceptionPacketCount();
expEvent = msg.getPayload();
LogUtil.debug("readExceptionEvent :: consumed JMS Q "+expEvent);
eventRecords = dispatchPacket(expEvent);
}
catch(ProcessingException pe){
notifyAck(expEvent.getUniqueId(),,,,);
}
catch(Exception ex){
notifyAck(expEvent.getUniqueId(),,,,);
LogUtil.error("Exception occured while reading object in readEvent , "+ex.toString());
}
return getEventRecordList(eventRecords);
}
<code>deliveryTag</code>在<code>之后显示为消息头
我不明白为什么要混合使用AMQP
和JMS
,但无论如何,这些通道实现不会从收到的消息中填充标头。这是出于他们的责任。
请使用<代码>
我是Spring集成的新手 我的目标是将信息从一个渠道传递到另一个渠道(链式过程) 通道1--- 1. 尝试: 1.当我尝试使用@transformer无法与“erroeChannel”通信时。 问题: 找到答案 } 其他通道代码也是如此
我的任务是编写一个java程序,从一个主题中读取xml,将其转换为JSON并发送到另一个主题。我已经创建了一个将xml转换为json的程序,但我不知道接下来该怎么做,比如如何使用该主题中的xml并将其发送给另一个主题。
如何发送buf然后接收msg 方法 我正在尝试通过从连接出站发送msg并从入站接收msg然后返回消息Mono来实现此方法。但我只能在that(Publisher)方法中接收消息。它似乎无法返回数据Mono 我试过这个。 但它会一直阻塞,直到连接超时 我尝试了另一个代码。我添加了一个handle方法,并将响应放到map中。然后我可以得到单声道。fromSupply(),在映射处有一个while循环中
我曾经能够做到这一点,但我正在努力应对0.3的期货。 下面是我从WebSocket获得的一个sink和stream: 我创建了一个在异步tokio任务之间通信的无界通道: 这是我被卡住的部分。我生成了一个异步任务,它应该连接无界接收器和接收器;我的想法是通过< code>unbounded_sender发送消息: 对于<code>send_all</code>,错误消息显示: 而且 而且 查看文档
我正在为我的Discord服务器创建一个bot。 我的服务器中有一个新的用户通道。当新用户加入时,我想向这个频道发送欢迎消息。但是,我不知道如何才能访问新用户的渠道。