我在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体关联,并包含一些数据。
interface Message {
String getEntityId();
Data getData();
}
与不同实体相关的消息可以并行处理。但是,与任何单个实体相关的消息必须一次处理一条,即在实体“abc”的消息1处理完成之前,无法开始处理实体“abc”的消息2。在处理消息的过程中,应该缓冲该实体的进一步消息。其他实体的消息可以不受阻碍地进行。可以将其视为每个实体的线程上都有这样的代码:
public void run() {
for (;;) {
// Blocks until there's a message available
Message msg = messageQueue.nextMessageFor(this.entityId);
// Blocks until processing is finished
processMessage(msg);
}
}
如何在没有阻塞的情况下使用React实现这一点?总消息速率可能很高,但每个实体的消息速率将非常低。实体集可能非常大,不一定提前知道。
我猜它可能看起来像这样,但我不知道。
{
incomingMessages()
.groupBy(Message::getEntityId)
.flatMap(entityStream -> entityStream
/* ... */
.map(msg -> /* process the message */)))
/* ... */
}
public static Stream<Message> incomingMessages() { /* ... */ }
我们在项目中遇到了同样的问题。具有相同id的实体必须按顺序处理,但具有不同id的实体可以并行处理。
解决方案似乎非常简单。我们开始使用concatMap而不是flatMap。来自concatMap的文档:
* Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
* then flatten these inner publishers into a single {@link Flux}, sequentially and
* preserving order using concatenation.
代码示例:
public void receive(Flux<Data> data) {
data
.groupBy(Data::getPointID)
.flatMap(service::process)
.onErrorContinue(Logging::logError)
.subscribe();
}
工艺方法:
Flux<SomeEntity> process(Flux<Data> dataFlux) {
return dataFlux
.doOnNext(Logging::logReceived)
.concatMap(this::proceedDefinitionsSearch)
.doOnNext(Logging::logDefSearch)
.flatMap(this::processData)
.doOnNext(Logging::logDataProcessed)
.concatMap(repository::save)
.doOnNext(Logging::logSavedEntity);
}
使用ProjectReactor,您可以通过以下方式解决此问题:
@Test
public void testMessages() {
Flux.fromStream(incomingMessages())
.groupBy(Message::getEntityId)
.map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
.subscribe( //create consumer for main stream
stream ->
stream.subscribe(this::processMessage) // create consumer for group stream
);
}
public Stream<Message> incomingMessages() {
return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}
public void processMessage(Message message) {
System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}
private static class Message {
private final int id;
private final int entityId;
public Message(int id, int entityId) {
this.id = id;
this.entityId = entityId;
}
public int getId() {
return id;
}
public int getEntityId() {
return entityId;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", entityId=" + entityId +
'}';
}
}
我认为类似的解决方案可以在RxJava中实现
我试图实现每个组的并行性,其中分组元素并行运行,组内每个元素按顺序工作。然而,对于下面的代码,第一个emit使用并行线程,但对于后续emit,它使用一些不同的线程池。如何实现组的并行性和组内元素的顺序执行。 日志
项目Reactor3.1.5。发布 考虑这一点: 我希望订阅服务器在多个线程中运行,但它只在一个线程中运行: 留档告诉我的期望是正确的(http://projectreactor.io/docs/core/release/reference/#threading)。有人能给我解释一下那里发生了什么吗?
我想知道如何对REST或Web服务进行几个并行调用,然后加入响应并将其发送到调用@RestController的响应中。 类似于下面的代码构建与比较未来,但Reactor(通量,单声道)。
我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程: 给定类实体: 从DB读取实体(
问题内容: 我有一个像这样的熊猫数据框: 我想按第一列分组并获得第二列作为行中的列表: 可以使用来做类似的事情吗? 问题答案: 你可以使用以下方法对感兴趣的列进行分组,然后对每个分组进行分组:
我有一个熊猫数据框,如下所示 我想找到每个日期介绍的,如果item_id介绍超过1个日期,那么我想找到每个日期的比例'(qty_bought/qty_purchased)。 我想要的输出如下 标志的条件是当比率大于以前的日期时,应将其设置为1或0 如果我在5个不同的日期引入了项目,那么这将动态生成5个日期和比率列。比率将具体到该日期。我只想列出那些引入了多次的。 这是我对python的尝试 这给了