我正在骆驼中开发一种机制,它将从一个可以是真或假的标志中为消息选择一个endpoint。这是一种节流机制,在我的上游通道被洪水淹没的情况下,它将把消息重新路由到批量摄取endpoint(发送到HDFS)。
最终,我的路线是这样的:
from("queue:myqueue").bean("messageParser")
.dynamicRoute(bean(ThrottleHelper.class, 'buildEndpoint'));
from('direct:regular').to('hbase');
from('direct:throttle').to('hdfs');
我的ThrottleHelper类的buildEndpoint方法如下所示:
public static String buildEndpoint() {
synchronized(shouldThrottle) {
if(shouldThrottle)
return "direct:throttle";
else
return "direct:regular"
}
}
目前,我在类上有一个名为checkStatus()的方法;设置shouldThrottle(静态变量)。checkStatus()每分钟在Camel quartz计时器上运行一次。
我注意到一些奇怪的行为,我想我可能误用了这个模式。从对Camel模式实现的进一步搜索来看,在消息遍历返回的每个endpoint之后,似乎会调用buildEndpoint()。这是真的吗?或者,我可以期待路径在转到“direct:throttle”或“direct:regular”后终止吗?
从我在网上收集的信息来看,我的方法真的应该是这样吗?
public static String buildEndpoint(Message message) {
if(message.getHeader('throttled') != null)
return null;
else
message.setHeader('throttled', true);
synchronized(shouldThrottle) {
if(shouldThrottle)
return "direct:throttle";
else
return "direct:regular"
}
}
谢谢
正如程序员Dan所指出的,动态路由器用于通过多个endpoint路由消息,因此需要显式返回null
来指示路由的结束。
如果您只想使用表达式或bean方法选择单个endpoint,最好使用动态收件人列表(cfr.http://camel.apache.org/recipient-list.html)。如果您在路由生成器中使用。收件人列表()
而不是。动态路由()
,您的第一个BuildEndpoint
方法实现将正常工作。
从官方的留档来看,是的,您的第二个构造更接近正确的用法。基本上,动态路由器可以用于将消息路由到多个终结点,而不仅仅是立即终止的单个终结点。要告诉动态路由器停止将消息路由到另一个终结点,您的bean必须返回null
,就像您在最终代码片段中写的那样,以指示此消息的路由已经完成。
我正在使用带有Apache骆驼的Spring Boot。我正在从控制器调用路由。一旦路由完成,控制就会返回控制器。我正在VerifyLimitProcess和批准限制处理器中生成响应。如果我没有在路由中提供窃听配置,控制器会按预期检索标头和正文。但如果我在路由中引入窃听,控制器会将标头和正文接收为null。如果有人指出我需要做什么,以便我可以在选择语句中引入两个处理器的窃听配置,即VerifyLi
我有一条小路线,我想使用自定义的重新传递策略来重复向endpoint发送消息,但这种行为非常奇怪。看起来,重新交付政策只是在重复一个错误。我试图将所有交换发送到路由的开头,但策略不起作用,因为每次都在创建: 我做错了什么?当错误发生时,我想以间隔重复我的请求。我的骆驼版本是2.6 日志:
我正在遵循位于Camel MyBatis Integration guide的安装指南。我使用的是Service Mix 5.0.1。我使用了安装spring mybatis的功能,它支持3.2.4。释放我的SqlMapConfig文件只包含有关TypeHandler和TypeAlias的信息。 当我开启服务混合,然后启动我的应用程序,我收到以下堆栈跟踪: ...还有50个 我的Bean定义如下:
我有一个Quarkus应用程序,它使用Apache-Camel并在本地运行良好。当我构建它并尝试运行docker容器时,我收到以下错误: 我的分级依赖关系是 } 当我在IntelliJ终端中使用“夸克斯开发”运行它时,我没有问题。我是否尝试运行容器,但会出现错误。为什么会发生这种情况?我不知道如何解决它。 编辑: 处理HttpException的位置: 处理程序本身: }
我试图弄清楚骆驼的节流概念。我已经看到了骆驼的航线政策,但这适用于许多飞行中的交换。 我的路线如下: 现在我的用例是,我想在这些路由之间传输比如说2000条消息,我知道可以通过来完成。但是,我不得不决定如何在下一个2000条消息被路由时控制它。我只想在接收者队列变为空时路由下2000条消息。 例如,消息从队列路由到。假设2K消息已成功路由,现在我想挂起我的路由,这样它就不会传输更多的消息,直到队列
我有一个Spring Boot2.25.1应用程序,它使用Camel 2.25.1与camel-kafka,一切都正常工作…在我的Kafka消费者中,我需要添加该功能以按需暂停消费,因此我升级到camel 3.18.1,以便我可以使用可暂停功能。升级到3.18.1后,我收到错误FileNotes与类文件TimeoutAwareAggregationStategy.class. 当我打开camel-