当前位置: 首页 > 知识库问答 >
问题:

事件来源——Apache Kafka Kafka Streams——如何确保原子性/事务性

冀耀
2023-03-14

我正在评估Apache Kafka Streams的事件源,看看它在复杂场景中的可行性。与关系数据库一样,我也遇到过一些情况,原子性/事务性至关重要:

具有两项服务的购物应用程序:

  • OrderService:有一个带有订单的Kafka流商店(OrdersStore)
  • ProductService:有一家Kafka流商店(ProductStockStore),里面有产品及其库存

流量:

>

  • OrderService发布OrderCreated事件(带有productId、orderId、userId信息)

    ProductService获取OrderCreated事件并查询其KafkaStreams商店(ProductStockStore)以检查该产品是否有库存。如果有库存,它会发布OrderUpdated事件(还包括productId、orderId、userId信息)

    关键是,这个事件将由ProductService Kafka Stream监听,后者将对其进行处理以减少库存,到目前为止效果良好。

    但是,想象一下:

    1. 客户1下单,order1(产品有1的库存)
    2. 客户2同时为同一产品下另一个订单,order2(库存仍然是1)
    3. ProductService处理order1并发送消息OrderUpdate以减少库存。此消息放在order2的主题之后-

    明显的问题是我们的物化视图(商店)应该在我们处理第一个OrderUpdate事件时直接更新。然而,更新Kafka Stream Store的唯一方法(我知道)是发布另一个事件(OrderUpdate)以由Kafka Stream处理。这样我们就不能以事务方式执行此更新。

    我希望你能想出应对这种情况的办法。

    更新:我将尝试澄清问题中存在的问题:

    ProductService有一个Kafka Streams商店,ProductStock有此库存(productId=1,quantity=1)

    OrderService在orders主题上发布两个OrderPlaced事件:

    >

    Event2(key=product1,productId=product1,数量=1,eventType="OrderPlated")

    ProductService在订单主题上有一个消费者。为了简单起见,让我们假设一个分区来确保消息按顺序使用。此使用者执行以下逻辑:

    if("OrderPlaced".equals(event.get("eventType"))){
    
        Order order = new Order();
        order.setId((String)event.get("orderId"));
        order.setProductId((Integer)(event.get("productId")));
        order.setUid(event.get("uid").toString());
    
        // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
        Integer productStock = getProductStock(order.getProductId());
    
        if(productStock > 0) {
            Map<String, Object> event = new HashMap<>();
            event.put("name", "ProductReserved");
            event.put("orderId", order.getId());
            event.put("productId", order.getProductId());
    
            // WRITES A PRODUCT RESERVED EVENT TO orders topic
            orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
        }else{
            //XXX CANCEL ORDER
        }
    }
    

    ProductService还有一个Kafka Streams处理器,负责更新库存:

    KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
    stream.xxx().yyy(() -> {...}, "ProductsStock");
    

    Event1将首先处理,因为仍然有1个可用产品,所以它将生成ProductReserved事件。

    现在,轮到Event2了。如果在ProductService Kafka Streams处理器处理由Event1生成的ProductReseved事件之前,ProductService consumer消费了它,那么消费者仍然会看到product1的ProductStore库存为1,为Event2生成ProductReserved事件,然后在系统中产生不一致。

  • 共有2个答案

    严誉
    2023-03-14

    在确保任何分布式系统的一致性时,同样的问题也很典型。通常使用流程管理器/saga模式,而不是追求强一致性。这有点类似于分布式事务中的两阶段提交,但在应用程序代码中显式实现。事情是这样的:

    订单服务要求产品服务预订N件商品。产品服务要么接受该命令并减少库存,要么在没有足够的可用项时拒绝该命令。在对命令做出肯定回复后,订单服务现在可以发出OrderCreated事件(尽管我称之为OrderPlaced,因为“placed”听起来像是域的惯用模式,“created”更通用,但这是一个细节)。产品服务要么侦听OrderPlaced事件,要么向其发送显式ConfirmReservation命令。或者,如果发生了其他事情(例如未能清算资金),可以发出适当的事件,或者将CancelReservation命令显式发送到ProductService。为了适应特殊情况,ProductService还可能有一个调度程序(在KafkaStreams中,标点符号可以很方便地用于此)来取消在超时时间内未确认或中止的预订。

    两个服务的编排以及处理错误条件和补偿操作(在这种情况下取消预订)的技术细节可以直接在服务中处理,或者在显式的Process Manager组件中处理以隔离此责任。就我个人而言,我会选择一个可以使用Kafka Streams处理器API实现的显式流程管理器。

    彭鸿畅
    2023-03-14

    对于你最初的问题,这个答案有点晚了,但为了完整起见,还是让我来回答吧。

    有很多方法可以解决这个问题,但我鼓励以事件驱动的方式解决这个问题。这意味着您(a)验证是否有足够的库存来处理订单,以及(b)将库存保留为单个库存,所有库存都在单个KStreams操作中。诀窍是按productId重新输入密钥,这样您就知道同一产品的订单将在同一线程上顺序执行(所以您不能进入Order1

    有一个帖子讨论了如何做到这一点:https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

    也许更有用的是,还有一些示例代码也展示了如何实现:https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

    请注意,在这个KStreams代码中,第一行如何将密钥重新输入productId,然后使用转换器来(a)验证是否有足够的库存来处理订单,以及(b)保留更新状态存储所需的库存。这是通过使用Kafka的事务功能以原子方式完成的。

     类似资料:
    • 在MongoDB中,写操作的原子性是在document级别上的,即使修改的是文档中的内嵌部分,写锁的级别也是document上。 当一个写操作要修改多个文档,每个文档的修改是原子性的。整个的写操作并不是原子性的,它可能和其他写操作产生交织。然而你可以使用$isolated隔离操作符来限制写操作,让它不与其他写操作交织。 不隔离性能更高,但是会产生数据的不确定性,隔离写操作,事务性更好。MongoD

    • 问题内容: 我开发了一个在线预订系统。为了简化起见,假设用户可以预订多个项目,而每个项目只能预订一次。物品首先添加到购物车中。 应用使用/ 数据库。根据MySql文档,默认隔离级别为。 这是到目前为止我提出的结帐程序: 开始交易 在购物车中选择项目 (带锁)在此步骤中, 从中获取记录和表格。 检查其他人是否还没有预定商品 基本上检查是否。在实际的应用程序中它更加复杂,因此我将其作为单独的步骤放在这

    • 我有以下情况 我有一个REST客户端,它充当其他3个REST客户端的门面。(我正在用Java编程,使用Spring Boot) 客户机的职责之一包括对用户执行CRUD操作 现在,所有其他3个公开自己REST API的系统都有某种用户管理功能。 例如,当我收到创建用户的请求时,我必须通过REST API在这3个系统上创建它们,并将它们保存在我的数据库中。 现在,在最好的情况下,我只是调用他们的API

    • 问题内容: 我正在尝试创建一个保存对象的视图,但是如果引发某些异常,我想 撤消 该保存。这是我尝试的: 我究竟做错了什么?即使引发异常,它仍然在数据库中。 问题答案: 原子性文档 总而言之,如果视图产生的响应没有错误,将在数据库上执行事务。因为您自己正在捕获异常,所以在Django中,您的视图执行得很好。 如果发现异常,则需要自己处理:控制事务 如果在发生故障时需要产生适当的json响应:

    • Spring Kafka和Spring Cloud Stream允许我们创建事务生产者和处理器。我们可以在其中一个示例项目中看到该功能的实际应用:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples: 在这个摘录中,有来自Kafka主题的读取、数据库中的写入

    • 一、事件埋点 您需要在研发工程师的协助下,在用户行为发生时将其记录下来,并发送给诸葛io——这个过程称作事件埋点。 如果您是产品或运营,建议您在和研发君沟通之前,尽量弄清分析目标并整理好事件埋点表,您也可以把这份入门指南推荐给研发君阅读——帮研发君在最短的时间内理解事情的全貌,会有助于事情的快速和顺利推进。 另外,在集成SDK之前,请确保您已在诸葛io中完成了账号注册并创建应用(大约需要2分钟,跟