当前位置: 首页 > 知识库问答 >
问题:

流API和队列:订阅BlockingQueue流样式

益源
2023-03-14

假设我们有一个队列

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

其他一些线程在里面放了值,然后我们读起来像

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

如何以流样式迭代此队列,同时保持与上述代码相似的语义。

此代码仅遍历当前队列状态:

queue.stream().forEach(e -> System.out.println(e));

共有3个答案

连文栋
2023-03-14

您可以查看异步队列实现。如果您有Java8,那么cyclops react,我是这个项目的开发人员,提供了一个异步。允许您异步(干净地)从队列中填充和使用的队列。

例如

Queue<String> queue = QueueFactories.<String>unboundedQueue().build();

或者简单地(只要这是一个com.aol.simple.react.async.Queue)

Queue<String> queue = new Queue<>();

然后在一个单独的线程中:

new Thread(() -> {
        while (true) {
            queue.add("New message " + System.currentTimeMillis());
        }
    }).start();

回到主线程上,您的原始代码现在应该可以按预期工作了(对添加到队列中的消息进行Infinetly迭代并打印出来)

queue.stream().forEach(e -> System.out.println(e));

队列和流可以在任何阶段通过-

queue.close();
夏侯枫
2023-03-14

另一种方法是构建自定义拆分器。在我的例子中,我有一个阻塞队列,我想构建一个流,继续提取元素,直到阻塞超时。拆分器类似于:

public class QueueSpliterator<T> implements Spliterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs;

    public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public int characteristics() {
        return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> action) {
        try {
            final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                return false;
            }
            action.accept(next);
            return true;
        } catch (final InterruptedException e) {
            throw new SupplierErrorException("interrupted", e);
        }
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

}

引发的异常处理中断异常是RuntimeExcture的扩展。使用这个类,可以通过: StreamSupport.stream(new QueueSpliterator(...))构建流,并添加常用的流操作。

季华茂
2023-03-14

我有点猜测你在期待什么,但我想我有很好的预感。

队列的流,就像在队列上迭代一样,表示队列的当前内容。当迭代器或流到达队列尾部时,它不会阻塞等待添加更多元素。迭代器或流在该点耗尽,计算终止。

如果您想要一个由队列的所有当前和未来元素组成的流,您可以这样做:

Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException ie) {
            return "Interrupted!";
        }
    })
    .filter(s -> s.endsWith("x"))
    .forEach(System.out::println);   

(不幸的是,需要处理InterruptedException使得这一点非常混乱。)

请注意,没有办法关闭队列,也没有办法关闭流。generate停止生成元素,因此这实际上是一个无限流。终止它的唯一方法是使用短路流操作,如findFirst

 类似资料:
  • 我正在尝试创建一个使用EWS服务侦听Exchange room邮箱中的事件的项目。 我们可以通过将邮箱id传递给方法“getUserSettings”来获取组信息。 感谢任何帮助

  • 我的发现 根据我的研究,每个队列用于存储发送到每个客户端订阅的主题的消息。例如,当同一个主题(例如)被3个不同的客户端(例如、、&)订阅时,将有3个订阅队列(即、&)。因此,当消息发送到主题时,Artemis将把这些消息放在订阅队列中、、。 实际问题 我们的broker.xml

  • 我正在尝试设置到系统中所有节点的广播消息。当一个新节点加入系统时,它会向其他所有节点发布一条消息来宣布它的加入。我设计的方式是,存在一个交换,所有节点都将绑定到它自己的队列。每当一个新节点加入系统时,它就会将其队列绑定到exchange并将消息发布到exchange。所有节点都会收到这个消息(包括自身),所有其他节点(除了这个消息)都会发送一个“ACK”消息,这样新节点就会了解系统中可用的节点。但

  • 我正在使用Woocomemrce REST API与我的站点连接。当涉及到订单时,一切工作都很好,但对于订阅却不起作用。我尝试了以下代码来获取订阅,但它给出了“错误:找不到匹配URL和请求方法[rest_no_route]的路由” 谁能帮我解决这个问题。谢谢.

  • 我正在构建一个Akka应用程序,并希望向外部消费者公开一些参与者的FSM状态转换。(最终目标是能够将状态转换消息推送到websocket,以便可以实时查看。)

  • 是否有一种方法可以使用AWS CLI列出特定SQS队列订阅的所有AWS SNS主题? 我已经知道如何看到一个SNS主题上的订阅者列表,但这只有在我知道SNS主题的名称时才起作用。在这种情况下,我只知道SQS队列的名称。 我在网上搜索什么也找不到。