当前位置: 首页 > 知识库问答 >
问题:

如何从阻塞队列中创建Reactor通量?

壤驷穆冉
2023-03-14

我试图实现从阻塞队列创建的Reactor通量,但不确定哪个操作符最适合我的用例?

我正在创建一个流式RESTendpoint,其中的响应是流量,需要不断从阻塞队列中发出消息,作为获取REST调用的响应。

我已经尝试过论坛和留档,只能找到从可迭代集合或响应数据源发起的Flux,但没有任何BlockingQueue的示例。

共有1个答案

陆英毅
2023-03-14

你可以试试Flux#generate and Queue#peek。请记住,如果队列为空,peek将返回null,并且不能在onNext中使用。

比如:

Flux.generate(sink -> {
    val element = queue.peek();
    if (element == null) {
        sink.complete();
    } else {
        sink.next(element);
    }
});

还有Flux#repeatWhen操作符,以防在队列被认为是空的情况下重新订阅队列,例如:

flux.repeatWhen(it -> it.delayElements(ofSeconds(1)))
 类似资料:
  • 问题内容: 我正在尝试使用Spring Reactor 3组件和Spring Integration从JMS队列创建反应式流(Flux)。 我正在尝试从JMS队列(使用Spring Integration的ActiveMQ)创建客户端的响应流(Spring Reactor 3 Flux),以使客户端异步获取JMS消息。我相信我已经正确连接了所有东西,但是在服务器停止之前,客户端不会收到任何JMS消

  • 问题内容: 基本上,我有一个URL,当发布新消息时,该URL在聊天室中流xml更新。我想将URL转换为InputStream并继续读取它,只要保持连接并且没有发送Thread.interrupt()。我遇到的问题是,当有内容要从流中读取时,BufferedReader.ready()似乎不正确。 我正在使用以下代码: 当我运行代码并将某些内容发布到聊天室时,buf.ready()永远不会变为tru

  • 问题内容: 我在一个非常简单的生产者-消费者场景中使用 java.util.concurrent.BlockingQueue 。例如,此伪代码描述了使用者部分: 到目前为止,一切都很好。在阻塞队列的javadoc中,我读到: BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示将不再添加任何项目。这些功能的需求和使用往往取决于实现。例如,一种常见的策略是让生产者插入特殊的

  • blpop key1...keyN timeout 从左到右扫描返回对第一个非空list进行lpop操作并返回,比如blpop list1 list2 list3 0 ,如果list不存在list2,list3都是非空则对list2做lpop并返回从list2中删除的元素。如果所有的list都是空或不存在,则会阻塞timeout秒,timeout为0表示一直阻塞。当阻塞时,如果有client对ke

  • 我编写了一个简单的类,我计划将其扩展为客户端套接字编程应用程序的一部分。类涉及一个BlockingQueue(我从这里复制了代码:相当于Java的BlockingQueue的C++)。当我创建了下面的包装类的一个实例后,我打算让它生成一个单独的线程,该线程只需执行BlockingQueue上阻塞的printer()函数,直到有一个或多个字符串可用,然后它只需将字符串打印到控制台窗口。在我的预期应用

  • 基本问题:Spring反应器WebClient如何实现非阻塞时,与RestTem板相比?在将请求发送到外部服务(例如)后,它是否必须在某个地方阻塞?HTTP本质上是同步的,对吗?所以调用应用程序必须等待响应?线程如何知道上下文,以便对来自服务的响应做出反应?