当前位置: 首页 > 知识库问答 >
问题:

利用kafka和cassandra进行事件来源的类别预测

蓝飞
2023-03-14

>

  • 聚合命令处理程序基本上是一个kafka使用者,它使用与某一主题相关的消息:

    1.1当它接收到命令时,它会加载聚合的所有事件,并为每个事件重放聚合事件处理程序,以使聚合达到当前状态。

    1.2根据命令和businiss逻辑,它将一个或多个事件应用到事件存储区。这涉及到将新事件插入到Cassandra中的事件存储表中。事件被标记为聚合的版本号--对于新的聚合,从版本0开始,这使得预测成为可能。此外,它将事件发送到另一个主题(出于投影目的)。

    我只是在挠头如何完成这件事。我很想知道是否有其他人使用卡桑德拉和Kafka作为活动来源。我读过一些地方,有些人不鼓励它。也许这就是原因。

    我知道EventStore内置了对此的支持。也许使用Kafka作为事件存储会是一个更好的解决方案

  • 共有1个答案

    濮君植
    2023-03-14

    对于这种体系结构,您必须在以下两个方面做出选择:

    • 每个类型的全局事件流-简单
    • 每个类型的分区事件流-可伸缩

    除非您的系统具有相当高的吞吐量(例如,对于所讨论的流类型,每秒至少有10s或100s的事件),否则全局流是更简单的方法。有些系统(例如事件存储)通过具有非常细粒度的流(例如每个聚合实例),但能够以一种性能良好且可预测的方式将它们组合成更大的流(每个流类型/类别/分区、每个多个流类型,等等),从而为您提供了两个世界的最佳选择,同时仍然简单,只需要跟踪单个全局事件位置。

    • 当处理需要进入相同模型的不同分区的事件时,投影代码将需要处理访问相同读取模型的并发使用者组。根据投影的目标存储,有很多方法来处理这个问题(事务、乐观并发、原子操作等),但是对于一些目标存储来说这是个问题
    • 投影代码需要跟踪每个分区的流位置,而不仅仅是单个位置。如果投影从多个流读取数据,它必须跟踪许多位置。

    使用全局流消除了这两个问题-性能通常可能足够好。

    在任何一种情况下,您都可能希望将流位置放入长期事件存储(即Cassandra)中--您可以通过使用一个专用进程从事件流(分区或全局)中读取并使用每个事件的全局或分区位置更新Cassandra中的事件来做到这一点。(我在MongoDB中也有类似的事情--我有一个读取'oplog'并将oplog时间戳复制到事件中的过程,因为oplog时间戳是完全有序的)。

    另一个选择是将Cassandra从最初的命令处理中删除,转而使用Kafka流:

    • 分区命令流是通过与已分区聚合KTable联接来处理的
    • 计算命令结果和事件
    • 原子上,使用更改的聚合更新KTable,将事件写入事件流,并将命令响应写入命令响应流。

    然后,您将有一个下游事件处理器,该处理器将事件复制到Cassandra中,以便于查询等(它可以在每个事件中添加Kafka流位置,从而给出类别排序)。如果您不想使用Kafka进行长期事件存储,这可以帮助您赶上订阅等。(为了跟上,您只需尽可能多地阅读Cassandra的内容,然后从最后一个Cassandra事件的位置切换到Kafka的流媒体)。另一方面,Kafka本身可以永远存储事件,所以这并不总是必要的。

     类似资料:
    • 我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚

    • 很明显,基于这些模式的系统是易于扩展的。但我想问你,具体怎么做?关于可伸缩性,我没有什么问题: 如何缩放聚合体?如果我将创建

    • 我想创建一个CQRS和事件源架构,非常便宜,非常灵活,非常简单。 我想确保事件永远不会失败,至少到达发布者/事件存储,永远,因为这是业务所在。 天蓝 有了azure,我似乎不知道该用什么。 Azure服务总线 蔚蓝函数 Azure webjob(我想这可以用Azure函数代替) ??(还有什么我忘了或者不知道的?) null 你的经验说明了什么? 其他替代方案呢?(例如:)?

    • 我使用水槽代理通过水槽代理收集外部数据。外部数据批次几乎是每 10 秒 1MB。我按如下方式配置了水槽代理。 我按以下方式激活了代理。 可惜后来发现netcat source运行良好,channel或者sink出了问题。从Ubuntu的资源监视器,我可以看到以下性能。网络性能。蓝色曲线表示输入,而红色曲线表示在没有其他应用程序运行网络io的情况下的输出,我确信这个图展示了我的Flume代理发生了什

    • 当我在读一些CQRS的资料时,有一个反复出现的问题我没有理解。例如,假设一个客户端发出一个命令。该命令由域集成,因此它可以刷新其域模型(DM)。另一方面,命令保存在事件存储中。这是最常见的情况。 1) 当我们说DM被刷新时,我假设数据被保存在底层数据库中(如果有的话)。我说得对吗?否则,我们将处理内存瞬态模型,我想这不是一件好事吗?(在客户端请求之外,状态不应该保留在服务器端的内存中)。 2)如果

    • 我正在评估Apache Kafka Streams的事件源,看看它在复杂场景中的可行性。与关系数据库一样,我也遇到过一些情况,原子性/事务性至关重要: 具有两项服务的购物应用程序: OrderService:有一个带有订单的Kafka流商店(OrdersStore) ProductService:有一家Kafka流商店(ProductStockStore),里面有产品及其库存 流量: > Orde