Question:

Can this queue implementation achieve lower-latency message passing than the LMAX Disruptor?

司徒鸿文
2023-03-14

I've developed a queue that allows a single consumer and a single producer to offer and poll elements concurrently, without synchronization or CAS operations on every offer/poll. Instead, a single atomic operation is required only when the tail of the queue is empty. The queue is designed to reduce latency in situations where the queue stays buffered and the consumer has not caught up with the producer.

In this question I'd like to get the implementation reviewed (the code hasn't been looked at by anyone else yet, so a second opinion would be welcome), discuss a usage pattern that I believe should significantly reduce latency, and ask whether this architecture could run faster than the LMAX disruptor.

The code is on GitHub: https://github.com/aranhakki/experimental-performance/blob/master/java/src/concurrency/messaging/ConcurrentPollOfferArrayQueue.java

/*
 * Copyright 2014 Aran Hakki
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package concurrency.messaging;

// A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
// Contention in offer and poll operations only occurs when offerArray, which acts as an incoming message buffer,
// becomes full and we must wait for it to be swapped with pollArray, which acts as an outgoing message buffer.
// The simplest analogy is two buckets: we fill one while at the same time emptying another that already
// contains some liquid; at the point the first bucket becomes full, we swap it with the bucket that
// is being emptied.

// It's possible that this mechanism might be faster than the LMAX disruptor; tests are needed to confirm.

public final class ConcurrentPollOfferArrayQueue<T> {

    private T[] offerArray;
    private T[] pollArray;

    @SuppressWarnings("unchecked") // the backing array is only ever accessed as T[]
    public ConcurrentPollOfferArrayQueue(T[] _pollArray){
        offerArray = (T[]) new Object[_pollArray.length];
        pollArray = _pollArray;
    }

    private int offerIndex = 0;
    private int pollIndex = 0;

    public void offer(T t){
        if (offerIndex<offerArray.length){
            offerArray[offerIndex] = t;
            offerIndex++;
        } else {
            while(!arrayFlipped){

            }
            arrayFlipped = false;
            offerIndex = 0;
            offer(t);
        }
    }

    private volatile boolean arrayFlipped = false;

    public T poll(){
        if (pollIndex<pollArray.length){
            T t = pollArray[pollIndex];
            pollArray[pollIndex] = null;
            pollIndex++;
            return t;
        } else {
            pollIndex = 0;
            T[] pollArrayTmp = pollArray;
            pollArray = offerArray;
            offerArray = pollArrayTmp;
            arrayFlipped = true;
            return poll();
        }

    }

}
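To make the swap mechanics concrete, here is a minimal single-threaded walk-through. The class names `SpscSwapQueueDemo` and `SwapQueue` are my own for this sketch; the queue body is condensed from the code above. One quirk it exposes: until the initial pollArray has been drained once, poll() returns the nulls it was seeded with.

```java
// Condensed, single-threaded walk-through of the two-array swap above.
// SwapQueue is a hypothetical name; the logic is copied from
// ConcurrentPollOfferArrayQueue.
public class SpscSwapQueueDemo {

    static final class SwapQueue<T> {
        private T[] offerArray;
        private T[] pollArray;
        private int offerIndex = 0;
        private int pollIndex = 0;
        private volatile boolean arrayFlipped = false;

        @SuppressWarnings("unchecked")
        SwapQueue(int capacity) {
            offerArray = (T[]) new Object[capacity];
            pollArray = (T[]) new Object[capacity];
        }

        void offer(T t) {
            if (offerIndex < offerArray.length) {
                offerArray[offerIndex++] = t;
            } else {
                while (!arrayFlipped) { } // spin until the consumer swaps
                arrayFlipped = false;
                offerIndex = 0;
                offer(t);
            }
        }

        T poll() {
            if (pollIndex < pollArray.length) {
                T t = pollArray[pollIndex];
                pollArray[pollIndex++] = null;
                return t;
            }
            pollIndex = 0;
            T[] tmp = pollArray;   // swap the two buckets
            pollArray = offerArray;
            offerArray = tmp;
            arrayFlipped = true;
            return poll();
        }
    }

    public static void main(String[] args) {
        SwapQueue<Integer> q = new SwapQueue<>(2);
        q.offer(10);
        q.offer(20);
        System.out.println(q.poll()); // null: the seed pollArray starts empty
        System.out.println(q.poll()); // null
        System.out.println(q.poll()); // 10: the arrays swap on this call
        System.out.println(q.poll()); // 20
    }
}
```

Seen single-threaded like this, the swap-on-exhaustion behaviour is easy to follow; the interesting review question is what happens under two threads when the consumer swaps while the producer is mid-fill.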

By using many of these queues, instead of having multiple producers and consumers all reference the same queue, I believe latency can be reduced significantly.

Consider producers A, B and C all referencing a single queue Q, and consumers D, E and F all referencing the same queue. This results in the following set of relationships, and therefore a lot of contention:

A writesTo Q

B writesTo Q

C writesTo Q

D writesTo Q

E writesTo Q

F writesTo Q

With the queue I've developed, you can instead create one queue between each producer and a single consumer-aggregation thread, which takes elements from the tail of each producer's queue and places them at the head of each consumer's queue. This drastically reduces contention, because any given piece of memory has only a single writer. The relationships now look like this:

A writesTo headOf(AQ)

B writesTo headOf(BQ)

C writesTo headOf(CQ)

ConsumerAggregationThread writesTo tailOf(AQ)

ConsumerAggregationThread writesTo tailOf(BQ)

ConsumerAggregationThread writesTo tailOf(CQ)

ConsumerAggregationThread writesTo headOf(EQ)

ConsumerAggregationThread writesTo headOf(FQ)

ConsumerAggregationThread writesTo headOf(GQ)

E writesTo tailOf(EQ)

F writesTo tailOf(FQ)

G writesTo tailOf(GQ)

The relationships above preserve the single-writer principle.
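The fan-in/fan-out topology above can be sketched as follows. This is my own illustration, not the author's code: `ConcurrentLinkedQueue` stands in for the SPSC queue, and the class and method names (`AggregationTopology`, `route`) are hypothetical; only the routing structure is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the single-writer topology: one queue per producer (AQ, BQ, CQ)
// and one per consumer (EQ, FQ, GQ), joined by an aggregation step.
public class AggregationTopology {

    // The aggregation thread's inner loop: drain each producer queue and
    // distribute messages round-robin over freshly created consumer queues.
    // It is the only component that touches both sides.
    static List<Queue<String>> route(List<Queue<String>> producerQueues, int consumers) {
        List<Queue<String>> consumerQueues = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            consumerQueues.add(new ConcurrentLinkedQueue<>());
        }
        int next = 0;
        for (Queue<String> pq : producerQueues) {
            String msg;
            while ((msg = pq.poll()) != null) {
                consumerQueues.get(next).offer(msg);
                next = (next + 1) % consumers;
            }
        }
        return consumerQueues;
    }

    public static void main(String[] args) {
        List<Queue<String>> producerQueues = new ArrayList<>();
        for (String p : new String[]{"A", "B", "C"}) {
            Queue<String> q = new ConcurrentLinkedQueue<>();
            q.offer(p + "1"); // each producer writes only to its own queue
            producerQueues.add(q);
        }
        System.out.println(route(producerQueues, 3)); // [[A1], [B1], [C1]]
    }
}
```

Each queue ends up with exactly one writer per end, which is what the relationship list above is arguing for; the cost is the extra hop through the aggregation thread.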

I'd love to hear your thoughts.

1 Answer

弓泰
2023-03-14

What do you think of this implementation? I've changed it so that the polling thread triggers the queue swap when the poll queue is empty.

/*
 * Copyright 2014 Aran Hakki
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */



/*
 * A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
 * Contention in offer and poll operations only occurs when pollQueue is empty and must be swapped with offerQueue.
 * This implementation does not make use of any low-level Java memory optimizations, e.g. the Unsafe class or direct byte buffers,
 * so it's possible it could run much faster.
 * If re-engineered to use lower-level features, it's possible that this approach might be faster than the LMAX disruptor.
 * I'm currently observing an average latency of approx 6000ns.
 */

package concurrency.messaging;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentPollOfferQueue<T> {

    private class ThreadSafeSizeQueue<T> {
        private Queue<T> queue = new LinkedList<T>();
        private final AtomicInteger size = new AtomicInteger(0);

        public int size(){
            return size.get();
        }

        public void offer(T value){
            queue.offer(value);
            size.incrementAndGet();
        }

        public T poll(){
            T value = queue.poll();
            if (value!=null){
                size.decrementAndGet();
            }
            return value;
        }
    }

    private volatile ThreadSafeSizeQueue<T> offerQueue;
    private volatile ThreadSafeSizeQueue<T> pollQueue;

    private int capacity;

    public ConcurrentPollOfferQueue(int capacity){
        this.capacity = capacity;
        offerQueue = new ThreadSafeSizeQueue<T>();
        pollQueue = new ThreadSafeSizeQueue<T>();
    }

    public void offer(T value){
        while(offerQueue.size()==capacity){/* wait for the consumer to finish consuming pollQueue */}
        offerQueue.offer(value);
    }

    public T poll(){
        T polled;
        while((polled = pollQueue.poll())==null){
            if (pollQueue.size()==0){
                ThreadSafeSizeQueue<T> tmpQueue = offerQueue;
                offerQueue = pollQueue;
                pollQueue = tmpQueue;
            }
        }
        return polled;
    }

    public boolean isEmpty(){
        return pollQueue.size()==0;
    }
}
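On the "approx 6000ns" figure: one rough way to reproduce that kind of measurement is to send `System.nanoTime()` timestamps through the queue and average the offer-to-poll gap. This is my own sketch, not the author's harness; `LatencyProbe` and `measureAvgNanos` are hypothetical names, `ArrayBlockingQueue` is used as a baseline you could swap for `ConcurrentPollOfferQueue<Long>`, and timing like this is noisy compared to a proper benchmark harness.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Rough per-message latency probe: the producer enqueues timestamps, the
// consumer subtracts them from the current time on dequeue, so each sample
// approximates one-way transfer latency through the queue.
public class LatencyProbe {

    // Returns the average offer-to-poll latency in nanoseconds over n messages.
    static long measureAvgNanos(int n) {
        ArrayBlockingQueue<Long> q = new ArrayBlockingQueue<>(1024);
        long[] totalNanos = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) q.put(System.nanoTime());
            } catch (InterruptedException ignored) { }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) totalNanos[0] += System.nanoTime() - q.take();
            } catch (InterruptedException ignored) { }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException ignored) { }
        return totalNanos[0] / n;
    }

    public static void main(String[] args) {
        measureAvgNanos(100_000); // warm-up so the JIT compiles the hot path
        System.out.println("avg one-way latency ns: " + measureAvgNanos(100_000));
    }
}
```

For a credible comparison against the LMAX disruptor you would still want a real harness (warm-up, percentiles rather than averages, pinned threads), but a probe like this is enough to see whether a change moves the number at all.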