当前位置: 首页 > 知识库问答 >
问题:

非阻塞队列,带批处理导向排水全部

包兴思
2023-03-14

我们有一个专家、多生产者(用户)和单一消费者(引擎)队列。用户线程运行得更频繁,并且总是向队列中添加单个元素。引擎线程操作运行频率较低,并在批处理中处理堆栈元素。如果堆栈是空的,它将停止,直到用户线程添加了一个条目。这样,只有当队列从空变为1时才需要发出通知。

在此实现中,引擎线程不是一次迭代和删除一个项目,而是删除所有项目 - 一个 drainAll,而不是 drainTo。没有其他操作可以改变堆栈 - 只有用户线程添加,引擎线程 drainAll。

目前我们通过一个同步链表来做这件事,我们想知道是否有一个非阻塞的方式来做这件事。对JDK类的drainTo操作将迭代堆栈,我们只想在一次操作中获取堆栈中的所有内容,而不是迭代——因为每次迭代都会遇到与volatile/cas相关的逻辑,所以我们最好每次drainAll只遇到一次。引擎线程可以对每个单独的元素进行迭代和操作,而不涉及sync/volatile/cas操作。

当前的实现如下所示:

public class SynchronizedPropagationQueue implements PropagatioQueue {
    protected volatile PropagationEntry head;
    protected volatile PropagationEntry tail;

    protected synchronized void addEntry( PropagationEntry entry ) {
        if ( head == null ) {
            head = entry;
            notifyWaitOnRest();
        } else {
            tail.setNext( entry );
        }
        tail = entry;
    }

    @Override
    public synchronized PropagationEntry drainAll() {
        PropagationEntry currentHead = head;
        head = null;
        tail = null;
        return currentHead;
    }

    public synchronized void waitOnRest() {
        try {
            log.debug("Engine wait");
            wait();
        } catch (InterruptedException e) {
            // do nothing
        }
        log.debug("Engine resumed");
    }


    @Override
    public synchronized void notifyWaitOnRest() {
        notifyAll();
    }
}

asdf

共有2个答案

韩鸿波
2023-03-14

目前我们通过同步链表来做到这一点,我们想知道是否有一种非阻塞的方法来做到这一点。JDK类上的drainTo操作将迭代堆栈,我们只想在一次操作中获取堆栈中的所有内容,而无需迭代

也许我不明白,但似乎使用 BlockingQueue.drainTo(...) 方法会比你的实现更好。例如,LinkedBlockingQueue.drainTo(...) 方法只有一个围绕该方法的锁 - 我没有看到迭代开销。

如果这不是一个学术讨论,那么我怀疑你的性能问题是队列本身的问题,应该把精力集中在其他方面。如果是学术性的,那么@Matt的答案可能会更好,尽管肯定要编写更多的代码来支持完整的< code>Collection方法列表。

禹德水
2023-03-14

堆栈具有非常简单的非阻塞实现,可轻松支持并发“全部弹出”操作,并且可以轻松检测空

public class EngineQueue<T>
{
    private final AtomicReference<Node<T>> m_lastItem = new AtomicReference<>();

    public void add(T item)
    {
        Node<T> newNode = new Node<T>(item);
        do {
            newNode.m_next = m_lastItem.get();
        } while(!m_lastItem.compareAndSet(newNode.m_next, newNode)); 

        if (newNode.m_next == null)
        {
            // ... just went non-empty signal any waiting consumer
        }
    }

    public List<T> removeAll()
    {
        Node<T> stack = m_lastItem.getAndSet(null);
        // ... wait for non-empty if necessary 
        List<T> ret = new ArrayList<>();
        for (;stack != null; stack=stack.m_next)
        {
            ret.add(stack.m_data);
        }
        Collections.reverse(ret);
        return ret;
    }
    private static class Node<U>
    {
        Node<U> m_next;
        final U m_data;
        Node(U data)
        {
            super();
            m_data = data;
        }
    }
}

在空旷的地方发信号-

 类似资料:
  • 我有一个应用程序,在其中按下开始按钮后,服务将开始轮询几个传感器,每当传感器值发生变化时,将传感器数据存储到某个对象中。每10毫秒,就会发生一次数据库插入,获取对象的当前值并将其存储到数据库中。这会发生30分钟 考虑到插入的速度和持续时间,我想在一个独立于UI线程的线程中运行它,这样导航就不会受到影响。因此,我的服务将通过将数据添加到队列中来为线程提供一些数据,然后另一个线程(消费者)将从队列中取

  • 问题内容: 我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分: 到目前为止,一切都很好。在阻塞队列的javadoc中,我读到: BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的

  • blpop key1...keyN timeout 从左到右扫描返回对第一个非空list进行lpop操作并返回,比如blpop list1 list2 list3 0 ,如果list不存在list2,list3都是非空则对list2做lpop并返回从list2中删除的元素。如果所有的list都是空或不存在,则会阻塞timeout秒,timeout为0表示一直阻塞。当阻塞时,如果有client对ke

  • 主要内容:1 ConcurrentLinkedQueue的概述,2 ConcurrentLinkedQueue的实现,2.1 基本结构,2.2 构造器,2.3 入队操作,2.4 出队操作,2.5 过程详解,2.6 获取操作,2.7 其他操作,3 ConcurrentLinkedQueue的总结基于JDK1.8详细介绍了ConcurrentLinkedQueue的底层源码实现,包括同步原理、入队操作、出队操作、获取操作等。 1 ConcurrentLinkedQueue的概述 public cla

  • 我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用

  • 我在尝试用netty 4.1写一个非阻塞代理。我有一个处理传入连接的“FrontHandler ”,还有一个处理传出连接的“BackHandler”。我在关注hexdumproxyhandler(https://github . com/netty/netty/blob/ed4a 89082 bb 29 b 9 e 7d 869 C5 d 25d 6 b 9 ea 8 fc 9d 25 b/exa