我们有一个微服务架构,使用Kafka作为服务之间的通信机制。一些服务有自己的数据库。假设用户调用服务A,这将导致在该服务的数据库中创建一条记录(或一组记录)。此外,这个事件应该作为Kafka主题的一个项目报告给其他服务。确保数据库记录仅在Kafka主题成功更新(本质上是围绕数据库更新和Kafka更新创建分布式事务)时才写入的最佳方法是什么?
我们正在考虑使用spring kafka(在spring Boot WebFlux服务中),我可以看到它有一个KafkaTransactionManager,但据我所知,这更多是关于kafka事务本身(确保kafka生产者和消费者之间的一致性),而不是跨两个系统同步事务(请参见此处:“Kafka不支持XA,您必须处理DB tx可能在Kafka tx回滚时提交的可能性。”)。此外,我认为这个类依赖于Spring的事务框架,至少就我目前所知,它是线程绑定的,如果使用反应式方法(例如WebFlux),操作的不同部分可能会在不同的线程上执行,它将不起作用。(我们使用的是反应式pg客户端,手动处理事务也是如此,而不是使用Spring的框架。)
我能想到的一些选择:
有人对上述问题有任何想法或建议,或者能够纠正我上述假设中的任何错误吗?
提前谢谢!
上述所有方法都是解决问题的最佳方法,并且都是定义良好的模式。你可以在下面提供的链接中探索这些。
模式:事务发件箱
通过将事件或消息保存在数据库的发件箱中,将其作为数据库事务的一部分发布。http://microservices.io/patterns/data/transactional-outbox.html
模式:轮询发布者
通过轮询数据库中的发件箱来发布消息。http://microservices.io/patterns/data/polling-publisher.html
模式:事务日志跟踪
通过跟踪事务日志来发布对数据库所做的更改。http://microservices.io/patterns/data/transaction-log-tailing.html
首先,我必须说,我不是Kafka,也不是Spring专家,但我认为,在编写独立资源时,这更像是一个概念上的挑战,解决方案应该适合您的技术堆栈。此外,我应该说,这个解决方案试图在没有像Debezium这样的外部组件的情况下解决这个问题,因为在我看来,每个额外的组件都会给测试、维护和运行一个应用程序带来挑战,而在选择这样的选项时,这个挑战往往被低估。此外,并非每个数据库都可以用作Debezium源。
为了确保我们讨论的是相同的目标,让我们用一个简单的航空公司示例来说明情况,在这个示例中,客户可以购买机票。订单成功后,客户将收到一条由外部消息系统(我们必须与之交谈的系统)发送的消息(邮件、推送通知等)。
在传统的JMS世界中,我们的数据库(存储订单的地方)和JMS提供者之间有一个XA事务,它看起来如下所示:客户机为我们启动事务的应用程序设置订单。该应用程序将订单存储在其数据库中。然后消息被发送到JMS,您可以提交事务。两个操作都参与事务,即使它们在与自己的资源对话。因为XA事务保证我们没事。
让我们把Kafka(或任何其他无法参与XA事务的资源)带到游戏中。由于不再存在同步两个事务的协调器,下面的主要思想是将处理分为两部分,并使用持久状态。
当您将订单存储在数据库中时,您还可以将消息(连同聚合数据)存储在您希望随后发送给Kafka的同一数据库中(例如,在CLOB列中作为JSON)。同样的资源——酸保证,目前一切正常。现在,您需要一种机制来轮询“Kafkatask”——表中应该发送到Kafka主题的新任务(例如,使用计时器服务,可能可以在Spring中使用@Scheduled annotation)。消息成功发送到Kafka后,您可以删除任务条目。这确保了只有订单也成功存储在应用程序数据库中时,才会向Kafka发送消息。我们是否获得了与使用XA事务时相同的保证?不幸的是,没有,因为给Kafka写信仍然有可能奏效,但删除任务失败。在这种情况下,重试机制(如问题中所述,您需要一个)将重新处理任务并发送两次消息。如果您的商业案例对此“至少一次”感到满意,那么请确保您在这里使用了imho半复杂解决方案,该解决方案可以轻松地实现为框架功能,这样就不需要每个人都去关注细节。
如果需要“恰好一次”,则不能将状态存储在应用程序数据库中(在本例中,“任务删除”是“状态”),而是必须将其存储在Kafka中(假设两个Kafka主题之间有ACID保证)。例如:假设表中有100个任务(ID 1到100),任务作业处理前10个任务。你把你的Kafka留言写在他们的主题上,另一条ID为10的留言写在“你的主题”上。都在同一个Kafka交易中。在下一个循环中,您使用您的主题(值为10),并使用该值来获得接下来的10个任务(并删除已处理的任务)。
如果有更简单的(应用中)解决方案具有相同的保证,我期待着听到你的消息!
抱歉回答得太长,但我希望能有所帮助。
我建议使用略为改变的方法2。
只写入数据库,但除了实际的表写入之外,还要将“事件”写入同一数据库中的特殊表中;这些事件记录将包含您需要的聚合。以最简单的方式,只需插入另一个实体,例如由JPA映射的实体,该实体包含一个带有聚合负载的JSON属性。当然,这可以通过事务侦听器/框架组件实现自动化。
然后使用Debezium从该表中捕获更改,并将其流式传输到Kafka中。这样你就可以同时拥有这两种状态:Kafka中的最终一致状态(Kafka中的事件可能会落后,或者你可能会在重启后第二次看到一些事件,但最终它们会反映数据库状态),而不需要分布式事务和业务级事件你要的语义学。
(免责声明:我是Debezium的负责人;有趣的是,我正在写一篇博客文章,更详细地讨论这种方法。)
以下是帖子
https://debezium.io/blog/2018/09/20/materializing-aggregate-views-with-hibernate-and-debezium/
https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/
问题内容: 我们正在多家商店中运行带有MySql后端的Java PoS(销售点)应用程序。我想保持商店中的数据库与主机服务器上的数据库同步。 商店中发生某些更改时,应在主机服务器上对其进行更新。我该如何实现? 问题答案: 复制不是很难创建。 这里有一些很好的教程: http://aciddrop.com/2008/01/10/step-by-step-how-to-setup-mysql-data
我正在尝试使用基于Kafka Connect的Confluent在几个MySQL数据库之间同步数据。我在源连接器配置中使用了“批量”作为模式,因为主键类型是 varchar,所以我无法使用递增模式。它工作正常,但我遇到了两个问题: 似乎它无法同步删除,当源数据库中的数据被删除时,接收器数据库没有任何变化。数据仍存在于接收器数据库中。 同步数据需要相当长的时间。就我而言,同步具有 2~4k 行的表大
“同步到数据库”功能让你比对物理模型和现有数据库或模式,显示它们之间结构的差异,并提供同步模型的结构到目标连接。 Navicat 提供一个向导,一步一步指导你完成任务: 选择“文件”->“同步到数据库”。 选择源数据库、模式,然后从现有的连接中选择目标连接、数据库、模式。 点击“选项”并选择比对或高级选项。 点击“比对”以显示源对象和目标对象之间的差异。 选择要同步的对象。 点击“部署”以生成一组
“同步到数据库”功能让你比对模型和现有数据库或模式,显示它们之间结构的差异,并提供同步模型的结构到目标连接。 Navicat 提供一个向导,一步一步指导你完成任务: 选择“文件”->“同步到数据库”。 选择源数据库、模式,然后从现有的连接中选择目标连接、数据库、模式。 点击“选项”并选择比对或高级选项。 点击“比对”以显示源对象和目标对象之间的差异。 选择要同步的对象。 点击“部署”以生成一组脚本
“同步到数据库”功能让你比对物理模型和现有数据库或模式,显示它们之间结构的差异,并提供同步模型的结构到目标连接。 Navicat 提供一个向导,一步一步指导你完成任务: 选择“文件”->“同步到数据库”。 选择源数据库、模式,然后从现有的连接中选择目标连接、数据库、模式。 点击“选项”并选择比对或高级选项。 点击“比对”以显示源对象和目标对象之间的差异。 选择要同步的对象。 点击“部署”以生成一组
问题内容: 如果我有数据库和数据库,并且数据库下的表和数据库下的表,则可以在MySQL中创建数据库下的视图。如果是这样,您能告诉我语法吗? 两个数据库都在同一台计算机上。 问题答案: