当前位置: 首页 > 知识库问答 >
问题:

如何在文件到队列的骆驼路由上配置重新传递策略?

颜永怡
2023-03-14
public class DemoHelperRouteBuilder extends RouteBuilder {

@Override
public void configure() throws Exception {


    onException(JmsException.class, ConnectException.class)
    .routeId("ConnectionExceptionRoute")
    .handled(true)
    .log(LoggingLevel.ERROR, "Connection Error")
    .maximumRedeliveries(5)
    .redeliveryDelay(1000)
    .backOffMultiplier(2)
    .useExponentialBackOff()
    .maximumRedeliveryDelay(60000)
    .log(LoggingLevel.DEBUG, "Rolling back!")
    .rollback();



    errorHandler(defaultErrorHandler()
            .maximumRedeliveries(20).redeliveryDelay(7000).retryAttemptedLogLevel(LoggingLevel.INFO));

    from("file:{{directory.property}}?delete=true&readLock=markerFile&delay=5000")
    .log("Passing File")
    .transacted("PROPAGATION_REQUIRED")
    .setHeader("FILE_TYPE", constant("MYTYPE"))
    .setHeader("MSG_ID", constant("55"))
    .to("activemq:{{MY.QUEUE}}"); 

}

}

我的骆驼上下文文件如下所示:

<!-- This is the default behavior. -->
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>


<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory" ref="jmsConnectionFactory"></property>
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${jms.broker.url}"/>
    <property name="redeliveryPolicy">
        <bean class="org.apache.activemq.RedeliveryPolicy">
            <property name="maximumRedeliveries" value="-1" />
            <property name="redeliveryDelay" value="10000" />
        </bean>
    </property>
</bean>

我在应用程序中的各种不同endpoint之间有几条其他路由,它们需要重新传递配置,并且按照预期的方式运行。但是,这个特定的路由类型(从文件到队列)似乎并不遵循我配置的任何重新传递策略。当我关闭ActiveMQ代理时,路由会尝试每6秒重新传递一次文件。:

10:54:17,088 WARN c_demo%5GenericFileOnCompletion 105-org.apache.camel.camel-core-2.10.0.redhat-60024回滚文件策略:org.apache.camel.component.file.strategy.genericfileDeleteProcessStrategy@789d315e for file:genericfile[]

我查看了事务性客户端文档和其他站点,但一直未能找到解决方案。如果我错过了一些告诉我如何解决这个问题的东西,我道歉。任何可以帮助解决此问题的指针或文档都将非常感谢。谢谢!

共有1个答案

微生耘豪
2023-03-14

我可以通过稍微更改路由来配置路由:

public class DemoHelperRouteBuilder extends SpringRouteBuilder{

@Override
public void configure() throws Exception {


onException(JmsException.class, ConnectException.class)
.routeId("ConnectionExceptionRoute")
.handled(true)
.log(LoggingLevel.ERROR, "Connection Error")
.maximumRedeliveries(25)
.redeliveryDelay(1000)
.backOffMultiplier(2)
.useExponentialBackOff()
.maximumRedeliveryDelay(60000)
.log(LoggingLevel.DEBUG, "Rolling back!")
.rollback();



 errorHandler(transactionErrorHandler()
    .maximumRedeliveries(5).redeliveryDelay(60000).retryAttemptedLogLevel(LoggingLevel.INFO)); 

from("file:{{directory.property}}?delete=true&readLock=markerFile&delay=5000")
.log("Passing File")
.transacted("PROPAGATION_REQUIRED")
.setHeader("FILE_TYPE", constant("MYTYPE"))
.setHeader("MSG_ID", constant("55"))
.to("activemq:{{MY.QUEUE}}"); 

}

}

<bean id="JMS_TRANSACTED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> 
            <property name="transactionManager"> 
                   <osgi:reference interface="org.springframework.transaction.PlatformTransactionManager" /> 
            </property> 
            <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" /> 
</bean> 

<bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/>
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.broker.url}"/>
<property name="redeliveryPolicy">
    <bean class="org.apache.activemq.RedeliveryPolicy">
        <property name="maximumRedeliveries" value="-1" />
        <property name="redeliveryDelay" value="10000" />
    </bean>
</property>
</bean>
    null
 类似资料:
  • 我正在尝试使用多部分/表单数据将文件上传到骆驼路由。一切都很好,但是,我无法获得原始文件名。骆驼版本是:3.14.1 更新 使用对路由的以下修改进行更新。我设法处理二进制文件(获取文件名并存储它们)。但是,对于文本文件,该文件将附加边界页脚: 路线定义: 先谢谢你了 爱德华

  • null 我希望每隔5分钟轮询这个队列(使用Camel-Quartz),并将这些consumes消息主体中找到的字符串(fizz、buzz等)合并到JDBC语句中,该语句查找与字符串(例如fizz或buzz)匹配的任何

  • 我正在尝试实现一个非常简单的骆驼路由,即从CXFendpoint接收请求,并将其放置在队列中,以便稍后进行异步处理。一旦消息被放置在队列中,我需要能够向调用者发送一个响应,指示消息已被接收。我已经完成了教程,但似乎无法正确完成。所发生的情况是,一旦消息被放置在队列中,在消息从队列处理到其预期目的地之前,不会向调用者发送响应。 下面的代码

  • 我有一个Camel/SpringBoot应用程序,它从GraphQLendpoint检索数据,将数据存储在内存数据库(2个表)中,通过运行SQL查询提取CSV文件,然后将文件上传到FTP服务器。由于将提取约350k条记录,我使用SQLs outputType=StreamList、splitter和stream:file。整个路线如下所示: 提取数据时不会出现任何问题,并使用记录创建CSV文件。但

  • 我拥有一个spring应用程序,希望在应用程序启动期间动态添加骆驼路由。endpoint在属性文件中配置,并在运行时加载。使用Java DSL,我使用for循环创建所有路由, 无法创建路由file_routeDirect:在:at:>>>onException[[class org.apache.camel.component.file.GenericFileOperationFailedExce

  • 我有一条小路线,我想使用自定义的重新传递策略来重复向endpoint发送消息,但这种行为非常奇怪。看起来,重新交付政策只是在重复一个错误。我试图将所有交换发送到路由的开头,但策略不起作用,因为每次都在创建: 我做错了什么?当错误发生时,我想以间隔重复我的请求。我的骆驼版本是2.6 日志: