我有一个骆驼路由,它从ActiveMQ JMS队列中读取,进行一些处理,并将结果传递到远程目标。如果此过程的通信部分失败,我想无限期地重试(直到目标“启动”)。我可以通过使路由事务处理
并设置重新交付策略
来处理这个问题。重新交付在骆驼(通过camel.processor.重新尝试路由的失败部分)和ActiveMQConntionWorks(通过
org.apache.activemq.重新尝试整个路由)中设置。
我还要求我们应该能够从 JMS 队列中删除一个条目(我通过通过 JMX 与 ActiveMQ 通信的应用程序执行此操作),并且处理应该移动到下一条消息。
),但不能同时允许这两者。问题是,我可以允许消息删除(通过将使用者设置为< code > CACHE level name = CACHE _ NONE
),也可以让连接处理重试(通过将使用者设置为< code > CACHE level name = CACHE _ CONSUMER
这是我目前的设置:
xml prettyprint-override"><bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerUrl}" />
<property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
<property name="prefetchPolicy">
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="all" value="0"/>
</bean>
</property>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="transacted" value="true"/>
<property name="concurrentConsumers" value="1"/>
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
<property name="configuration" ref="jmsConfig" />
</bean>
<!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="5000" />
<property name="redeliveryDelay" value="5000" />
<property name="maximumRedeliveries" value="-1" />
<property name="queue" value=">" />
</bean>
<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
<property name="maximumRedeliveries" value="2"/>
<property name="redeliveryDelay" value="500"/>
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<constructor-arg>
<bean class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
</constructor-arg>
</bean>
<camel:camelContext id="testContext">
<camel:routeBuilder ref="transactedRoute" />
</camel:camelContext>
使用非常简单的路线:
@Component
public class TransactedRoute extends SpringRouteBuilder {
@Override
public void configure() {
onException(Exception.class) // we would handle comms exceptions here
.redeliveryPolicyRef("redeliveryProfile")
.rollback();
from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
.transacted("PROPAGATION_REQUIRED")
.bean(Bean1.class)
.to(DESTINATION); // this could throw a comms exception
}
}
通过此设置,路由从jms:queue:myqueue
读取,并且在通信异常的情况下,Camel在内部重试路由从到
部分两次(redeliveryProfile
)。然后在 5 秒延迟后,消息将再次通过整个路由发送 (amqRedeliveryPolicy
)。这一直持续到我停止测试。
但是,如果我从ActiveMQ中删除消息,那么它将继续由路由处理,尽管它不再在队列中。
如果我将消费者更改为:
from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")
我现在可以从ActiveMQ中删除消息,并且路由停止处理它……但amqReddeliveryPolicy
被忽略,消息将立即重试(无5秒延迟),并在6次尝试后(AMQ的默认值)将其放入死信队列。
那么,有没有办法通过修改我的配置来实现两者兼得呢?
还是我完全错过了这一点?
如果您需要人工干预来通过JMX删除队列中的特定消息,这听起来有点像一个糟糕的设计。
也许您可以让 AMQ 在 X 次失败尝试后将消息移动到 DLQ - 永远重新传递也是一个糟糕的设计。但是,如果 6 次尝试很少,您可以将默认值增加到更高的值。
然后,您可以从 DLQ 队列中检查消息,并尝试了解消息失败的原因,并且您始终可以通过 JMX 或其他工具安全地删除或清除 DLQ 队列。
AMQ 允许每个队列有一个 DLQ,您可以在其中将其配置为使用 DLQ 前缀等,而不是一个常见的 DLQ 队列。
我读到:http://www.javaworld.com/article/2074123/java-web-development/transaction-and-redelivery-in-jms.html?page=2 "通常,确认特定消息会确认会话接收的所有先前消息"(在客户端确认模式下) “邮件重新传递不是自动的,但在某些情况下会重新传递邮件” 我的问题是: 如何确保每次收到消息时都有一个
我正在读一条来自Solace的信息。我能够成功地阅读信息。假设我正在阅读一条消息,在侦听器线程上读取/处理消息时,应用程序崩溃。那我怎么能在那上面再读一遍那条信息呢。使用下面的代码,我无法再次阅读该消息。下面是我的配置
如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?
SQS消息由lambda成功处理。下面是处理并执行回调的代码: 我看到cloudwatch打印“OK DONE”消息。然而,SQS将消息“飞行”(并且它将永远飞行)。我的理解是,一旦成功发送响应,该消息将自动删除。我的能见度计时器=10分钟
我们有一个camel路由,在这里我们从输入队列读取消息,处理它,设置一些JMS头(使用exchange.getin().setheader(...)),然后将消息路由到某个输出队列。在MQ故障转移方案期间,将重新传递消息。但是,当重新传递消息时,我前面放置的JMS头丢失了。是否有任何方法可以在重新交付后保留JMS头?
高层体系结构 JMS(生产者/消费者)<---->Artemis(STOMP)<---->Websocket-Broker-Relay-Service<---->STOMP-over-Websocket-client(生产者/消费者)