当前位置: 首页 > 知识库问答 >
问题:

aggregateidentifier的分布式使用

程正阳
2023-03-14

一般来说,这似乎是有效的,因为当我在第二个服务中发送命令时,它会被接收并更新状态。由于我可以使用EventSourcingHandler还可以使用在另一个服务中创建的事件来操作它的状态,所以我从第一个服务聚合应用的源中获取状态信息。

我担心快照机制会对我不利,但显然它足够聪明,只要我确保聚合的“类型”名称不相同,就可以单独存储快照。

到目前为止,一切都很好,唯一让我感觉到的是,第二个聚合没有(需要)初始构造函数CommandHandler,因为创建是在第一个聚合中完成的。

@Aggregate
@Getter
@NoArgsConstructor
public class Foo {

  @AggregateIdentifier
  private String fooIdentifier;

  @CommandHandler
  public Foo(CreateFooCommand command) {
    apply(FooCreatedEvent.builder()
      .fooIdentifier(command.getFooIdentifier())
      .build());
  }

  @EventSourcingHandler
  public void on(FooCreatedEvent event) {
    this.fooIdentifier = event.getFooIdentifier();
  }

}


@Aggregate
@Getter
@NoArgsConstructor
public class Bar {

  @AggregateIdentifier
  private String fooIdentifier;

  private String barProperty;

  @CommandHandler
  public void on(UpdateBarCommand command) {

    apply(BarUpdatedEvent.builder()
      .fooIdentifier(this.fooIdentifier)
      .barProperty(command.getBarProperty())
      .build());
  }

  @EventSourcingHandler
  public void on(FooCreatedEvent event) {
    this.fooIdentifier = event.getFooIdentifier();
  }

  @EventSourcingHandler
  public void on(BarUpdatedEvent event) {
    this.barProperty = event.getBarProperty();
  }

}

因此,我现在唯一的选择就是上面提到的,或者使用创建第二个聚合来设置不同的aggregateId,但也可以在内部添加第一个聚合的aggregateId,以允许将第一个聚合的aggregateId信息作为引用ID发布事件。为了实现这一功能,我必须保留一个投影来在两个标识符之间来回映射,这看起来也不太好。

提前谢谢,拉斯·卡尔森

共有1个答案

胡玉书
2023-03-14

你想出了一个很有趣的解决方案。我不能说我曾经在这样一个庄园中拆分聚合逻辑,即一个服务创建它,另一个服务加载相同的事件,以它自己的形式重新创建该状态。

那么,我是否违背了axon框架打算使用聚合的方式,或者这是一个可行的用例?

老实说,我不认为这会是预期的用法。与其说是因为Axon,不如说是因为您所使用的术语Bounded Context。在上下文之间,您应该非常有意识地共享,因为每个上下文的术语(普遍存在的语言)都是不同的。您的事件本质上是该语言的一部分,因此我通常不会建议与另一个服务共享聚合流的全部。

由于我不能为相同的聚合标识符但不同的聚合类型发布两次创建事件(构造函数中的CommandHandler,序列0),所以我不能完全分离这两个状态。

这行提示您希望在不同的聚合之间重用聚合标识符,这也在问题的标题中出现。正如您所注意到的,[聚合标识符,序列号]对需要是唯一的。因此,不能为不同类型的聚合重用聚合标识符。但是,要知道Axon将使用聚合标识符类的toString方法来填充聚合标识符字段。因此,如果将toString()方法调整为包含聚合类型,则可以保持唯一性要求,并仍然重用聚合标识符。

例如,包含UUIDVehicleID类的ToString方法通常会输出以下内容:

    null
  • vehhleid[684EC9F4-B9F8-11EA-B3DE-0242AC130004]

最后,我想分享以下三点:

  1. Axon框架无意重用聚合流来重新创建不同的聚合类型。
  2. 聚类聚合可能是解决此方案的一种方法
  3. [aggregateId,seqNo]唯一性要求可以重用aggregateId,只要toString方法将聚合类型追加/前置到结果。
 类似资料:
  • 首先,我为很长的帖子道歉。有相当多的代码,以显示有一个问题的详细理解,因此失去了张贴的东西...请你阅读所有的内容:-) 我不明白此设置中与此错误消息相关的一些事情: 为什么create命令和事件与confirm命令/event使用相同的方法(至少就我目前的理解而言)时会按预期工作? 为什么Axon抱怨一个appoinmentId作为(当然是聚合)的标识符,而相应的代码(见下文)为@aggrega

  • 一、分布式锁 数据库的唯一索引 Redis 的 SETNX 指令 Redis 的 RedLock 算法 Zookeeper 的有序节点 二、分布式事务 2PC 本地消息表 三、CAP 一致性 可用性 分区容忍性 权衡 四、BASE 基本可用 软状态 最终一致性 五、Paxos 执行过程 约束条件 六、Raft 单个 Candidate 的竞选 多个 Candidate 竞选 数据同步 参考 一、分

  • Spring Boot通过使用Atomikos或Bitronix嵌入式事务管理器支持跨多个XA资源的分布式JTA事务。 部署到合适的Java EE Application Server时,也支持JTA事务。 检测到JTA环境时,Spring的JtaTransactionManager用于管理事务。 自动配置的JMS,DataSource和JPA bean已升级为支持XA事务。 您可以使用标准的Sp

  • distributed RPC(分布式RPC) (DRPC) 的设计目的是充分利用Storm的计算能力实现高密度的并行实时计算。Storm topology(拓扑)接受若干个函数参数作为输入,然后输出这些函数调用的结果。 严格的来说,DRPC不能够算作Storm的一个特性,因为它是一种基于Storm 原语(Stream,Spout,Bolt,Topology)实现的设计模式。DRPC可以脱离Sto

  • 定义 在一个包含了若干Erlang节点的分布式系统中,可能需要以分布的方法来控制应用。如果某个节点——上面运行了某个应用——挂了,应用要在另一个节点上被重启。 这样一个应用被称之为一个分布式应用。注意是对于应用的控制是分布的,所有应用当然都可以是分布——比如,使用其它节点上的服务。 因为一个分布式应用可能会在节点之间移动,所以必须有某种寻址机制来确保它可以被其他应用找到,无论它当前运行于哪个节点上

  • Consumer Offset Tracking(消费者offset跟踪) 高级别的consumer跟踪每个分区已消费的offset,并定期提交,以便在重启的情况下可以从这些offset中恢复。Kafka提供了一个选项在指定的broker中来存储所有给定的consumer组的offset,称为offset manager。例如,该consumer组的所有consumer实例向offset mana