我已经配置了一个路由来从交易所中提取一些数据并聚合它们;这是简单的总结:
@Component
@RequiredArgsConstructor
public class FingerprintHistoryRouteBuilder extends RouteBuilder {
private final FingerprintHistoryService fingerprintHistoryService;
@Override
public void configure() throws Exception {
from("seda:httpFingerprint")
.aggregate( (AggregationStrategy) (oldExchange, newExchange) -> {
final FingerprintHistory newFingerprint = extract(newExchange);
if (oldExchange == null) {
List<FingerprintHistory> fingerprintHistories = new ArrayList<>();
fingerprintHistories.add(newFingerprint);
newExchange.getMessage().setBody(fingerprintHistories);
return newExchange;
}
final Message oldMessage = oldExchange.getMessage();
final List<FingerprintHistory> fingerprintHistories = (List<FingerprintHistory>) oldMessage.getBody(List.class);
fingerprintHistories.add(newFingerprint);
return oldExchange;
})
.constant(true)
.completionSize(aggregateCount)
.completionInterval(aggregateDuration.toMillis())
.to("direct:processFingerprint")
.end();
from("direct:processFingerprint")
.process(exchange -> {
List<FingerprintHistory> fingerprintHistories = exchange.getMessage().getBody(List.class);
fingerprintHistoryService.saveAll(fingerprintHistories);
});
strong text
}
}
问题是聚合完成永远不起作用,例如,这是我的测试示例:
@SpringBootTest
class FingerprintHistoryRouteBuilderTest {
@Autowired
ProducerTemplate producerTemplate;
@Autowired
FingerprintHistoryRouteBuilder fingerprintHistoryRouteBuilder;
@Autowired
CamelContext camelContext;
@MockBean
FingerprintHistoryService historyService;
@Test
void api_whenAggregate() {
UserSearchActivity activity = ActivityFactory.buildSampleSearchActivity("127.0.0.1", "salam", "finger");
Exchange exchange = buildExchange();
exchange.getMessage().setBody(activity);
ReflelctionTestUtils.setField;ReflectionTestUtils.setFiled;producerTemplate.send(FingerprintHistoryRouteBuilder.FINGERPRINT_HISTORY_ENDPOINT,交换);Mockito.verify(历史服务). saveAll(Mockito.any ()); }
Exchange buildExchange() {
DefaultExchange defaultExchange = new DefaultExchange(camelContext);
defaultExchange.setMessage(new DefaultMessage(camelContext));
return defaultExchange;
}
}
结果如下:
需要但未调用:fingerprintHistoryService bean . save all();
我构建了这个简化的示例,测试通过了,所以看起来您对聚合的使用可能是正确的。
您是否考虑过您的 Mockito.verify()
呼叫是在交换完成路由之前发生的?您可以通过删除验证调用并将.log()
语句添加到FINGERPRINT_PROCESS_AGGREGATION路由来测试这一点。如果在执行期间看到日志输出,则表示交换正在按预期进行路由。如果是这种情况,则您的 verify()
呼叫需要能够等待交换完成路由。我不怎么使用模拟,但看起来你可以这样做:
Mockito.verify(historyService, timeout(10000)).saveAll(Mockito.any());
我正在尝试从SFTP服务器位置下载文件,但日志看起来不错,最后没有任何东西从服务器下载到本地。没有错误也来了。请提前感谢您的意见。 可用的 SFTP 文件: 路由器: 日志: 砰.xml
我试图使用Apache Camel Quartz2实现一个调度器,它每分钟执行一次路由,并按预期执行一些任务。我使用spring DSL实现与apache camel相关联的路由,如下所示: 根据日志,它不会记录为路由记录的消息,例如Direct:DomainsWithFTPUsers等等。请指导如何实现同样的目标。
遵循官方文件(https://camel.apache.org/manual/component-dsl.html#_using_component_dsl)我创建了以下代码: 但是中的告诉我: 并且中的特性不建议导入相应的库。 有人能给我指出正确的方向吗? 我必须理解的概念才能做到这一点吗?
我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不
考虑到apache Camel,我有一个问题:是否可以通过代码来创建全局拦截器,例如AOP?拦截器应该跳过endpoint还是模仿endpoint? 提前致谢
下面是我试图实现的场景: 谢谢你的帮助。此外,这里是已经工作的FTP部分。