当前位置: 首页 > 知识库问答 >
问题:

在java中使用lmax Disruptor(3.0)处理数百万文档

吕修伟
2023-03-14

我有以下用例:

当我的服务启动时,它可能需要在尽可能短的时间内处理数百万个文档。将有三个数据来源。

我已设置以下内容:

    /* batchSize = 100, bufferSize = 2^30
    public MyDisruptor(@NonNull final MyDisruptorConfig config) {
        batchSize = config.getBatchSize();
        bufferSize = config.getBufferSize();
        this.eventHandler = config.getEventHandler();
        ThreadFactory threadFactory = createThreadFactory("disruptor-threads-%d");
        executorService = Executors.newSingleThreadExecutor(threadFactory);
        ringBuffer = RingBuffer.createMultiProducer(new EventFactory(), bufferSize, new YieldingWaitStrategy());
        sequenceBarrier = ringBuffer.newBarrier();
        batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, eventHandler);
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
        executorService.submit(batchEventProcessor);
    }

    public void consume(@NonNull final List<Document> documents) {
        List<List<Document>> subLists = Lists.partition(documents, batchSize);
        for (List<Document> subList : subLists) {
            log.info("publishing sublist of size {}", subList.size());
            long high = ringBuffer.next(subList.size());
            long low = high - (subList.size() - 1);
            long position = low;
            for (Document document: subList) {
                ringBuffer.get(position++).setEvent(document);
            }
            ringBuffer.publish(low, high);
            lastPublishedSequence.set(high);
        }
    }

我的每个源调用consume都使用Guice来创建一个单例破坏者。

我的eventHandler例程是

    public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
        Document document = event.getValue();
        handler.processDocument(document); //send the document to handler
        if (endOfBatch) {
            handler.processDocumentsList(); // tell handler to process all documents so far.
        }
    }

我在日志中看到,制作人(消费)有时会停滞不前。我假设这是当ringBuffer已满,eventHandler无法足够快地处理时。我看到eventHandler正在处理文档(来自我的日志),过了一段时间,生产者开始向环形缓冲区发布更多文档。

问题:

  • 我使用的是正确的Disruptor模式吗?我看到有很多方法可以使用它。我选择使用batchEventProcess,这样它就会发出endOfBatch的信号。
  • 如何提高EventHandler的效率?进程文档列表可能很慢。
  • 我应该使用并行EventHandler吗?lmax用户指南提到这是可能的,常见问题解答对此有一个问题。但是如何将其与batchEventProcess一起使用?它只需要一个eventHandler。

共有1个答案

林星阑
2023-03-14

您的处理程序是否有状态?如果没有,则可以使用多个并行事件处理程序来处理文档。您可以实现一个基本的分片策略,其中只有一个处理程序处理每个事件。

endOfBatch通常用于通过优化从批处理中获益的IO操作来加快处理速度。E、 g.在每个事件上写入文件,但仅在endOfBatch上刷新。

如果不知道文档处理器中发生了什么,就很难给出更多建议。

 类似资料:
  • 本文向大家介绍利用python如何处理百万条数据(适用java新手),包括了利用python如何处理百万条数据(适用java新手)的使用技巧和注意事项,需要的朋友参考一下 1、前言 因为负责基础服务,经常需要处理一些数据,但是大多时候采用awk以及java程序即可,但是这次突然有百万级数据需要处理,通过awk无法进行匹配,然后我又采用java来处理,文件一分为8同时开启8个线程并发处理,但是依然处

  • 注意:计数是对处理文件需要多长时间的更多调试。这项工作几乎花了一整天的时间,超过10个实例,但仍然失败,错误发布在列表的底部。然后我找到了这个链接,它基本上说这不是最佳的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html 然后,我决定尝试另一个我目前

  • 我如何在我的java项目中包含PDE文件?有可能吗?

  • 问题内容: 有没有一种方法可以使用DecimalFormat(或其他一些标准格式化程序)来格式化数字,如下所示: 1,000,000 => 100万 1,234,567 => 1.23M 1,234,567,890 => 1234.57M 基本上是将某个数字除以100万,保留小数点后两位,并在最后打一个’M’。我曾考虑过创建NumberFormat的新子类,但它看起来比我想象的要复杂。 我正在编写

  • 这是如何使用公共类frome的一个后续步骤。其他处理选项卡中的java文件?;使用来自的Usage类中的示例。java文件-有完整的文档吗?-处理2。x和3。x论坛,我有这个: /tmp/Sketch/Foo.java 这个例子运行得很好,但是如果我取消注释import peasy。组织 行,则编译失败: 当然,我确实在下安装了PeasyCam,如果我导入peasy.*它工作得很好 来自草图。 我

  • 需要使用到内存进行排序,但是短时间内排序又会导致内存益处