当前位置: 首页 > 知识库问答 >
问题:

如何使用中断器模式实现解复用器?

濮君植
2023-03-14

我希望有一个环形缓冲区队列,该队列将接收对象,并以单个生产者到多个消费者的方式,将它们分布在线程池中的多个线程中。我如何使用disruptor模式来实现这一点?是否有任何HelloDemux代码示例?谢谢

共有1个答案

萧元徽
2023-03-14

本文详细介绍了实现中断器模式的解复用器的所有内容,但我认为线程池意味着您将需要一个调度器,这与中断器模式背道而驰。要实现demux,您需要设置固定数量的使用者线程,而不是池,并让它们从队列的尾部获取消息。现在,你可能会问,没有调度员,他们怎么能做到这一点?他们只是忙着围绕队列尾部旋转(或使用其他类型的等待策略,即旋转、让步、停车、睡眠等的组合)。现在,你可能会问,他们如何做到这一点而不相互干涉?然后有两个选项:可以使用模数(无锁)或CAS(光锁)。每一种都有自己的优缺点。模数很快,但如果一个消费者落后,则可能导致车道争用。CAS没有那么快,但不会导致车道争用。

package com.coralblocks.coralqueue.sample.demux;

import com.coralblocks.coralqueue.demux.CASAtomicDemux;
import com.coralblocks.coralqueue.demux.Demux;

public class Sample {

    private static final int NUMBER_OF_CONSUMERS = 4;

    public static void main(String[] args) throws InterruptedException {

        final Demux<StringBuilder> queue = new CASAtomicDemux<StringBuilder>(1024, StringBuilder.class, NUMBER_OF_CONSUMERS);

        Thread[] consumers = new Thread[NUMBER_OF_CONSUMERS];

        for(int i = 0; i < consumers.length; i++) {

            final int index = i;

            consumers[i] = new Thread() {

                @Override
                public void run() {

                    boolean running = true;

                    while(running) {
                        long avail;
                        while((avail = queue.availableToPoll(index)) == 0); // busy spin
                        for(int i = 0; i < avail; i++) {
                            StringBuilder sb = queue.poll(index);

                            if (sb == null) break; // mandatory for demuxes!

                            if (sb.length() == 0) {
                                running = false;
                                break; // exit immediately...
                            } else {
                                System.out.println(sb.toString());
                            }
                        }
                        queue.donePolling(index);
                    }
                }
            };

            consumers[i].start();
        }

        StringBuilder sb;

        for(int i = 0; i < 10; i++) {
            while((sb = queue.nextToDispatch()) == null); // busy spin
            sb.setLength(0);
            sb.append("message ").append(i);
            queue.flush();
        }

        // send a message to stop consumers...
        for(int i = 0; i < NUMBER_OF_CONSUMERS; i++) {
            // because the consumer exit immediately on this message, each
            // consumer will get one of these messages and exit...
            while((sb = queue.nextToDispatch()) == null); // busy spin
            sb.setLength(0);
        }
        queue.flush(); // sent batch

        for(int i = 0; i < consumers.length; i++) consumers[i].join();
    }
}
 类似资料:
  • 在使用disruptor时,可能会有一个(多个)消费者落后,并且由于这个缓慢的消费者,整个应用程序都会受到影响。 请记住,每个生产者(发布者)和消费者(EventProcessor)都在一个线程上运行,如何解决消费者速度慢的问题? 我们可以在单个消费者上使用多个线程吗?如果没有,有什么更好的选择?

  • 我正在解决以下问题: 迭代器设计模式具有很强的封装性。例如,一个图书馆想要一个图书管理系统。一个类用于存储它们的详细信息,一个类用于存储图书和书架号。假设图书馆想要使用将数据存储在数据库中。 如何使用JDBC实现迭代器设计模式以确保数据的封装? 我关心的是在哪里处理数据库以及如何在应用程序之间共享数据。 数据库处理程序可以是库类的内部类吗?那么是否可以保存数据并根据请求检索它而不影响封装? 我还在

  • 如何使用适配器模式 横滑的滚动栏理论上应该是这个样子的: 新建一个 Swift 文件:HorizontalScroller.swift ,作为我们的横滑滚动控件, HorizontalScroller 继承自 UIView 。 打开 HorizontalScroller.swift 文件并添加如下代码: @objc protocol HorizontalScrollerDelegate { } 这

  • 本文向大家介绍JSP使用Servlet作为控制器实现MVC模式实例详解,包括了JSP使用Servlet作为控制器实现MVC模式实例详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JSP使用Servlet作为控制器实现MVC模式的方法。分享给大家供大家参考。具体如下: 一、目标: ① 初步理解MVC模式; ② 掌握Servlet的编写; ③ 使用MVC模式完成登录功能。 二、主要内容:

  • 问题内容: 我经常发现用Java-8之前的设置来实现构建器模式很麻烦。总是有很多几乎重复的代码。建造者本身可以被视为样板。 实际上,有一些代码重复检测器,它们几乎会将使用java-8之前版本的工具构建的构建器的每种方法都视为所有其他方法的副本。 因此,请考虑以下类以及它是Java-8之前的构建器: 如何使用java-8工具实现构建器模式? 问题答案: 构建 可变对象 (稍后将讨论不可变对象)的想法

  • 本文向大家介绍Python中使用__new__实现单例模式并解析,包括了Python中使用__new__实现单例模式并解析的使用技巧和注意事项,需要的朋友参考一下 单例模式是一个经典设计模式,简要的说,一个类的单例模式就是它只能被实例化一次,实例变量在第一次实例化时就已经固定。  在Python中常见的单例模式有None,这就是一个很典型的设计,通常使用 if xxx is None或者if xx