当前位置: 首页 > 知识库问答 >
问题:

阿帕奇骆驼聚合完成不起作用

司徒骞尧
2023-03-14

我已经配置了一个路由来从交易所中提取一些数据并聚合它们;这是简单的总结:

@Component
@RequiredArgsConstructor
public class FingerprintHistoryRouteBuilder extends RouteBuilder {

    private final FingerprintHistoryService fingerprintHistoryService;

    @Override
    public void configure() throws Exception {
        from("seda:httpFingerprint")
                .aggregate( (AggregationStrategy) (oldExchange, newExchange) -> {
                    final FingerprintHistory newFingerprint = extract(newExchange);
                    if (oldExchange == null) {
                        List<FingerprintHistory> fingerprintHistories = new ArrayList<>();
                        fingerprintHistories.add(newFingerprint);
                        newExchange.getMessage().setBody(fingerprintHistories);
                        return newExchange;
                    }

                    final Message oldMessage = oldExchange.getMessage();
                    final List<FingerprintHistory> fingerprintHistories = (List<FingerprintHistory>) oldMessage.getBody(List.class);
                    fingerprintHistories.add(newFingerprint);

                    return oldExchange;
                })
                .constant(true)
                .completionSize(aggregateCount)
                .completionInterval(aggregateDuration.toMillis())
                .to("direct:processFingerprint")
                .end();

        from("direct:processFingerprint")
                .process(exchange -> {
                    List<FingerprintHistory> fingerprintHistories = exchange.getMessage().getBody(List.class);
                    fingerprintHistoryService.saveAll(fingerprintHistories);
                });
strong text
    }

}

问题是聚合完成永远不起作用,例如,这是我的测试示例

@SpringBootTest
class FingerprintHistoryRouteBuilderTest {

    @Autowired
    ProducerTemplate producerTemplate;

    @Autowired
    FingerprintHistoryRouteBuilder fingerprintHistoryRouteBuilder;

    @Autowired
    CamelContext camelContext;

    @MockBean
    FingerprintHistoryService historyService;

    @Test
    void api_whenAggregate() {
        UserSearchActivity activity = ActivityFactory.buildSampleSearchActivity("127.0.0.1", "salam", "finger");
        Exchange exchange = buildExchange();
        exchange.getMessage().setBody(activity);

ReflelctionTestUtils.setField;ReflectionTestUtils.setFiled;producerTemplate.send(FingerprintHistoryRouteBuilder.FINGERPRINT_HISTORY_ENDPOINT,交换);Mockito.verify(历史服务). saveAll(Mockito.any ()); }

    Exchange buildExchange() {
        DefaultExchange defaultExchange = new DefaultExchange(camelContext);
        defaultExchange.setMessage(new DefaultMessage(camelContext));
        return defaultExchange;
    }

}

结果如下:

需要但未调用:fingerprintHistoryService bean . save all();

共有1个答案

胥英奕
2023-03-14

我构建了这个简化的示例,测试通过了,所以看起来您对聚合的使用可能是正确的。

您是否考虑过您的 Mockito.verify() 呼叫是在交换完成路由之前发生的?您可以通过删除验证调用并将.log()语句添加到FINGERPRINT_PROCESS_AGGREGATION路由来测试这一点。如果在执行期间看到日志输出,则表示交换正在按预期进行路由。如果是这种情况,则您的 verify() 呼叫需要能够等待交换完成路由。我不怎么使用模拟,但看起来你可以这样做:

Mockito.verify(historyService, timeout(10000)).saveAll(Mockito.any());
 类似资料:
  • 我正在尝试从SFTP服务器位置下载文件,但日志看起来不错,最后没有任何东西从服务器下载到本地。没有错误也来了。请提前感谢您的意见。 可用的 SFTP 文件: 路由器: 日志: 砰.xml

  • 我试图使用Apache Camel Quartz2实现一个调度器,它每分钟执行一次路由,并按预期执行一些任务。我使用spring DSL实现与apache camel相关联的路由,如下所示: 根据日志,它不会记录为路由记录的消息,例如Direct:DomainsWithFTPUsers等等。请指导如何实现同样的目标。

  • 遵循官方文件(https://camel.apache.org/manual/component-dsl.html#_using_component_dsl)我创建了以下代码: 但是中的告诉我: 并且中的特性不建议导入相应的库。 有人能给我指出正确的方向吗? 我必须理解的概念才能做到这一点吗?

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • 考虑到apache Camel,我有一个问题:是否可以通过代码来创建全局拦截器,例如AOP?拦截器应该跳过endpoint还是模仿endpoint? 提前致谢

  • 下面是我试图实现的场景: 谢谢你的帮助。此外,这里是已经工作的FTP部分。