当前位置: 首页 > 知识库问答 >
问题:

具有并行流的散点聚集(聚合器中的超时)

子车雅珺
2023-03-14

我一直在尝试在聚集中添加超时,以避免等待每个流都完成。但是当我添加超时时,它不起作用,因为聚合器等待每个流完成。

@Bean
public IntegrationFlow queueFlow(LogicService service) {    
        return f -> f.scatterGather(scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(aFlow(service))
                                .recipientFlow(bFlow(service))
                        , aggregatorSpec -> aggregatorSpec.groupTimeout(2000L)) 

E、 在我的流中,其中一个有2秒的延迟,另一个有4秒的延迟

public IntegrationFlow bFlow(LogicService service) {
        return IntegrationFlows.from(MessageChannels.executor(Executors.newCachedThreadPool()))
                .handle(service::callFakeServiceTimeout2)
                .transform((MessageDomain.class), message -> {
                    message.setMessage(message.getMessage().toUpperCase());
                    return message;
                }).get();
    } 

我使用遗嘱执行人。newCachedThreadPool()以并行运行。我想释放包含的每条消息,直到超时完成

我一直在测试的另一种方法是使用默认的gatherer,并在scatterGather中设置GathereTimeout,但我不知道我是否缺少一些方法GathereTimeout

更新

评论中给出的所有方法都经过测试并正常工作,唯一的问题是每个操作都在消息组创建过程中进行评估。并且消息组是在第一条消息到达之前创建的。理想的方法是在散射器分发请求消息时有一个有效的选项。

我的临时解决方案是使用一个发布策略,应用一个分组条件提供程序,它读取我通过网关发送消息时创建的自定义标头。唯一担心的是,只有在到达新消息或我设置了组超时时,才会执行发布策略。

共有1个答案

商夜洛
2023-03-14

聚合器上的组超时不足以释放组。如果你没有在那个超时时间得到整个组,那么它将被丢弃。请参阅sendPartialResultOnExpiry选项:https://docs.spring.io/spring-integration/reference/html/message-routing.html#agg-并分组到

如果到期时发送部分结果为true,则(部分)消息组中的现有消息将作为正常聚合器回复消息发布到输出通道。否则,它将被丢弃。

如果您根本不希望gatherer回复,那么GathereTimeout是一个不错的选择。因此,这样您就不会永远阻止散聚线程:https://docs.spring.io/spring-integration/reference/html/message-routing.html#scatter-收集错误处理

 类似资料:
  • 我参考的是英特尔关于至强Phi指令集的手册,无法理解分散/聚集指令是如何工作的。 假设如果我有以下双向量: 是否可以按如下方式创建4个向量: 使用这些说明?有没有其他方法可以做到这一点?

  • 你能让我知道如何在新的api中使用与数据流运行器的聚合器吗。?

  • 我有一个关于R中空间聚合的问题。我的数据集有纬度/经度坐标,有些彼此接近,有些不接近。我想为彼此接近的纬度/经度坐标做一个点。 我不确定如何做到这一点。我是否将纬度/经度坐标作为组列出,并使求平均值以使一个点表示每个组?因为我对这类事情没什么经验。我希望你们中的任何人可能有一些有用的指导/可能的解决方案。 例如,如果我可以将下面的纬度/经度坐标设为一个点: 例如,如下所示,不影响时间和速度值:

  • 主要内容:1 分散/聚集的介绍,2 分散读取,3 聚集写入1 分散/聚集的介绍 Java NIO带有内置的分散/聚集功能。分散/聚集是在读取和写入Channel中使用的概念。 从Channel分散读取是将数据读取到多个缓冲区中的读取操作。因此,通道将数据从通道“分散”到多个缓冲区中。 对Channel的聚集写入是一种将来自多个缓冲区的数据写入单个通道的写入操作。因此,通道将来自多个缓冲区的数据“聚集”到一个Channel中。 在需要分别处理传输数据的各个

  • 我使用复合和术语聚合来获得基于给定字段的分组结果。我还使用基数聚合来获取聚合桶的总计数。 下面是我发送的请求查询,以获得相应的响应: 请求: 答复: 我使用Kibana检查查询,它对我来说很好。 但是,我不确定如何在我的NEST对象语法中使用这个基数聚合器。 这是我的代码: 我将非常感谢任何帮助。

  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只