当前位置: 首页 > 面试题库 >

为什么我的Disruptor程序没有充分利用环形缓冲区

暨弘懿
2023-03-14
问题内容

Disruptor github地址为:https :
//github.com/LMAX-Exchange/disruptor

我对其进行了一个简单的测试,如下所示:

public class DisruptorMain {
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void main(String[] args) throws Exception {
        class Element {

            private int value;

            public int get() {
                return value;
            }

            public void set(int value) {
                this.value = value;
            }

        }

        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "simpleThread");
            }
        };

        EventFactory<Element> factory = new EventFactory<Element>() {
            @Override
            public Element newInstance() {
                return new Element();
            }
        };

        EventHandler<Element> handler = new EventHandler<Element>() {
            @Override
            public void onEvent(Element element, long sequence, boolean endOfBatch) {
                try {
                    Thread.sleep(1000 * sequence);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("Element: " + element.get());
            }
        };

        BlockingWaitStrategy strategy = new BlockingWaitStrategy();

        int bufferSize = 4;

        Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);

        disruptor.handleEventsWith(handler);

        disruptor.start();

        RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();

        for (int l = 0; l < 8; l++) {
            long sequence = ringBuffer.next();
            System.out.println("sequence:" + sequence);

            try {
                Element event = ringBuffer.get(sequence);
                event.set(l);
            } finally {
                ringBuffer.publish(sequence);
            }
        }
    }
}

结果是:序列:0序列:1序列:2序列:3元素:0元素:1元素:2元素:3序列:4序列:5序列:6序列:7元素:4元素:5元素:6元素:7

在测试中,我定义了一个4的环形缓冲区,我有一个生产者为其创建8个任务,我的问题是,当生产者将4个任务放入环形缓冲区时,使用者开始从环形缓冲区中获取任务要正常工作,在任务1完成后,环形缓冲区应该为任务5提供一个空白空间,但是结果表明,只有在环形缓冲区中所有任务都完成后,环形缓冲区才能接受新任务,为什么?


问题答案:

这是因为Disruptor将在事件处理程序中进行批处理。如果事件处理程序速度慢或环形缓冲区较小,则批处理大小通常可以是环形缓冲区的大小。Disruptor将仅更新该事件处理程序的已处理序列,直到批处理完成为止。这减少了需要对发布者用来确定空间是否可用的序列变量进行的更新次数。如果需要使空间比默认值更早可用,则可以使用SequenceReportingEventHandler来实现。

public class MyEventHandler implements SequenceReportingEventHandler<Element> {
    Sequence processedSequence;

    public void setSequenceCallback(Sequence s) {
        processedSequence = s;
    }

    public void onEvent(Element e, long sequence, boolean endOfBatch) {
        // Do stuff
        processedSequence.set(sequence);
    }
}


 类似资料:
  • 环形缓冲区接口 结构体 struct   rt_ringbuffer   环形缓冲区控制块 更多...   枚举 函数 void  rt_ringbuffer_init (struct rt_ringbuffer *rb, rt_uint8_t *pool, rt_int16_t size)   初始化环形缓冲区   void  rt_ringbuffer_reset (struct rt_rin

  • 我读到FileWriter和BufferedWriter的区别在于FileWriter直接写入文件(逐字符),white BufferedReader使用缓冲区。如果是,为什么FileWriter有缓冲区?例如,如果我创建一个FileWriter对象,如下所示: 而且,如果我在程序结束时不刷新或关闭写入器,它将不会向文件写入任何内容。这意味着它也使用缓冲区。拜托,解释一下?

  • 关于IO,我有两个问题。 A.在一个教程和一些StackOverflow答案中,他们声称没有被修复。是真的吗? 以下代码使用将数据读入字节数组(1024字节) 从API,有一行: 公共整数读取(字节b[])引发IOException @参数b:将数据读入的缓冲区 B.如果它们都是缓冲的,它们都将数据放入缓冲区,并从缓冲区中获取数据,那么使BufferedInputStream比FileInputS

  • 问题内容: 我有一个流时间序列,我有兴趣保留最后4个元素,这意味着我希望能够弹出第一个元素并将其添加到末尾。本质上我需要一个环形缓冲区。 哪个Java集合最适合此用途?向量? 问题答案: 考虑CircularFifoBuffer Apache的Common.Collections。与Queue不同,你不必维护基础集合的有限大小,只要达到极限就可以包装它。 由于以下属性,CircularFifoBu

  • 环形块状缓冲区接口 结构体 struct   rt_rbb_blk   rbb 中的块 更多...   struct   rt_rbb_blk_queue   块队列。这些块在队列中,其 buffer 地址是连续的 更多...   struct   rt_rbb   环形块状缓冲区,简称 rbb 更多...   枚举 函数 void  rt_rbb_init (rt_rbb_t rbb, rt_u

  • 我想写一个计算长除法问题总进位的代码。这是我为carries编写的函数。我相信问题在于,第一个while循环并没有一直运行。如果是,为什么?这是在Python 3.5中实现的。