我在Camel中定义了一个路由,内容如下:GET请求进来,在文件系统中创建一个文件。文件消费者将其拾取,从外部Web服务获取数据,并通过POST将生成的消息发送到其他Web服务。
下面的简化代码:
// Update request goes on queue:
from("restlet:http://localhost:9191/update?restletMethod=post")
.routeId("Update via POST")
[...some magic that defines a directory and file name based on request headers...]
.to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")
// Update gets processed
from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
.routeId("Update main route")
.streamCaching() //otherwise stuff can't be sent to multiple endpoints
[...enrich message from some web service using http4 component...]
.multicast()
.stopOnException()
.to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
.end();
多播中的三个endpoint只是将生成的消息发布到其他Web服务。
当队列(即文件目录cameldest)相当空时,这一切都可以很好地工作。正在cameldest中创建文件/
但是,一旦传入请求堆积到大约300,000个文件,进度就会变慢,最终由于内存不足错误(超过GC开销限制)而导致管道失败。
通过增加日志记录,我可以看到文件消费者轮询基本上永远不会运行,因为它似乎对每次看到的所有文件负责,等待它们完成处理,然后才开始另一轮轮询。除了(我假设)导致资源瓶颈,这也干扰了我的排序要求:一旦队列被数千条等待处理的消息堵塞,那些天真地被排序较高的新消息——如果它们仍然被拾取的话——仍然在那些已经“开始”的消息后面等待。
现在,我已经尝试了maxMessagesPerPoll和WangerMaxMessagesperpoll选项。起初,它们似乎缓解了问题,但经过几轮民意调查后,我最终仍有数千份文件处于“开始”状态。
唯一起作用的是制造延迟和最大消息量的瓶颈 如此狭窄,以至于处理平均完成速度比文件轮询周期快。
显然,这不是我想要的。我希望我的管道能够尽可能快地处理文件,但不能更快。我希望文件使用者在路由繁忙时等待。
我是不是犯了一个明显的错误?
(如果这是问题的一部分,我正在使用XFS的Redhat 7机器上运行稍旧的Camel 2.14.0。)
除非您确实需要将数据保存为文件,否则我会提出另一种解决方案。
从您的restlet消费者那里,将每个请求发送到消息队列应用程序,例如actiemq或Rabbitmq或类似的东西。您很快就会在该队列中收到大量消息,但没关系。
然后用队列消费者替换您的文件消费者。这需要一些时间,但每条消息都应该单独处理并发送到您想要的任何地方。我已经用大约500,000条消息测试了Rabbitmq,效果很好。这也应该减少消费者的负载。
简短的答案是没有答案:Camel文件组件的sortBy
选项内存效率太低,无法容纳我的用例:
问题似乎是,如果我正确阅读了源代码和文档,那么所有文件详细信息都会存储在内存中以执行排序,无论是使用内置语言还是自定义的可插入排序器。文件组件总是创建一个包含所有详细信息的对象列表,这显然会在经常轮询许多文件时导致大量垃圾收集开销。
我的用例基本上可以正常工作,无需使用数据库或编写自定义组件,使用以下步骤:
想象一下,在机场办理登机手续时,如果商务舱通道空无一人,游客会被邀请进入该通道。
这在负载下就像一个魅力,即使数十万个文件在“低”队列中,直接放入“高”的新消息(文件)也会在几秒钟内得到处理。
此解决方案没有涵盖的唯一要求是有序性:不能保证先拾取旧文件,而是随机拾取旧文件。我们可以想象这样一种情况:源源不断的传入文件可能会导致某个特定的文件X总是不走运,永远不会被拾取。然而,发生这种情况的可能性很低。
可能的改进:当前允许/暂停将文件从“低”升级到“高”的阈值设置为“高”中的0条即时消息。一方面,这保证了在执行另一次从“低”升级之前,将处理掉到“高”的文件,另一方面,这会导致一种停止-启动模式,尤其是在多线程场景中。虽然这不是一个真正的问题,但它的表现令人印象深刻。
资料来源:
我的路线定义:
ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
trp.setMaxInflightExchanges(50);
SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");
from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("lowPriority")
.log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
.to("file://cameldest/queue/high");
from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("highPriority")
.routePolicy(trp)
.routePolicy(sorp)
.threads(20)
.log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
.delay(2000) // This is where business logic would happen
.log("After: ${in.headers."+Exchange.FILE_PATH+"}")
.stop();
我的Sus从业者RoutePolicy
,像ThrottlingInflight RoutePolicy
一样松散构建
public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {
private CamelContext camelContext;
private final Lock lock = new ReentrantLock();
private String otherRouteId;
public SuspendOtherRoutePolicy(String otherRouteId) {
super();
this.otherRouteId = otherRouteId;
}
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public void onStart(Route route) {
super.onStart(route);
if (camelContext.getRoute(otherRouteId) == null) {
throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
}
}
@Override
public void setCamelContext(CamelContext context) {
camelContext = context;
}
@Override
public void onExchangeDone(Route route, Exchange exchange) {
//log.info("Exchange done on route " + route);
Route otherRoute = camelContext.getRoute(otherRouteId);
//log.info("Other route: " + otherRoute);
throttle(route, otherRoute, exchange);
}
protected void throttle(Route route, Route otherRoute, Exchange exchange) {
// this works the best when this logic is executed when the exchange is done
Consumer consumer = otherRoute.getConsumer();
int size = getSize(route, exchange);
boolean stop = size > 0;
if (stop) {
try {
lock.lock();
stopConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
// reload size in case a race condition with too many at once being invoked
// so we need to ensure that we read the most current size and start the consumer if we are already to low
size = getSize(route, exchange);
boolean start = size == 0;
if (start) {
try {
lock.lock();
startConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
}
private int getSize(Route route, Exchange exchange) {
return exchange.getContext().getInflightRepository().size(route.getId());
}
private void startConsumer(int size, Consumer consumer) throws Exception {
boolean started = super.startConsumer(consumer);
if (started) {
log.info("Resuming the other consumer " + consumer);
}
}
private void stopConsumer(int size, Consumer consumer) throws Exception {
boolean stopped = super.stopConsumer(consumer);
if (stopped) {
log.info("Suspending the other consumer " + consumer);
}
}
}
尝试将“发件人文件”endpoint上的maxMessagesPerPoll设置为较低的值,以便每次轮询最多只拾取X个文件,这也限制了您的Camel应用程序中的机上消息总数。
您可以在文件组件的Camel文档中找到有关该选项的更多信息
我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?
我已经和ApacheCamel合作了一段时间,做了一些基本的工作,但现在我正在尝试创建一个路由,在该路由中,我可以让多个“消费者”访问同一条路由,或者在路由中添加一个消费者,然后处理消息。 我的想法是拥有一个由事件触发的事件驱动消费者,然后例如从ftp读取文件。我正计划做这样的事情: 所以这个想法是我有一个事件(例如直接或来自消息队列),它具有“fileName”属性,然后使用该属性从ftp下载/
我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利
我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里
我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?
我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c