当前位置: 首页 > 知识库问答 >
问题:

Spring集成使异步网关等待聚合结果

华睿识
2023-03-14
    null
public void add(Message<?> messageToAdd) {
        if(messageToAdd.getPayload() instanceof FileMarker) {
            FileMarker marker = (FileMarker) messageToAdd.getPayload();
            switch (marker.getMark()) {
                case START:
                    replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
                    break;
                case END:
                    expectedRowCount.set(marker.getLineCount());
                    break;
                default:
                    throw new IllegalStateException("Unexpected file mark");
            }
        } else {
            ProcessingResult processingResult = messageToAdd.getHeaders()
                    .get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
            assert processingResult != null;
            rowCount.incrementAndGet();
            switch (processingResult) {
                case FILTERED:
                    filteredCount.incrementAndGet();
                    break;
                case PROCESSED:
                    processedCount.incrementAndGet();
                    break;
                case PROCESSING_ERROR:
                    processingErrorCount.incrementAndGet();
                    break;
                case KAFKA_ERROR:
                    kafkaErrorCount.incrementAndGet();
                    break;
                default:
                    throw new IllegalStateException("Unrecognized processing result: " + processingResult);
            }
        }
    }

重新创建问题的测试可以在https://github.com/hawk1234/spring-integration-example commit 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4中找到。当您运行测试时,应用程序日志可以在路径build/logs/spring-integration-example.log下获得。当前测试挂起,因为网关从未收到响应。此外,整个流程都在进行中,所以只有在超时后组才会释放。

共有1个答案

羊舌琛
2023-03-14

我已经创建了自定义消息组

那就有点不对劲了。

在group,standard group中已经有了所有消息(包括FileMarker)时,必须执行所有必需的逻辑。有一个MessageGroupProcessor,当ReleaseStrategy返回true时调用它。因此,这里有组的所有消息,您可以执行所有所需的统计信息收集,以产生所需的答复。

 类似资料:
  • 我使用Spring Batch admin项目,在该项目中,我得到了一个异步处理特定文件夹中文件的作业。目前,我通过batch admin ui通过传递相关作业参数来运行它。 现在,我正试图通过使用文件入站通道适配器自动化此过程。我已经配置了服务激活器,它将在收到文件时调用批处理作业。我现在有一个新的要求,即在第一个文件上载作业完成后调用另一个批处理作业。为此,我创建了另一个服务激活器,它使用第一

  • 我通读了Dart/flatter中的Async/Await/then,试图理解为什么aysnc函数中的Await不会等到完成后再继续。在我的UI中,有一个按钮调用一个异步方法来返回一个位置,该位置总是返回null,并且不等待函数完成。 该函数将调用推送到一个新的UI页面,该页面选择一个位置,并应返回一个结果。如何使该函数等待结果?我不是在使用异步吗?

  • 我试图在react/electron项目中使用async/await,但它不起作用。我想要的是获取docker容器状态列表。但是安慰。日志(列表)返回未定义的。 有人能帮我吗?:)

  • MDN文档 异步/等待函数的目的是简化同步使用promise的行为,并对一组promise执行某些行为。正如promise类似于结构化回调一样,async/await类似于组合生成器和promise。 我了解异步/等待、生成器和promise的基本概念。然而,我不完全理解说async/await类似于将生成器和promise结合起来意味着什么。 所以async/wait简化了生成器和promise

  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

  • 我正在尝试将数据库调用移出控制器,以清理并使其可测试。当它们在控制器中时,一切都会顺利进行。我将它们移出控制器,并添加了一个异步,以确保我们等待。否则,我将调用的中的函数。现在,一旦我使用async/await,控制器中的函数就会认为没有用户,因为它没有等待。 有几个关于异步等待的SO问题,但我没有找到一个解决我的问题。我确实验证了返回了我的用户,并添加了控制台日志来显示路径。 节点猫鼬异步等待似