我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述)
我有4个基本部分:
events topic stream -> group to a Ktable by aggregate ID -> reduce aggregate events to current state -> materialize as a state store
命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数(命令,状态)=>events
生成结果事件,并将它们发布到events
主题
问题是--有没有办法确保我在州存储中有聚合的最新版本?
如果一个命令违反了业务规则(例如,如果实体被标记为已删除,则修改该实体的命令无效),我希望拒绝该命令。但是,如果发布了DeleteCommand
,之后紧跟一个ModifyCommand
,则delete命令将生成DeleteDevent
,但当处理ModifyCommand
时,从状态存储区加载的状态可能不会反映这一点,并且将发布冲突事件。
我不介意牺牲命令处理吞吐量,我更愿意获得一致性保证(因为所有东西都是由相同的键分组的,并且应该在相同的分区中结束)
希望这是清楚的:)有什么建议吗?
我不认为Kafka对于CQR和事件来源是好的,正如您所描述的那样,因为它缺乏一种(简单的)方法来确保防止并发写入。本文对此进行了详细的论述。
我的意思是,您期望一个命令生成零个或更多的事件,或者在一个异常情况下失败;这是经典的事件源CQRS。大多数人期望这种建筑。
但是,您可以使用不同的样式来获取事件源。您的命令处理程序可以为接收到的每个命令产生事件(即DeletewasAccepted
)。然后,事件处理程序最终可以以事件源方式(通过从其事件流重建聚合的状态)处理该事件,并发出其他事件(即itemdeleted
或itemdeletedtionwasrejected
)。因此,命令被触发并忘记,异步发送,客户端不会等待立即响应。但是,它会等待一个描述其命令执行结果的事件。
一个重要的方面是,事件处理程序必须以串行方式处理来自同一聚合的事件(精确地一次并按顺序处理)。这可以使用单个Kafaka消费者组来实现。你可以在这段视频中看到这个架构。
我使用以下代码创建kafka流: 我给每个流不同的组ID。当我运行应用程序时,只接收到部分kafka消息,并且执行程序在foreachRDD调用中挂起。如果我只创建一个流,一切正常。日志信息没有任何例外。 我不知道为什么应用程序卡在那里。这是否意味着没有足够的资源?
我是开发kafka-streams应用程序的新手。我的流处理器用于根据输入json消息中的用户键值对json消息进行排序。 我读到这里动态连接一个Kafka输入流到多个输出流,没有动态解决方案。 在我的用例中,我知道对输入流排序所需的用户键和输出主题。因此,我编写了针对每个用户的单独的处理器应用程序,其中每个处理器应用程序匹配不同的用户ID。 所有不同的流处理器应用程序都从kafka中的相同jso
我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach
> 聚合命令处理程序基本上是一个kafka使用者,它使用与某一主题相关的消息: 1.1当它接收到命令时,它会加载聚合的所有事件,并为每个事件重放聚合事件处理程序,以使聚合达到当前状态。 1.2根据命令和businiss逻辑,它将一个或多个事件应用到事件存储区。这涉及到将新事件插入到Cassandra中的事件存储表中。事件被标记为聚合的版本号--对于新的聚合,从版本0开始,这使得预测成为可能。此外,
我开始阅读Kafka Stream应用程序,在每个教程/示例中,通过比较KStream和GlobalkTable中的键来丰富数据。在我的情况下,我需要将KStream记录的值中的一个项与GlobalKTable中的一个键进行比较。如何实现这一点的任何想法或例子。
这个问题涵盖了如何使用FlinkSQL对乱序流进行排序,但我更愿意使用DataStream API。一种解决方案是使用ProcessFunction来执行此操作,该ProcessFunction使用PriorityQueue来缓冲事件,直到水印指示它们不再乱序,但这在RocksDB状态后端中表现不佳(问题是每次访问PriorityQueue都需要整个PriorityQueue的ser/de)。无论