当前位置: 首页 > 知识库问答 >
问题:

Spring集成AMQP-尽管有重试建议,仍会持续重试

钱和安
2023-03-14

我以前非常成功地使用了JMS的Spring集成,但是我们现在使用RabbitMQ/AMQP,并且在错误处理方面有一些问题。

我有一个int-amqp: inbinding-Channel适配器,带有一个设置为接收任何异常的错误通道,这里有一个错误转换器类检查失败的消息的原因异常。然后根据异常的类型:-

>

  • 抑制异常并转换为JSON对象,该对象可以作为解释故障的业务回复转到AMQP出站通道适配器。在这里,我希望原始邮件已被消费/确认。

    或者重新抛出导致异常的异常,以让RabbitMQ重新传递消息一定次数。

    我发现重新投掷导致消息被不断重新传递,然后我读了关于StatefulRetryoperationsInterceptorFactoryBean的文章,所以添加了一个建议链重试3次,然后我得到了关于没有消息id的异常,所以还在建议链的开始添加了一个“失踪消息身份建议”。

    尽管有这些建议,我仍然会不断地重新尝试从errorChannel的ErrorTransformer重新抛出的RuntimeException。我只是使用默认值通过RabbitMQ管理员发布消息。不确定缺少消息id是否会导致此操作不起作用,如果是,如何使消息具有id?我对以下两者之间的区别感到困惑:-

    a)条件反射错误处理程序(我已经设置为入站适配器的错误处理程序),它允许我提供一个自定义的FatalExceptionStrategy来实现isFatt()。我相信致命=真实(意味着DISCARD),消息被消耗和丢弃,但是我如何仍然发送出站失败消息?

    b)和我在入站适配器上的errorChannel,我正在使用该适配器检查异常并将其转换为出站失败响应消息。在这里,我想我可以抛出AmqpRejectAndDontDemeueExc0019,但为什么有条件反射错误处理程序呢?并将托运AmqpRejectAndDont请求异常工作

         <int-amqp:inbound-channel-adapter id="amqpInRequestPatternValuation"  channel="requestAmqpIn" channel-transacted="true"  transaction-manager="transactionManager"
             queue-names="requestQueue" error-channel="patternValuationErrorChannel" connection-factory="connectionFactory"
             receive-timeout="59000" concurrent-consumers="1"
             advice-chain="retryChain" error-handler="customErrorHandler" />
    
     <bean id="customErrorHandler" class="org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler">
             <constructor-arg ref="customFatalExceptionStrategy"/>
      </bean>                             
    
    
    <bean id="customFatalExceptionStrategy" class="abc.common.CustomFatalExceptionStrategy"/>
    
    
    
      <!-- ADVICE CHAIN FOR CONTROLLING NUMBER OF RE-TRIES before sending to DLQ (or discarding if no DLQ) without this any re-queued fatal message will retry forever  -->
    
      <util:list id="retryChain">
          <bean class="org.springframework.amqp.rabbit.retry.MissingMessageIdAdvice">
              <constructor-arg>
                  <bean class="org.springframework.retry.policy.MapRetryContextCache" />
              </constructor-arg>
          </bean>
          <ref bean="retryInterceptor" />
      </util:list>
    
    <bean id="retryInterceptor"
        class="org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean">
        <property name="retryOperations" ref="retryTemplate" />
        <property name="messageRecoverer" ref="messageRecoverer"/>
    </bean>
    
     <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
       <property name="retryPolicy" ref="simpleRetryPolicy" />
       <property name="backOffPolicy">
              <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                     <property name="initialInterval" value="10000" />
              </bean>
       </property>
    </bean>
    
      <bean id="simpleRetryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy">
        <property name="maxAttempts" value="3" />
    </bean>
    
  • 共有2个答案

    金毅
    2023-03-14

    我从来没有抽出时间发布解决方案,所以它就在这里。

    一旦我配置了重试建议链到AMQP入站通道适配器,该适配器必须包括RejectAndDontDemeueRecoverer的MessageRecoverer(我认为这也是错误的)。我忽略的重要一点是确保当发送者发送消息时,他们包括message_id。因此,如果通过RabbitMQ管理控制台发布,我需要包含预定义的message_id属性并提供一个值。

    使用“失踪消息”没有帮助(所以我删除了),因为它会在每次重新传递消息时生成不同的message_id,导致重试计数不会增加,因为每次传递都被认为与上次不同

    颛孙和悌
    2023-03-14

    您必须使用拒绝和dontrequeuerecoverer在重试结束时停止重新传递:

     * MessageRecover that causes the listener container to reject
     * the message without requeuing. This enables failed messages
     * to be sent to a Dead Letter Exchange/Queue, if the broker is
     * so configured.
    

    是的,MessageId对于重试用例很重要。

    您可以注入自定义的MessageKeyGenerator策略,以确定消息中的唯一密钥,如果您不能在发送过程中手动提供它。

     类似资料:
    • translated_page: https://github.com/PX4/Devguide/blob/master/en/test_and_ci/continous_integration.md translated_sha: 95b39d747851dd01c1fe5d36b24e59ec865e323e PX4 Continuous Integration PX4 builds and

    • translated_page: https://github.com/PX4/Devguide/blob/master/en/test_and_ci/jenkins_ci.md translated_sha: 95b39d747851dd01c1fe5d36b24e59ec865e323e Jenkins CI Jenkins continuous integration server on S

    • translated_page: https://github.com/PX4/Devguide/blob/master/en/test_and_ci/README.md translated_sha: 95b39d747851dd01c1fe5d36b24e59ec865e323e 测试与持续集成 PX4提供大量的测试和持续集成。 本页提供概述。 在本地机器上测试 下面这条命令足够打开一个带有运

    • 我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。

    • 问题内容: 您的团队如何处理构建? 我们使用Cruise Control,但是(由于缺乏知识)我们遇到了一些问题-SVN中的代码冻结-生成管理 具体来说,当不断检入代码时,如何提供特定版本? 通常,您能否讨论在发行管理中使用的最佳实践? 问题答案: 我很惊讶这不是重复的,但是我找不到另一个。 好的,这是交易。它们是两个独立但相关的问题。 对于构建管理,重要的一点是您应该具有一个可重复的自动构建,该

    • 我们做的还不够好,先占个坑。 欢迎贡献章节。