当前位置: 首页 > 知识库问答 >
问题:

如何处理JMS重新传送到我的Camel Road,但仍然允许消息删除

公冶渝
2023-03-14

我有一个骆驼路由,它从ActiveMQ JMS队列中读取,进行一些处理,并将结果传递到远程目标。如果此过程的通信部分失败,我想无限期地重试(直到目标“启动”)。我可以通过使路由事务处理并设置重新交付策略来处理这个问题。重新交付在骆驼(通过camel.processor.重新尝试路由的失败部分)和ActiveMQConntionWorks(通过org.apache.activemq.重新尝试整个路由)中设置。

我还要求我们应该能够从 JMS 队列中删除一个条目(我通过通过 JMX 与 ActiveMQ 通信的应用程序执行此操作),并且处理应该移动到下一条消息。

问题是,我可以允许消息删除(通过将使用者设置为< code > CACHE level name = CACHE _ NONE ),也可以让连接处理重试(通过将使用者设置为< code > CACHE level name = CACHE _ CONSUMER ),但不能同时允许这两者。

这是我目前的设置:

xml prettyprint-override"><bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="${jms.brokerUrl}" />
  <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
  <property name="prefetchPolicy">
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
      <property name="all" value="0"/>
    </bean>
  </property>
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="jmsConnectionFactory"/>
  <property name="transactionManager" ref="jmsTransactionManager"/>
  <property name="transacted" value="true"/>
  <property name="concurrentConsumers" value="1"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
  <property name="configuration" ref="jmsConfig" />
</bean>

<!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
  <property name="initialRedeliveryDelay" value="5000" />
  <property name="redeliveryDelay" value="5000" />
  <property name="maximumRedeliveries" value="-1" />
  <property name="queue" value=">" />
</bean>

<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
  <property name="maximumRedeliveries" value="2"/>
  <property name="redeliveryDelay" value="500"/>
</bean>

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
  <constructor-arg>
    <bean class="org.springframework.transaction.support.TransactionTemplate">
      <property name="transactionManager" ref="jmsTransactionManager"/>
    </bean>
  </constructor-arg>
</bean>

<camel:camelContext id="testContext">
  <camel:routeBuilder ref="transactedRoute" />
</camel:camelContext>

使用非常简单的路线:

@Component
public class TransactedRoute extends SpringRouteBuilder {
  @Override
  public void configure() {
    onException(Exception.class) // we would handle comms exceptions here
      .redeliveryPolicyRef("redeliveryProfile")
      .rollback();

    from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
      .transacted("PROPAGATION_REQUIRED")
      .bean(Bean1.class)
      .to(DESTINATION); // this could throw a comms exception
  }
}

通过此设置,路由从jms:queue:myqueue读取,并且在通信异常的情况下,Camel在内部重试路由从部分两次(redeliveryProfile)。然后在 5 秒延迟后,消息将再次通过整个路由发送 (amqRedeliveryPolicy)。这一直持续到我停止测试。

但是,如果我从ActiveMQ中删除消息,那么它将继续由路由处理,尽管它不再在队列中。

如果我将消费者更改为:

from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")

我现在可以从ActiveMQ中删除消息,并且路由停止处理它……但amqReddeliveryPolicy被忽略,消息将立即重试(无5秒延迟),并在6次尝试后(AMQ的默认值)将其放入死信队列。

那么,有没有办法通过修改我的配置来实现两者兼得呢?

还是我完全错过了这一点?

共有1个答案

屠和洽
2023-03-14

如果您需要人工干预来通过JMX删除队列中的特定消息,这听起来有点像一个糟糕的设计。

也许您可以让 AMQ 在 X 次失败尝试后将消息移动到 DLQ - 永远重新传递也是一个糟糕的设计。但是,如果 6 次尝试很少,您可以将默认值增加到更高的值。

然后,您可以从 DLQ 队列中检查消息,并尝试了解消息失败的原因,并且您始终可以通过 JMX 或其他工具安全地删除或清除 DLQ 队列。

AMQ 允许每个队列有一个 DLQ,您可以在其中将其配置为使用 DLQ 前缀等,而不是一个常见的 DLQ 队列。

 类似资料:
  • 我读到:http://www.javaworld.com/article/2074123/java-web-development/transaction-and-redelivery-in-jms.html?page=2 "通常,确认特定消息会确认会话接收的所有先前消息"(在客户端确认模式下) “邮件重新传递不是自动的,但在某些情况下会重新传递邮件” 我的问题是: 如何确保每次收到消息时都有一个

  • 我正在读一条来自Solace的信息。我能够成功地阅读信息。假设我正在阅读一条消息,在侦听器线程上读取/处理消息时,应用程序崩溃。那我怎么能在那上面再读一遍那条信息呢。使用下面的代码,我无法再次阅读该消息。下面是我的配置

  • 如何延迟JMS消息发送或在不确定的时间内继续? 我使用的是Weblogic,正如您所知,在JMS发送之后,接收方将异步处理消息,但是,此时或有时外部资源还没有为接收方做好准备,因此,我想使用一些检查逻辑来延迟发送或处理消息。我猜例如:我将消息放入挂起队列,然后频繁检查资源可用性,一旦发送或继续消息? 大家都知道Weblogic是否支持这一点,或者如何实现它吗?

  • SQS消息由lambda成功处理。下面是处理并执行回调的代码: 我看到cloudwatch打印“OK DONE”消息。然而,SQS将消息“飞行”(并且它将永远飞行)。我的理解是,一旦成功发送响应,该消息将自动删除。我的能见度计时器=10分钟

  • 我们有一个camel路由,在这里我们从输入队列读取消息,处理它,设置一些JMS头(使用exchange.getin().setheader(...)),然后将消息路由到某个输出队列。在MQ故障转移方案期间,将重新传递消息。但是,当重新传递消息时,我前面放置的JMS头丢失了。是否有任何方法可以在重新交付后保留JMS头?

  • 高层体系结构 JMS(生产者/消费者)<---->Artemis(STOMP)<---->Websocket-Broker-Relay-Service<---->STOMP-over-Websocket-client(生产者/消费者)