当前位置: 首页 > 知识库问答 >
问题:

Reactor中“groupBy”组的并行调度

谷梁卓
2023-03-14

我在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体关联,并包含一些数据。

interface Message {
    String getEntityId();
    Data getData();
}

与不同实体相关的消息可以并行处理。但是,与任何单个实体相关的消息必须一次处理一条,即在实体“abc”的消息1处理完成之前,无法开始处理实体“abc”的消息2。在处理消息的过程中,应该缓冲该实体的进一步消息。其他实体的消息可以不受阻碍地进行。可以将其视为每个实体的线程上都有这样的代码:

public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

如何在没有阻塞的情况下使用React实现这一点?总消息速率可能很高,但每个实体的消息速率将非常低。实体集可能非常大,不一定提前知道。

我猜它可能看起来像这样,但我不知道。

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */)))
                    /* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }

共有2个答案

沈伟
2023-03-14

我们在项目中遇到了同样的问题。具有相同id的实体必须按顺序处理,但具有不同id的实体可以并行处理。

解决方案似乎非常简单。我们开始使用concatMap而不是flatMap。来自concatMap的文档:

 * Transform the elements emitted by this {@link Flux} asynchronously into Publishers,
 * then flatten these inner publishers into a single {@link Flux}, sequentially and
 * preserving order using concatenation.

代码示例:

public void receive(Flux<Data> data) {
    data
        .groupBy(Data::getPointID)
        .flatMap(service::process)
        .onErrorContinue(Logging::logError)
        .subscribe();

}

工艺方法:

Flux<SomeEntity> process(Flux<Data> dataFlux) {
    return dataFlux
        .doOnNext(Logging::logReceived)
        .concatMap(this::proceedDefinitionsSearch)
        .doOnNext(Logging::logDefSearch)
        .flatMap(this::processData)
        .doOnNext(Logging::logDataProcessed)
        .concatMap(repository::save)
        .doOnNext(Logging::logSavedEntity);
}
广亮
2023-03-14

使用ProjectReactor,您可以通过以下方式解决此问题:

@Test
public void testMessages() {
    Flux.fromStream(incomingMessages())
            .groupBy(Message::getEntityId)
            .map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
            .subscribe( //create consumer for main stream
                    stream ->
                            stream.subscribe(this::processMessage) // create consumer for group stream
            );
}

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}

我认为类似的解决方案可以在RxJava中实现

 类似资料:
  • 我试图实现每个组的并行性,其中分组元素并行运行,组内每个元素按顺序工作。然而,对于下面的代码,第一个emit使用并行线程,但对于后续emit,它使用一些不同的线程池。如何实现组的并行性和组内元素的顺序执行。 日志

  • 项目Reactor3.1.5。发布 考虑这一点: 我希望订阅服务器在多个线程中运行,但它只在一个线程中运行: 留档告诉我的期望是正确的(http://projectreactor.io/docs/core/release/reference/#threading)。有人能给我解释一下那里发生了什么吗?

  • 我想知道如何对REST或Web服务进行几个并行调用,然后加入响应并将其发送到调用@RestController的响应中。 类似于下面的代码构建与比较未来,但Reactor(通量,单声道)。

  • 我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程: 给定类实体: 从DB读取实体(

  • 问题内容: 我有一个像这样的熊猫数据框: 我想按第一列分组并获得第二列作为行中的列表: 可以使用来做类似的事情吗? 问题答案: 你可以使用以下方法对感兴趣的列进行分组,然后对每个分组进行分组:

  • 我有一个熊猫数据框,如下所示 我想找到每个日期介绍的,如果item_id介绍超过1个日期,那么我想找到每个日期的比例'(qty_bought/qty_purchased)。 我想要的输出如下 标志的条件是当比率大于以前的日期时,应将其设置为1或0 如果我在5个不同的日期引入了项目,那么这将动态生成5个日期和比率列。比率将具体到该日期。我只想列出那些引入了多次的。 这是我对python的尝试 这给了