当前位置: 首页 > 知识库问答 >
问题:

Axon框架:只处理由同一个JVM实例发布的事件?

孙海
2023-03-14
    null

我想要达到的目标

我希望由一个实例发布的事件只由同一个实例处理

如果instance1发布eventX,那么只有instance1应该处理eventX

    null
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
        val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
        if(stackId == stackProperties.id){
            interceptorChain?.proceed()
        }
        return null
    }
}
@Configuration
class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
        val processingGroup = "processing-group-stack-${stackProperties.id}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
}

共有1个答案

丁雅懿
2023-03-14

你在这里有一个有趣的场景@Thowimmer。我的第一个预感是“改用subscribingEventProcessor”。但是,您指出这不是您设置中的选项。我认为,对于处于相同情况下的其他人来说,知道为什么这不是一个选择是非常有价值的。所以,也许你可以详细说明一下(老实说,我对此也很好奇)。

现在,对于您的问题案例,以确保事件仅在同一JVM中处理。将源添加到事件中肯定是您可以采取的步骤,因为这允许以逻辑方式进行筛选。“此事件是否源于my.origin()?”如果不是,您只需忽略该事件并完成它,就这么简单。不过,还有另一种方法可以实现这一点,我稍后会谈到这一点。

不过,我想过滤的地方是你最想找的。但首先,我想说明为什么首先需要筛选。正如您所注意到的,TrackingEventProcessor(TEP)从所谓的StreamableMessageSource流式传输事件。EventStore是这样一个StreamableMessageSource的实现。当您在同一个存储库中存储所有事件时,它只会将所有事件流到您的TEPS中。由于您的事件是单个事件流的一部分,您需要在某个阶段筛选它们。使用MessageHandlerInterceptor可以工作,您甚至可以编写HandlerEnhacnerDefinition,允许您向事件处理函数添加其他行为。不管你怎么说,在当前的设置下,过滤需要在某个地方完成。MessageHandlerInterceptor可以说是最简单的实现方法。

然而,对此有一种不同的处理方式。为什么不将事件存储区分隔为两个不同的应用程序实例呢?显然,它们不需要相互读取,那么为什么要共享同一个事件存储呢?如果不了解您的领域的进一步背景,我猜您基本上是在处理驻留在不同的有界上下文中的应用程序。简而言之,没有兴趣与两个应用程序/上下文共享所有内容,您只是非常有意识地相互共享您的领域语言的特定部分。

请注意,在中间使用单个通信集线器支持多个上下文正是Axon Server可以为您实现的功能。我在这里不是说你不能自己配置这个,但我在过去做过。但是,将这项工作留给其他人或其他事情,将您从配置基础设施的需要中解放出来,这将是一个巨大的时间节省。

希望这能帮助你设定一下我对这件事的想法的背景@Thowimmer。

 类似资料:
  • exception$10(errorcode.java:88)在org.axonframework.axonserver.connector.errorcode.convert(errorcode.java:182)在org.axonframework.axonserver.connector.command.command.command.axonservercommandbus$1.onnex

  • 因此,目前我正在用axon框架实现一个saga,使用事件源和CQRS。 情况如下: 我有3个微服务,m1、m2和m3 用户在GUI中输入由m1、m2、m3分别处理和保存的3个实体e1、e2、e3的数据,因此M1->e1、M2->e2、M3->e3 现在来看看这个传奇的必要性: e1没有e2就不能存在,e2没有E3就不能存在。 因此,所有3个实体必须由它们各自的服务成功创建,如果其中一个失败,则sa

  • 我有两个独立的应用程序运行在同一服务器上。 null 我已经实现了从用户管理到钱包管理的事件来源。它运行得很好。 但是,当我将从钱包管理应用程序中的事件处理程序发布新事件时,我将收到以下错误消息日志。 以下是我的日志详细信息

  • 目前,我不会使用自定义CommandGateway,即使是不重试的命令 我使用了不同的CommandGateway bean方法

  • 每个人 都。我一直在寻找这个问题,但我在这里没有找到它,所以我想它真的很简单。 我正在用JavaFX创建一个非常简单的应用程序,只需一个按钮。现在我想处理它的事件(如按下或释放),但当我在互联网上看到示例时,它们都使用匿名类(每个事件都有一个不同的类),这在我看来让代码变得肮脏。这就是为什么我想将事件处理程序放在一个单独的类中,并将它们添加到按钮中。 问题是我不知道我是否必须为每个事件创建一个不同