当前位置: 首页 > 知识库问答 >
问题:

与生产者/消费者线程重叠50%的滑动窗口

申高峯
2023-03-14

我在 Java 中有以下场景:

  • 1生产者线程将事件对象存储到队列中。阻塞它不是一个选项。它应该始终将每个元素存储在队列的末尾并退出(因此没有有界队列)。
  • 1个消费者线程等待队列中有WINDOW_SIZE数量的事件。然后它应该从队列中检索所有WINDOW_SIZE事件进行处理,但只删除其中的一半(即WINDOW_SIZE/2),重叠率为50%。

我的问题是,您将使用哪个(并发)集合来高效地实现这一点?事件在资源有限的设备(运行Android的手机)上以100Hz的频率出现。我想到了使用下面这些,但没有一个是合适的:

  1. ConcurrentLinkedQueue,每次修改队列时检查队列大小,并在WINDOW_SIZE事件可用时在使用者中使用 peek()/poll()。这似乎有点麻烦。
  2. 一个 ArrayBlockingQueue,再次检查队列大小,并使用 drainTo()。但是,该方法具有以下文档:“[...]此外,如果在操作正在进行时修改指定的集合,则未定义此操作的行为。[...]".对于并发集合来说,这似乎有点奇怪。

这是一些示例代码:

import java.util.Queue;

import com.google.common.collect.Queues;

public class AccelerometerProcessor implements Runnable {

    private static final int WINDOW_SIZE = 128;

    private final Queue<AccelerometerEvent> eventQueue = Queues.newConcurrentLinkedQueue();

    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                synchronized (eventQueue) {
                    while (eventQueue.size() < WINDOW_SIZE) {
                        eventQueue.wait();
                    }

                    // We have WINDOW_SIZE eventQueue, start processing
                }
            } catch (InterruptedException e) {
                // Do nothing
            }
        }
    }

    public void addAccelerometerEvent(AccelerometerEvent accelerometerEvent) {
        synchronized (eventQueue) {
            eventQueue.add(accelerometerEvent);
            eventQueue.notifyAll();
        }
    }
}

顺便说一下,我也在用谷歌番石榴,所以如果那里有我没听说过的不错的收藏,请推荐给我。

那么:有什么好主意可以有效、干净地解决这个问题吗?

共有1个答案

鄢子平
2023-03-14

如果您总是要整体使用 WINDOW_SIZE/2 个事件,为什么生产者线程(您说只有一个)不填充大小为 WINDOW_SIZE/2 的数组并在它填满后将其传递给队列?

 类似资料:
  • 我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我有两个线程的问题,似乎没有正确同步。我基本上有一个布尔值名为“已占用”。当没有线程启动时,它被设置为false。但是当一个线程启动时,线程集被占用是真的,我有一个类,它有线程(run),它们调用下面的函数。 这是一个模拟银行的示例,它接收一个金额(初始余额),然后随机执行取款和存款。我的教授提到了一些关于从取款线程到存款线程的信号?这是怎么回事?在提取线程中,它应该运行到余额为2低,并等待存款线

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

  • 本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要