当前位置: 首页 > 知识库问答 >
问题:

骆驼:文件消费者组件“咬得太多”,管道因内存不足错误而死亡

潘辰龙
2023-03-14

我在Camel中定义了一个路由,内容如下:GET请求进来,在文件系统中创建一个文件。文件消费者将其拾取,从外部Web服务获取数据,并通过POST将生成的消息发送到其他Web服务。

下面的简化代码:

    // Update request goes on queue:
    from("restlet:http://localhost:9191/update?restletMethod=post")
    .routeId("Update via POST")
    [...some magic that defines a directory and file name based on request headers...]
    .to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")

    // Update gets processed
    from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
    .routeId("Update main route")
    .streamCaching() //otherwise stuff can't be sent to multiple endpoints
    [...enrich message from some web service using http4 component...]
    .multicast()
        .stopOnException()
        .to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
    .end();

多播中的三个endpoint只是将生成的消息发布到其他Web服务。

当队列(即文件目录cameldest)相当空时,这一切都可以很好地工作。正在cameldest中创建文件/

但是,一旦传入请求堆积到大约300,000个文件,进度就会变慢,最终由于内存不足错误(超过GC开销限制)而导致管道失败。

通过增加日志记录,我可以看到文件消费者轮询基本上永远不会运行,因为它似乎对每次看到的所有文件负责,等待它们完成处理,然后才开始另一轮轮询。除了(我假设)导致资源瓶颈,这也干扰了我的排序要求:一旦队列被数千条等待处理的消息堵塞,那些天真地被排序较高的新消息——如果它们仍然被拾取的话——仍然在那些已经“开始”的消息后面等待。

现在,我已经尝试了maxMessagesPerPoll和WangerMaxMessagesperpoll选项。起初,它们似乎缓解了问题,但经过几轮民意调查后,我最终仍有数千份文件处于“开始”状态。

唯一起作用的是制造延迟和最大消息量的瓶颈 如此狭窄,以至于处理平均完成速度比文件轮询周期快。

显然,这不是我想要的。我希望我的管道能够尽可能快地处理文件,但不能更快。我希望文件使用者在路由繁忙时等待。

我是不是犯了一个明显的错误?

(如果这是问题的一部分,我正在使用XFS的Redhat 7机器上运行稍旧的Camel 2.14.0。)

共有3个答案

傅嘉悦
2023-03-14

除非您确实需要将数据保存为文件,否则我会提出另一种解决方案。

从您的restlet消费者那里,将每个请求发送到消息队列应用程序,例如actiemq或Rabbitmq或类似的东西。您很快就会在该队列中收到大量消息,但没关系。

然后用队列消费者替换您的文件消费者。这需要一些时间,但每条消息都应该单独处理并发送到您想要的任何地方。我已经用大约500,000条消息测试了Rabbitmq,效果很好。这也应该减少消费者的负载。

谯振国
2023-03-14

简短的答案是没有答案:Camel文件组件的sortBy选项内存效率太低,无法容纳我的用例:

  • 唯一性:如果文件已经存在,我不想将其放在队列中。
  • 优先级:应首先处理标记为高优先级的文件。
  • 性能:拥有几十万个文件,甚至几百万个文件应该没问题。
  • FIFO:(奖励)应该首先拾取最旧的文件(按优先级)。

问题似乎是,如果我正确阅读了源代码和文档,那么所有文件详细信息都会存储在内存中以执行排序,无论是使用内置语言还是自定义的可插入排序器。文件组件总是创建一个包含所有详细信息的对象列表,这显然会在经常轮询许多文件时导致大量垃圾收集开销。

我的用例基本上可以正常工作,无需使用数据库或编写自定义组件,使用以下步骤:

  • 从父目录cameldest/queue上的一个文件使用者移动到两个使用者,每个目录一个,无需排序

想象一下,在机场办理登机手续时,如果商务舱通道空无一人,游客会被邀请进入该通道。

这在负载下就像一个魅力,即使数十万个文件在“低”队列中,直接放入“高”的新消息(文件)也会在几秒钟内得到处理。

此解决方案没有涵盖的唯一要求是有序性:不能保证先拾取旧文件,而是随机拾取旧文件。我们可以想象这样一种情况:源源不断的传入文件可能会导致某个特定的文件X总是不走运,永远不会被拾取。然而,发生这种情况的可能性很低。

可能的改进:当前允许/暂停将文件从“低”升级到“高”的阈值设置为“高”中的0条即时消息。一方面,这保证了在执行另一次从“低”升级之前,将处理掉到“高”的文件,另一方面,这会导致一种停止-启动模式,尤其是在多线程场景中。虽然这不是一个真正的问题,但它的表现令人印象深刻。

资料来源:

我的路线定义:

    ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
    trp.setMaxInflightExchanges(50);

    SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");

    from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
    .routeId("lowPriority")
    .log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
    .to("file://cameldest/queue/high");

    from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
    .routeId("highPriority")
    .routePolicy(trp)
    .routePolicy(sorp)
    .threads(20)
    .log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
    .delay(2000) // This is where business logic would happen
    .log("After: ${in.headers."+Exchange.FILE_PATH+"}")
    .stop();

我的Sus从业者RoutePolicy,像ThrottlingInflight RoutePolicy一样松散构建

public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {

    private CamelContext camelContext;
    private final Lock lock = new ReentrantLock();
    private String otherRouteId;

    public SuspendOtherRoutePolicy(String otherRouteId) {
        super();
        this.otherRouteId = otherRouteId;
    }

    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    @Override
    public void onStart(Route route) {
        super.onStart(route);
        if (camelContext.getRoute(otherRouteId) == null) {
            throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
        }
    }

    @Override
    public void setCamelContext(CamelContext context) {
        camelContext = context;
    }

    @Override
    public void onExchangeDone(Route route, Exchange exchange) {
        //log.info("Exchange done on route " + route);
        Route otherRoute = camelContext.getRoute(otherRouteId);
        //log.info("Other route: " + otherRoute);
        throttle(route, otherRoute, exchange);
    }

    protected void throttle(Route route, Route otherRoute, Exchange exchange) {
        // this works the best when this logic is executed when the exchange is done
        Consumer consumer = otherRoute.getConsumer();

        int size = getSize(route, exchange);
        boolean stop = size > 0;
        if (stop) {
            try {
                lock.lock();
                stopConsumer(size, consumer);
            } catch (Exception e) {
                handleException(e);
            } finally {
                lock.unlock();
            }
        }

        // reload size in case a race condition with too many at once being invoked
        // so we need to ensure that we read the most current size and start the consumer if we are already to low
        size = getSize(route, exchange);
        boolean start = size == 0;
        if (start) {
            try {
                lock.lock();
                startConsumer(size, consumer);
            } catch (Exception e) {
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

    private int getSize(Route route, Exchange exchange) {
        return exchange.getContext().getInflightRepository().size(route.getId());
    }

    private void startConsumer(int size, Consumer consumer) throws Exception {
        boolean started = super.startConsumer(consumer);
        if (started) {
            log.info("Resuming the other consumer " + consumer);
        }
    }

    private void stopConsumer(int size, Consumer consumer) throws Exception {
        boolean stopped = super.stopConsumer(consumer);
        if (stopped) {
            log.info("Suspending the other consumer " + consumer);
        }
    }
}
杭泉
2023-03-14

尝试将“发件人文件”endpoint上的maxMessagesPerPoll设置为较低的值,以便每次轮询最多只拾取X个文件,这也限制了您的Camel应用程序中的机上消息总数。

您可以在文件组件的Camel文档中找到有关该选项的更多信息

 类似资料:
  • 我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?

  • 我已经和ApacheCamel合作了一段时间,做了一些基本的工作,但现在我正在尝试创建一个路由,在该路由中,我可以让多个“消费者”访问同一条路由,或者在路由中添加一个消费者,然后处理消息。 我的想法是拥有一个由事件触发的事件驱动消费者,然后例如从ftp读取文件。我正计划做这样的事情: 所以这个想法是我有一个事件(例如直接或来自消息队列),它具有“fileName”属性,然后使用该属性从ftp下载/

  • 我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利

  • 我有一个从JMS队列读取项目并将其写入数据库的路径。 我已经阅读了关于ApacheCamelJMS组件的文档,但我没有得到我的问题的确切和明确的答案,即“如果路由中出现异常,JMS消费者是否会重新插入项目或解锁JMS队列中的消息?”。 谢谢 阿里

  • 我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?

  • 我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c