假设我们有一个队列
BlockingQueue<String> queue= new LinkedBlockingQueue<>();
其他一些线程在里面放了值,然后我们读起来像
while (true) {
String next = queue.take();
System.out.println("next message:" + next);
}
如何以流样式迭代此队列,同时保持与上述代码相似的语义。
此代码仅遍历当前队列状态:
queue.stream().forEach(e -> System.out.println(e));
您可以查看异步队列实现。如果您有Java8,那么cyclops react,我是这个项目的开发人员,提供了一个异步。允许您异步(干净地)从队列中填充和使用的队列。
例如
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
或者简单地(只要这是一个com.aol.simple.react.async.Queue)
Queue<String> queue = new Queue<>();
然后在一个单独的线程中:
new Thread(() -> {
while (true) {
queue.add("New message " + System.currentTimeMillis());
}
}).start();
回到主线程上,您的原始代码现在应该可以按预期工作了(对添加到队列中的消息进行Infinetly迭代并打印出来)
queue.stream().forEach(e -> System.out.println(e));
队列和流可以在任何阶段通过-
queue.close();
另一种方法是构建自定义拆分器。在我的例子中,我有一个阻塞队列,我想构建一个流,继续提取元素,直到阻塞超时。拆分器类似于:
public class QueueSpliterator<T> implements Spliterator<T> {
private final BlockingQueue<T> queue;
private final long timeoutMs;
public QueueSpliterator(final BlockingQueue<T> queue, final long timeoutMs) {
this.queue = queue;
this.timeoutMs = timeoutMs;
}
@Override
public int characteristics() {
return Spliterator.CONCURRENT | Spliterator.NONNULL | Spliterator.ORDERED;
}
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public boolean tryAdvance(final Consumer<? super T> action) {
try {
final T next = this.queue.poll(this.timeoutMs, TimeUnit.MILLISECONDS);
if (next == null) {
return false;
}
action.accept(next);
return true;
} catch (final InterruptedException e) {
throw new SupplierErrorException("interrupted", e);
}
}
@Override
public Spliterator<T> trySplit() {
return null;
}
}
引发的异常处理中断异常是RuntimeExcture的扩展。使用这个类,可以通过: StreamSupport.stream(new QueueSpliterator(...))构建流,并添加常用的流操作。
我有点猜测你在期待什么,但我想我有很好的预感。
队列的流,就像在队列上迭代一样,表示队列的当前内容。当迭代器或流到达队列尾部时,它不会阻塞等待添加更多元素。迭代器或流在该点耗尽,计算终止。
如果您想要一个由队列的所有当前和未来元素组成的流,您可以这样做:
Stream.generate(() -> {
try {
return queue.take();
} catch (InterruptedException ie) {
return "Interrupted!";
}
})
.filter(s -> s.endsWith("x"))
.forEach(System.out::println);
(不幸的是,需要处理InterruptedException
使得这一点非常混乱。)
请注意,没有办法关闭队列,也没有办法关闭流。generate
停止生成元素,因此这实际上是一个无限流。终止它的唯一方法是使用短路流操作,如findFirst
。
我正在尝试创建一个使用EWS服务侦听Exchange room邮箱中的事件的项目。 我们可以通过将邮箱id传递给方法“getUserSettings”来获取组信息。 感谢任何帮助
我的发现 根据我的研究,每个队列用于存储发送到每个客户端订阅的主题的消息。例如,当同一个主题(例如)被3个不同的客户端(例如、、&)订阅时,将有3个订阅队列(即、&)。因此,当消息发送到主题时,Artemis将把这些消息放在订阅队列中、、。 实际问题 我们的broker.xml
我正在尝试设置到系统中所有节点的广播消息。当一个新节点加入系统时,它会向其他所有节点发布一条消息来宣布它的加入。我设计的方式是,存在一个交换,所有节点都将绑定到它自己的队列。每当一个新节点加入系统时,它就会将其队列绑定到exchange并将消息发布到exchange。所有节点都会收到这个消息(包括自身),所有其他节点(除了这个消息)都会发送一个“ACK”消息,这样新节点就会了解系统中可用的节点。但
我正在使用Woocomemrce REST API与我的站点连接。当涉及到订单时,一切工作都很好,但对于订阅却不起作用。我尝试了以下代码来获取订阅,但它给出了“错误:找不到匹配URL和请求方法[rest_no_route]的路由” 谁能帮我解决这个问题。谢谢.
我正在构建一个Akka应用程序,并希望向外部消费者公开一些参与者的FSM状态转换。(最终目标是能够将状态转换消息推送到websocket,以便可以实时查看。)
是否有一种方法可以使用AWS CLI列出特定SQS队列订阅的所有AWS SNS主题? 我已经知道如何看到一个SNS主题上的订阅者列表,但这只有在我知道SNS主题的名称时才起作用。在这种情况下,我只知道SQS队列的名称。 我在网上搜索什么也找不到。