当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring Cloud Stream Kafka和每个服务的数据库实现微服务事件驱动架构

施华奥
2023-03-14

我试图实现一个事件驱动的架构来处理分布式事务。每个服务都有自己的数据库,并使用Kafka发送消息来通知其他微服务有关操作。

一个例子:

 Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database

订单接收订单请求。它必须将新订单存储在其数据库中,并发布一条消息,以便支付服务意识到它必须对该项目收费:

私人订单业务;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}

以下是我的疑惑:

  1. 步骤a.-(保存在订单数据库中)和b.-(发布消息)应该在事务中自动执行。我怎样才能做到这一点?
  2. 这与前一个有关:我发送消息:orderSource.output()。发送(MessageBuilder.withPayload(订单)。构建());这个操作是异步的,并且总是返回true,无论Kafka代理是否关闭。我怎么知道消息已经到达Kafka经纪人?

共有2个答案

权兴为
2023-03-14

我认为实现事件源的正确方法是让Kafka直接从插件推送的事件中填充,该插件读取RDBMS binlog,例如使用合流瓶装水(https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/)或更活跃的去脑区(http://debezium.io/)。然后,消费微服务可以监听这些事件,消费它们,并对它们各自的数据库采取行动,最终与RDBMS数据库保持一致。

看看我对指导方针的完整回答:https://stackoverflow.com/a/43607887/986160

夏侯旻
2023-03-14

步骤a.-(按DB顺序保存)和b.-(发布消息)应在事务中以原子方式执行。我怎样才能做到这一点?

kafka目前不支持事务(因此也不支持回滚或提交),您需要像这样同步事务。所以简而言之:你不能做你想做的事。在不久的将来,当KIP-98被合并时,这种情况将会改变,但这可能需要一些时间。此外,即使在Kafka中使用事务,跨两个系统的原子事务也是一件非常困难的事情,接下来的一切只能通过Kafka中的事务支持来改进,它仍然不能完全解决您的问题。为此,您需要考虑在整个系统中实现某种形式的两阶段提交。

通过配置producer属性,您可以在某种程度上接近它,但最终您必须为您的一个系统(MariaDB或Kafka)选择至少一次或最多一次。

让我们从您在Kafka中可以做的事情开始,确保消息的传递,然后我们将深入探讨您对整个流程的选择以及后果。

保证交货

您可以配置在使用参数acks将请求返回给您之前,必须有多少代理确认收到您的消息:通过将此设置为all,您可以告诉代理等待,直到所有副本确认您的消息,然后再返回答复。这仍然不能100%保证您的消息不会丢失,因为它只被写入页面缓存,而且理论上存在代理在将其持久化到磁盘之前失败的情况,消息可能仍然会丢失。但这是一个很好的保证。您可以通过降低代理强制fsync到光盘的间隔(强调文本和/或flush.ms)来进一步降低数据丢失的风险,但请注意,这些值可能会带来严重的性能损失。

除了这些设置之外,您还需要等待Kafka制作人返回对您请求的响应,并检查是否发生异常。这类问题与你问题的第二部分有关,因此我将进一步深入。如果回答是明确的,那么您可以尽可能确定您的数据到达了Kafka,并开始担心MariaDB。

到目前为止,我们讨论的所有内容都只涉及如何确保Kafka收到了您的消息,但您还需要将数据写入MariaDB,这也可能会失败,这就需要重新调用您可能已经发送给Kafka的消息,而这是您无法做到的。

因此,基本上,您需要选择一个能够更好地处理重复/缺失值的系统(取决于是否重新发送部分故障),这将影响您执行操作的顺序。

备选办法1

在此选项中,您在MariaDB中初始化事务,然后将消息发送给Kafka,等待响应,如果发送成功,您将在MariaDB中提交事务。如果发送到Kafka失败,您可以在MariaDB中回滚您的事务,一切都很好。但是,如果发送到Kafka成功,并且由于某种原因提交到MariaDB失败,那么就无法从Kafka获得消息。因此,如果您稍后重新发送所有内容,您要么在MariaDB中丢失一条消息,要么在Kafka中有一条重复的消息。

选择2

这几乎是另一种方式,但根据您的数据模型,您可能能够更好地删除用MariaDB编写的消息。

当然,您可以通过跟踪失败的发送并稍后重试这些方法来缓解这两种方法,但所有这些更多的是在更大的问题上的创可贴。

就我个人而言,我会选择方法1,因为提交失败的可能性应该比发送本身小一些,并且在Kafka的另一边实现某种欺骗检查。

这与上一条相关:我使用:orderSource发送消息。输出()。发送(MessageBuilder.withPayload)(订单)。build());此操作是异步的,无论Kafka代理是否关闭,都会返回true。我怎么知道消息已经传到Kafka经纪人那里了?

首先,我承认我不熟悉Spring,所以这可能对您没有用,但是下面的代码片段展示了一种检查产生异常响应的方法。通过调用flush,您可以阻止所有发送完成(失败或成功),然后检查结果。

Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();

for(MessageType message : messages){
  producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception != null) {
        exceptionList.add(exception);
      }
    }
  });
}

producer.flush();

if (!exceptionList.isEmpty()) {
  // do stuff
}
 类似资料:
  • 我是事件驱动微服务的新手,也是微服务本身的新手。我正在开发的系统并不大,它处理一堆文件,然后根据调用不同服务的数据。所以在我看来,一个好主意是,不要让服务调用其他服务来完成这项工作,而是将这些消息发送到发布/订阅队列,处理它们并将它们发送到相关主题,然后每个服务将订阅其中一个或几个主题,每隔几分钟,每个服务都会提取它们订阅的消息并发挥它们的魔力。这些服务也可以通过Rest来公开,以防您想强制执行它

  • 我正在开发一个具有微服务架构的网站,每个服务都拥有一个数据库。数据库存储微服务所需的数据。 、服务都需要用户信息,所以这两个服务都订阅了。 当有新用户注册时,将触发。 一旦服务收到,它们就会将传入的用户信息放入它们自己的数据库中。因此,他们可以在不询问服务的情况下做事。 到目前为止还不错。但问题来了: 如果我要创建一个新服务呢?如何获取注册用户信息并将其放入新服务 也许我可以从现有服务中获取信息。

  • 我读了一些文章,看了一些视频,但在为这些微服务提供服务方面,没有找到具体的建议。我的理解是,他们应该使用自己的应用程序服务器。 我的问题是它们应该部署在不同的服务器上,还是没关系。 当它们在同一台服务器(计算机)上提供服务时,不会有端口冲突吗?

  • 我读过萨姆·纽曼的《微服务》一书,在关于分裂整体的一章中,他举了一个“打破外键关系”的例子,他承认跨API进行连接会更慢--但他接着说,如果你的应用程序足够快,它比以前慢有关系吗? 这似乎有点油嘴滑舌?人的经历是什么?您使用了哪些技术来使API联接执行得令人满意?

  • 如果一个微服务只知道它自己的领域,但是有一个数据流需要多个服务以某种方式交互,那该怎么做呢? 假设我们有这样的东西: 为论证起见,假设一个订单发货后,就应该创建发票。 我确实知道这可以被认为是高度基于意见的。但它也有具体的一面,因为微服务不应该做上述的事情。因此,必须有一个“根据定义它应该做什么”,这不是基于意见的。 开枪啊。

  • 我正在为我们的应用程序使用Spring boot Microservices体系结构。在我们的项目中,我们使用的是OAuth2、Jwt、Zuul和Eureka服务,我的疑问是,我是否需要将这些服务作为一个独立的服务来实现,或者我是否可以将所有服务开发成一个单一的应用程序。 如果我必须作为单个应用程序实现,那么更好的方法是什么。请澄清