我试图使用Axon 4.1+中的EventProcessingModule
在一个2 JVM node K8集群上重播事件。虽然我将它设置为清理事件,但它只从一个节点中提取事件,而另一个节点继续运行,因为它的跟踪事件仍然是活动的。
我如何在所有JVM上同时禁用它,以便它能够正确地重播?然后启用所有这些命令,继续处理命令。
我尝试通过这段代码增加线程,这导致了另一个问题,即现有的令牌在InitialSemgmentsCount中永远不会增加,除非我从DB中完全删除该令牌。
public void config(final EventProcessingConfigurer configurer) {
configurer.registerTrackingEventProcessorConfiguration(c ->
TrackingEventProcessorConfiguration
.forParallelProcessing(2)
.andInitialSegmentsCount(2));
// .andTokenClaimInterval(10L, TimeUnit.SECONDS)); // Event
}
# Application.yml
axon:
distributed:
enabled: true
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.shutDown();
trackingEventProcessor.resetTokens();
});
// Thread.sleep() to verify with
eventProcessingModule
.eventProcessorByProcessingGroup("query.skuProcessor.SkuProjection", TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.start();
});
示例代码在一个shutdown/Reset/Start设置中有上述内容,但我将它们拆分,只是为了看看它是如何工作的(相信@resethandler是在Reset之后调用的,因此Start会等待,但不能完全确定)
使用@resethandler
void方法清理要重播的表的SkuProjection组件。
根据当前哪个JVM拥有令牌,另一个将返回以下错误:skureplayservice.startreplay:Failed on exception!无法声明标记“Query.SkuProcessor.SkuProjecting[0]”。其所有者为'1@sku-7694bbc6b6-8p958'
我想我可以在这里给你一些指导。
您正在研究如何跨几个JVM将操作委托给一组特定的跟踪事件处理器,对吗?现在,API Axon框架为您提供的TrackingEventProcessor
,是在TrackingEventProcessor
中有意维护的。
因此,执行start()
、shutdown()
、processingstatus()
或resettokens()
是对特定跟踪事件处理器实例执行的调用。
此外,您还会看到无法声明令牌
异常,因为框架要求执行重置的TrackingEventProcessor
是给定处理组的所有令牌的所有者。原因是它需要将标记调整为replayTokens
,以支持诸如@resethandler
这样的好函数。
如果我是正确的,你现在要求的是:
Axon框架是否提供了将特定的跟踪事件处理器操作委托给处理相同处理组的所有实例的方法?
这是我对带有RetryScheduler的commandGateway的配置:
任何关于如何做到这一点的文件都将不胜感激。 提前谢了。
exception$10(errorcode.java:88)在org.axonframework.axonserver.connector.errorcode.convert(errorcode.java:182)在org.axonframework.axonserver.connector.command.command.command.axonservercommandbus$1.onnex
这是今天回答另一个问题的“副作用”。这更多的是好奇心而不是实际问题。 JavaSE7提供了Oracle所称的“fork/join框架”。这可能是将工作安排到多个处理器的一种更好的方法。虽然我理解它应该如何工作,但我无法理解它的优势所在,以及关于偷工作的说法。 也许其他人更了解为什么这种方法是可取的(而不是因为它有一个奇特的名字)。 分支/连接的底层原语是s,即s,其想法是要么立即执行工作[原文如此
您可以在下面看到我的示例类。 基本上,我希望使用Axon的表来存储事件,并使用我自己的实体表来存储实体。我知道,如果我激发在聚合中处理的,将发布一个事件,之后它将转到,Axon将在其表中持久化该事件。 如何回滚表,还是应该为此使用补偿事件? 我的外部@EventHandler类: