当前位置: 首页 > 知识库问答 >
问题:

使用ConcurrentSkipListMap实现的优先级队列,不确定如何让消费者阻止它

柴晔
2023-03-14

我用ConcurrentSkipListMap实现了一个优先级队列,使用16种不同的优先级。

class ConcurrentPriorityQueue {

    ConcurrentSkipListMap<Long, Message> queue = new ConcurrentSkipListMap<>();

    AtomicLong counter16 = new AtomicLong(Long.MAX_VALUE);
    AtomicLong counter15 = new AtomicLong(Long.MAX_VALUE / 8 * 7);
    AtomicLong counter14 = new AtomicLong(Long.MAX_VALUE / 4 * 3);
    // etc
    AtomicLong counter1 = new AtomicLong(Long.MIN_VALUE / 8 * 7);

    void addPriority16(Message message) {
        queue.put(counter16.getAndDecrement(), message);
    }

    void addPriority15(Message message) {
        queue.put(counter15.getAndDecrement(), message);
    }

    // and so on
}

这并不完全是类的组织方式(例如,我把原子放在一个数组中),但我认为这段代码会更清晰。还有一个DelayQueue,用于删除旧消息或提高旧消息的优先级(取决于消息类型)。

我的问题是,我有几个消费者正在使用pollLastEntry(),以便从队列中删除优先级最高的消息,然后在队列为空时Hibernate,但问题是队列活动会突然爆发——它会持续一个小时而不包含超过几条消息,然后在接下来的一个小时里,它将永远不会空。因此,我希望使用阻塞方法从队列中删除消息,这样我就不会在重复睡眠的线程上浪费资源(我会使用指数退避,在活动较少时使它们睡眠时间更长,但这会使它们在队列再次启动时无响应),但我不清楚实现这一点的最佳方式——我有大量使用阻塞队列的经验,但没有实现它们的经验。我的第一个想法是在睡眠中的消费者中实现指数退避,然后在队列活动再次启动时中断他们,但我首先想看看是否有更好的方法来做到这一点。

共有1个答案

黎奇思
2023-03-14

我会实现一个包装器

class Wrapper implements Comparable<Wrapper> {
    long priority;
    Message message;

    Wrapper(long priority, Message message) {
        this.priority = priority;
        this.message = message;
    }

    @Override
    public int compareTo(Wrapper w) {
        return Long.compare(priority, w.priority);
    }
}

并使用PriorityBlockingQueue而不是ConcurrentSkipListMap

 类似资料:
  • 在我的python应用程序中,我使用芹菜作为任务生产者和消费者,使用RabbitMQ作为代理。现在,我正在实施优先级排序。起初,它看起来根本不起作用,因为根据文档,我刚刚在队列中添加了参数。我更深入地研究了一下,发现了另一种优先级——消费者优先级和任务优先级。所以,现在,看起来有三种不同的优先顺序,我完全困惑了。你能给我解释一下区别吗? 队列最大优先级:即https://www.rabbitmq.

  • 我正在编写一个涉及堆实现的代码,在我的bubbleUp方法中,在我的while循环行中,我似乎遇到了一个取消引用的错误。这可能是一个相当基本的问题,但解决这个问题的最佳方法是什么?我在实现removeHigh方法时也遇到了一些问题,该方法旨在从队列中移除最高的元素。

  • 问题内容: 总体而言,我正在尝试使用优先级队列来实现Dijkstra的算法。 根据golang-nuts的成员所述,Go中惯用的方法是将堆接口与自定义的基础数据结构一起使用。所以我像这样创建了Node.go和PQueue.go: 和PQueue.go: 和main.go :(动作在SolveMatrix中) 问题是,在编译时我收到错误消息: 注释掉PQ.Push(firstNode)行确实使编译器

  • 我正在使用优先级队列实现Dijkstra的算法,我想要一个函数从堆中删除一个元素,但我只能从Dijkstra的主节点索引中向它发送顶点索引,我找不到它在堆上的位置,我负担不起进行二进制搜索。有什么想法吗?

  • 考虑下面的优先级类声明<代码>类优先级队列 我的想法: 我能想到的一件事是,这将强制优先级队列使用对象比较器,并且不会提供实现其自定义比较器的能力,因为类的用户可能希望基于某个不同的比较器构建队列。

  • 问题 怎样实现一个按优先级排序的队列? 并且在这个队列上面每次 pop 操作总是返回优先级最高的那个元素 解决方案 下面的类利用 heapq 模块实现了一个简单的优先级队列: import heapq class PriorityQueue: def __init__(self): self._queue = [] self._index = 0