当前位置: 首页 > 知识库问答 >
问题:

LMAX Disruptor-什么决定批量大小?

顾高翰
2023-03-14

我最近一直在学习LMAX干扰器,并在做一些实验。令我困惑的一件事是EventHandler的onEvent处理程序方法的endOfBatch参数。考虑我的以下代码。首先,我调用的伪消息和使用者类Test1和Test1Worker:

public class Test1 {

}

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}

请注意,我设置了500毫秒的延迟,以替代一些实际工作。我还在控制台中打印

然后我的驱动程序类(作为生产者)被称为DisruptrTest:

public class DisruptorTest {

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){             
        test1Workers = Executors.newFixedThreadPool(1);

        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);           
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }

    }   
}

在这里,在初始化所需的内容之后,我将10条消息馈送到环形缓冲区(缓冲区大小8),并尝试监视两件事-生产者在环形缓冲区中申请下一个插槽的延迟,以及消费者端带有序列号的消息,以及特定序列是否被视为批次结束。

现在,有趣的是,处理每条消息需要500毫秒的延迟,这就是我得到的输出:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

但是,如果取消500 ms等待时间,我会得到以下结果:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

因此,某条消息是否被视为在批的末尾(即,批的大小)似乎受到消费者消息处理延迟的影响。也许我在这里很愚蠢,但这就是我应该做的吗?这背后的原因是什么?通常是什么决定了批量大小?提前谢谢。如果我的问题有什么不清楚的地方,请告诉我。

共有1个答案

南宫星波
2023-03-14

批量大小仅由可用元素的数量决定。因此,如果此时有更多的元素可用,那么它将包含在批次中。例如,若Disruptor调用您的代码,而队列中只有一个元素,那个么您将得到一个endOfBatch=true的调用。若队列中有8个元素,那个么它将收集所有8个元素并在单个批次中发送它们。

您可以在下面的代码中看到,队列中的#个“可用”条目被提取出来,这可能比“下一个”条目多得多。例如,您当前为5,正在等待插槽6,然后3个事件到达,可用的事件数为8,您将在一批中收到多个呼叫(对于6、7、8)。

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}

关于元素9处的500ms暂停,请注意,中断器是使用环形缓冲区构建的,并且您已将缓冲区中的插槽数指定为8(请参见此处的第二个参数):

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

如果并非所有使用者都已使用某个元素,并且ringbuffer已满(全部8个元素已满),则生产者将被阻止向缓冲区发布新事件。您可以尝试增加缓冲区大小,比如200万个对象,或者确保您的消费者比生产者快,这样队列就不会填满(删除睡眠,您已经演示过了)。

 类似资料:
  • 问题内容: 是什么导致内存中单个对象的大小? 我知道原语和引用会,但是还有其他吗?方法的数量及其长度是否重要? 问题答案: 这完全取决于实现,但是有一些因素会影响Java中的对象大小。 首先,Java对象中字段的数量和类型肯定会影响空间使用,因为您至少需要拥有容纳该对象所有字段所需的存储空间。但是,由于填充,对齐和指针压缩的优化,没有直接公式可用于精确计算以这种方式使用了多少空间。 对于方法,通常

  • 有没有办法为Spring的NamedParameterJdbcTemplate对象设置批处理大小? 在我的项目中,我遇到了一些OutOfMemory问题,但我能够通过在一个较小的块循环中调用NamedParameterJdbcTemplate来解决它。但这需要一些额外的努力,比如确定块大小,将一个大列表拆分成更小的子列表等等。 我想知道NamedParameterJdbcTemplate是否有这样

  • 我正在尝试使用keras对图像进行二值分类。 我的CNN模型对训练数据进行了良好的训练(训练准确率约为90%,验证准确率约为93%)。但在培训期间,如果我将批量大小设置为15000,则得到图I输出,如果我将批量大小设置为50000,则得到图II输出。有人能告诉我怎么了吗?预测不应该取决于批量大小,对吗? 我用于预测的代码: 我的型号:-

  • 我正在尝试使用画布元素,但高度不能超过像素。如果我尝试使用像素,我会在IE11中得到。这在Chrome中工作正常。画布用于PDF生成,我真的没有时间将生成移动到服务器。 我在谷歌上搜索了一下,看起来大小可能会因平台和浏览器的不同而有所不同。 是浏览器分配的内存和内存设置决定了这一点吗? 编辑:我在这里找到了一些信息: 看起来我可以有两倍的尺寸。也许这只是为了IE9。

  • 在JavaServlet环境中,哪些因素是同时用户数量的瓶颈。 服务器每个端口允许的HTTP连接数 还有其他因素吗? 编辑:为了不考虑业务逻辑,假设只有一个servlet在Log4j上打印一行。 我的Tomcat服务器能同时处理6000个HTTP连接吗?为什么不呢(文件处理?每个请求的CPU时间?) 为每个连接生成的垃圾量是否会产生影响?例如,如果Tomcat为每个HTTP连接创建并留下20KB的

  • 问题内容: 我有两台linux机器(都是VM),一台有12GB内存,另一台有8GB内存。 我试图在两台机器上启动相同的Java程序,并且最大可能的最大堆大小(使用-Xmx标志)。以下是我得到的结果。 12GB机器:9460MB 8GB机器:4790MB 如果我指定的最大堆大小超出了限制,我将得到以下错误。 我检查了两个系统中的可用内存(使用命令),然后得到关注。 12GB机器:大约3GB可用空间。