当前位置: 首页 > 知识库问答 >
问题:

如何正确使用反应器发布器

赫连昕
2023-03-14

我不知道如何使用 React 正确实现发布者/订阅者方案。我有一个有效的解决方案,但对我来说,实现似乎不正确:

我的问题是,我需要手动实现发布者来注册订阅者并传递事件:

public void publishQuotes(String ticker) throws InterruptedException {

// [..] Here I generate some "lines" to be publisher

for (Subscriber<? super String> subscriber : subscribers) {
    lineList.forEach(line -> subscriber.onNext(line));
}

}

@Override
public void subscribe(Subscriber<? super String> subscriber) {
    subscribers.add(subscriber);

}

然后,我有一个WorkQueue处理器(应该是消费者):

WorkQueueProcessor<String> sink = WorkQueueProcessor.create();

// Here I subscribe to my publiser
publisher.subscribe(sink);

// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);

// Here I perform a number of stream transformations 

// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

它工作得很好,但很难看。在这个取自Spring Guides的示例中,他们使用EventBus将事件从发布者路由到消费者,但当我尝试将其与处理器链接时,我得到了以下编译器错误:

eventBus.on($("quotes"),sink);

The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)

我在这里迷路了,将出版商与处理器联系起来的最佳方式是什么?您建议使用EventBus吗?如果是的话,适当的调用是什么?

谢谢!

共有1个答案

柳和怡
2023-03-14

如果您使用EventBus,您将通过

 eventBus.notify("quotes", Event.wrap(line);

并通过订阅

eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);

其中“e”属于事件类型

 类似资料:
  • 在我们的代码中,回答服务间HTTP请求的典型句柄函数如下所示: 如果我没有理解错的话,这意味着每次调用时,都需要实例化整个管道(通常具有巨大的stacktraces)(并因此在以后收集)。 我的问题是:我能不能以某种方式“准备”一次整个流,以后再重用它? 进一步思考我可以做的是: 无论如何,我怀疑这不是的用意。

  • 我正在尝试为Vertx web客户端编写一个包装器,以便使用reactivestreams中的Publisher从服务器加载响应正文: 此解决方案是不正确的,因为它通过调用以阻塞方式读取所有正文字节。 是否可以分块读取来自Vertx网络客户端的响应,并将其转换为发布者(或Rx可流动的)?

  • 问题内容: 我的代码有最后一个问题,涉及反射包中的.Call函数。 所以我在打这样的电话: 我正在执行.Call的方法如下: 我不太了解的是如何操作“ in”变量,以便将需要的映射正确传递到函数中。我看到make()中的第二个参数是参数的长度吗?但是我不太了解如何格式化var以正确传递参数。我递归地遇到错误消息: 任何帮助将非常感激! 问题答案: 来自: 呼叫调用函数与输入参数。例如,如果,代表去

  • 问题内容: 我一直在这里和总体上搜索所有子解析器示例的分配,但似乎无法弄清楚这看似简单的事情。 我有两种var类型,其中一种具有约束,所以认为使用subparser是必经之路。例如-t允许“ A”或“ B”。如果用户通过“ A”,则进一步要求他们还指定它是“ a1”还是“ a2”。如果他们只是通过“ B”,那么什么也没有。 我可以这样做,让argparse返回给我什么类型的“ A”,或者只是“ B

  • 我需要一个自定义反序列化器来在复杂的POJO中转换字符串。反序列化工作直到使用反序列化器:特别是使用自定义反序列化器时,我的对象的非对象属性不会序列化。 我有一个restful Web服务,它有一个pojo作为参数。 所以我的类PreentivoWs需要一个方法。这里是类定义: 在jsonObject中,我有一个枚举定义为 但此对象需要转换反序列化程序: 并在财产上标注: fromString方法

  • 问题内容: 我有一个似乎无法满足的简单要求:我有一个产品页面。产品具有供应商,供应商输入是带有自动完成功能的文本字段。如果用户输入数据库中不存在的供应商,则需要添加它。要添加它,我在.load()页面上有一个DIV并调用了我的/ Vendor / Create控制器方法。该方法的视图使用: 这应该通过ajax发布我的表单,完成后调用Javascript。我遇到的问题是,提交后,我的整个页面都会刷新