当前位置: 首页 > 知识库问答 >
问题:

拆分/聚合从不释放组(Spring集成JavaDSL)

阚夕
2023-03-14

我正在尝试做一个GroupBy基于共享ID的GeoJSON功能列表,以便通过使用拆分/聚合来聚合这些功能的单个字段,如下所示:

@Bean
IntegrationFlow myFlow() {
    return IntegrationFlows.from(MY_DIRECT_CHANNEL)
            .handle(Http.outboundGateway(myRestUrl)
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(FeatureCollection.class)
                    .mappedResponseHeaders(""))
            .split(FeatureCollection.class, FeatureCollection::getFeatures)
            .aggregate(aggregator -> aggregator
                    .outputProcessor(agg -> {
                        final List<String> collected = agg
                                .getMessages()
                                .stream()
                                .map(m -> ((Number)((Feature) m.getPayload()).getProperties().get("my_field")).intValue() + "")
                                .collect(Collectors.toList());
                        return MyPojo.builder()
                                .myId(((Number) agg.getGroupId()).longValue())
                                .myListString(String.join(",", collected))
                                .build();
                    })
                    .correlationStrategy(m -> ((Feature) m.getPayload()).getProperties().get("shared_id"))
                    // .sendPartialResultOnExpiry(true)
                    // .groupTimeout(10000) // there's got to be a better way ...
                    // .expireGroupsUponTimeout(false)
            )
            .handle(Jpa.updatingGateway(myEntityManagerFactory).namedQuery(MyPojo.QUERY_UPDATE),
                    spec -> spec.transactional(myTransactionManager))
            .nullChannel();
}

除非我取消对这三行的注释,否则聚合器永远不会发布组,数据库也不会收到任何更新。如果我将groupTimeout设置为小于5秒,则会丢失部分结果。

我预计发布策略默认为SimpleSequenceSizeReleaseStrategy,我预计在处理完所有(拆分)功能后会自动释放所有组(REST服务消息中总共只有129个功能)。手动将其设置为发布策略没有帮助。

一旦处理完所有129条消息,释放组的正确方法是什么?

共有1个答案

荀子轩
2023-03-14

我让它使用变压器而不是拆分/聚合工作:

@Bean
IntegrationFlow myFlow(MyTransformer myTransformer) {
    return IntegrationFlows.from(MY_DIRECT_CHANNEL)
            .handle(Http.outboundGateway(myRestUrl)
                    .httpMethod(HttpMethod.GET)
                    .expectedResponseType(FeatureCollection.class)
                    .mappedResponseHeaders(""))
            .transform(myTransformer)
            .split()
            .handle(Jpa.updatingGateway(myEntityManagerFactory).namedQuery(MyEntity.QUERY_UPDATE),
                    spec -> spec.transactional(myTransactionManager))
            .nullChannel();
}

变压器的签名为:

@Component
public class MyTransformer implements GenericTransformer<FeatureCollection, List<MyEntity>> {

    @Override
    public List<MyEntity> transform(FeatureCollection featureCollection) {
        ...
    }
}
 类似资料:
  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?

  • 我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息: 在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用: 我在弄清楚如何使用DSL在pubSubChannel中实际执行部分时遇到了问题。到目前为止,我已经尝试过: 有什么指示吗?

  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

  • 我有以下XML负载,我正试图将其用于Spring集成和Spring集成AMQP: 我正在使用xpath拆分器拆分消息: 我工作正常,消息被分成3条新消息,例如使用此有效负载: 在此步骤之后,将使用此设置聚合消息: 作为最后一步,消息将使用此出站通道适配器发送到交换机: 不幸的是,出现了一些问题,因为我最终得到了这样的有效载荷。我需要它保持XML格式。