Question:

Stale KTable records when joining a KStream with a KTable created by KStream aggregation

周博达
2023-03-14

I am trying to implement the event sourcing pattern with Kafka Streams in the following way.

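I work on a security service that handles two use cases:

  1. Register a user: handling a RegisterUserCommand should produce a UserRegisteredEvent
  2. Change a user's name: handling a ChangeUserNameCommand should produce a UserNameChangedEvent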

I have two topics:

  1. Command topic, "security-command". Each command is keyed by the user's email. For example:
   foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
  2. Event topic, "security-event". Each event is keyed by the user's email. For example:
   foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex",  "version":0}}
   foo@bar.com:{"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}

Kafka Streams version 2.8.0
Kafka version 2.8

The idea of the implementation can be expressed with the following topology:

    commandStream = builder.stream("security-command");
    eventStream = builder.stream("security-event",
                                    Consumed.with(
                                        ...,
                                        new ZeroTimestampExtractor()
                                        /*always returns 0 to get the latest version of snapshot*/));
    
    // build the snapshot to get the current state of the user.
    userSnapshots = eventStream.groupByKey()
                                .aggregate(() -> new UserSnapshot(),
                                     (key /*email*/, event, currentSnapshot) -> currentSnapshot.apply(event));
    
    // join commands with latest snapshot at the time of the join
    commandWithSnapshotStream =
                commandStream.leftJoin(
                        userSnapshots,
                        (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
                        joinParams
                );
        
    // handle the command given the current snapshot
    resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
            var newEvents = commandHandler(commandWithSnapshot.command(), commandWithSnapshot.snapshot());

            return Arrays.stream(newEvents )
                         .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                         .toList();
            });

    // append events to events topic 
    resultingEventStream.to("security-event");

For this topology I am using EOS with exactly_once_beta.

A more explicit version of this topology:

       KStream<String, Command<DomainEvent[]>> commandStream =
                builder.stream(
                        commandTopic,
                            Consumed.with(Serdes.String(), new SecurityCommandSerde()));

        KStream<String, DomainEvent> eventStream =
                builder.stream(
                        eventTopic,
                        Consumed.with(
                            Serdes.String(),
                            new DomainEventSerde(),
                            new LatestRecordTimestampExtractor() /*always returns 0 to get the latest version of the snapshot*/));

        // build the snapshots ktable by aggregating all the current events for a given user.
        KTable<String, UserSnapshot> userSnapshots =
                                        eventStream.groupByKey()
                                                   .aggregate(
                                                           () -> new UserSnapshot(),
                                                           (email, event, currentSnapshot) ->   currentSnapshot.apply(event),
                                                           Materialized.with(
                                                                    Serdes.String(),
                                                                    new UserSnapshotSerde()));

        // join command stream and snapshot table to get the stream of pairs <Command, UserSnapshot>
        Joined<String, Command<DomainEvent[]>, UserSnapshot> commandWithSnapshotJoinParams =
                Joined.with(
                        Serdes.String(),
                        new SecurityCommandSerde(),
                        new UserSnapshotSerde()
                );

        KStream<String, CommandWithUserSnapshot> commandWithSnapshotStream =
                commandStream.leftJoin(
                        userSnapshots,
                        (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
                        commandWithSnapshotJoinParams
                );

        var resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {

            var command = commandWithSnapshot.command();

            if (command instanceof RegisterUserCommand registerUserCommand) {
                var handler = new RegisterUserCommandHandler();
                var events = handler.handle(registerUserCommand);

                // multiple events might be produced when a command is handled.
                return Arrays.stream(events)
                             .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                             .toList();
            }

            if (command instanceof ChangeUserNameCommand changeUserNameCommand) {
                var handler = new ChangeUserNameCommandHandler();
                var events = handler.handle(changeUserNameCommand, commandWithSnapshot.userSnapshot());

                return Arrays.stream(events)
                             .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                             .toList();
            }

            throw new IllegalArgumentException("...");
        });

        resultingEventStream.to(eventTopic, Produced.with(Serdes.String(), new DomainEventSerde()));
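
The aggregation above depends on `UserSnapshot.apply(event)`, which is not shown in the question. As a rough sketch only, assuming the event fields from the sample records (email, name, version), the fold could look like the following. Every type here is a hypothetical simplification, and the real `DomainEvent` and `UserSnapshot` classes may well differ.

```java
import java.util.List;

// Hypothetical, simplified event types modeled on the sample records in the question.
interface DomainEvent { String email(); }
record UserRegisteredEvent(String email, String name, int version) implements DomainEvent {}
record UserNameChangedEvent(String email, String name, int version) implements DomainEvent {}

// Hypothetical immutable snapshot; version -1 means "no events applied yet".
record UserSnapshot(String email, String name, int version) {
    UserSnapshot() { this(null, null, -1); }

    // Folds one event into the snapshot and returns the new state.
    UserSnapshot apply(DomainEvent event) {
        if (event instanceof UserRegisteredEvent registered) {
            return new UserSnapshot(registered.email(), registered.name(), registered.version());
        }
        if (event instanceof UserNameChangedEvent renamed) {
            return new UserSnapshot(email, renamed.name(), renamed.version());
        }
        throw new IllegalArgumentException("Unknown event: " + event);
    }
}

class SnapshotSketch {
    // Replays events in order, the way aggregate() folds them per key.
    static UserSnapshot replay(List<DomainEvent> events) {
        UserSnapshot snapshot = new UserSnapshot();
        for (DomainEvent event : events) {
            snapshot = snapshot.apply(event);
        }
        return snapshot;
    }

    public static void main(String[] args) {
        UserSnapshot snapshot = replay(List.of(
                new UserRegisteredEvent("foo@bar.com", "Alex", 0),
                new UserNameChangedEvent("foo@bar.com", "Alex1", 1)));
        System.out.println(snapshot);
    }
}
```

With a snapshot like this, a command handler would presumably derive the next event version from `snapshot.version() + 1`, which is why a stale snapshot leads to repeated version numbers.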

The problems I am facing:

  1. Starting the stream application on a command topic with existing records:
   foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}

Outcome:
   1. Stream application fails when processing the ChangeUserNameCommand, because the snapshot is null.
   2. The events topic has a record for successful registration, but nothing for changing the name:
   /*OK*/foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex",  "version":0}}
   
Thoughts:
   When processing the ChangeUserNameCommand, the snapshot is missing from the aggregated KTable, userSnapshots. Restarting the application successfully produces the following record:
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
   
   Increasing max.task.idle.ms to 4 seconds had no effect.

  2. Producing new commands to the command topic:
   
   // Produce to command topic
   foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
   
   // event topic outcome
   /*OK*/ foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex",  "version":0}}
   
   // Produce at once to command topic
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex2"}}
   foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex3"}}
   
   // event topic outcome
   /*OK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
   /*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":1}}
   /*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":1}}
   
Thoughts:
   The 'ChangeUserNameCommand' commands are joined with a stale version of the snapshot (note the version attribute).
   The expected outcome would be:
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":2}}
   foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":3}}

   Increasing max.task.idle.ms to 4 seconds had no effect, and neither did setting cache.max.bytes.buffering to 0.
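
For reference, the settings mentioned in this question could be set roughly as follows. This is only a sketch: it uses plain string keys so that it compiles without the Kafka libraries, and the application id and broker address are made-up placeholders. As described above, these settings did not resolve the problem.

```java
import java.util.Properties;

class StreamsConfigSketch {
    // Collects the settings discussed in the question under their string keys.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "security-service");   // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
        props.put("processing.guarantee", "exactly_once_beta");
        props.put("max.task.idle.ms", "4000");              // wait up to 4 seconds for other partitions
        props.put("cache.max.bytes.buffering", "0");        // disable record caching
        return props;
    }

    public static void main(String[] args) {
        streamsProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```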

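What am I missing when building a topology like this? I expect every command to be processed against the latest version of the snapshot. If I produce the commands with a few seconds of delay between them, everything works as expected.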

2 answers

胡劲
2023-03-14

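Found the root cause. I am not sure whether it is by design or a bug, but a stream task waits for data from the other partitions only once per processing cycle. So if 2 records are read from the command topic first, the stream task waits max.task.idle.ms while handling the first command record, which allows a poll() phase to happen. After that, while handling the second command, the stream task does not allow a poll to fetch the newly produced events that resulted from processing the first command.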
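
The effect can be illustrated with a toy model. This is not Kafka Streams code, just a simplified sketch under the assumption that the table is updated only when a poll happens: if all buffered commands are handled without a poll in between, each one sees the same snapshot version.

```java
import java.util.ArrayList;
import java.util.List;

class StaleJoinSketch {
    // Simplified model: the "table" is just the latest snapshot version,
    // and emitted events reach it only when a poll happens.
    static List<Integer> process(int bufferedCommands, boolean pollBetweenCommands) {
        int tableVersion = 0;                              // snapshot after UserRegisteredEvent
        List<Integer> pendingEvents = new ArrayList<>();   // written but not yet polled
        List<Integer> emittedVersions = new ArrayList<>();

        for (int i = 0; i < bufferedCommands; i++) {
            int newVersion = tableVersion + 1;             // the join sees the current table state
            emittedVersions.add(newVersion);
            pendingEvents.add(newVersion);

            if (pollBetweenCommands) {
                // Events are read back and aggregated before the next command.
                for (int version : pendingEvents) {
                    tableVersion = version;
                }
                pendingEvents.clear();
            }
        }
        return emittedVersions;
    }

    public static void main(String[] args) {
        System.out.println(process(3, false)); // [1, 1, 1]
        System.out.println(process(3, true));  // [1, 2, 3]
    }
}
```

The first call mirrors the observed output in the question (version 1 three times), and the second mirrors the expected one.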

In Kafka 2.8, the code responsible for this behavior is in StreamTask.java. isProcessable() is called at the start of the processing phase. If it returns false, the polling phase is repeated.

    public boolean isProcessable(final long wallClockTime) {
        if (state() == State.CLOSED) {
            return false;
        }

        if (hasPendingTxCommit) {
            return false;
        }

        if (partitionGroup.allPartitionsBuffered()) {
            idleStartTimeMs = RecordQueue.UNKNOWN;
            return true;
        } else if (partitionGroup.numBuffered() > 0) {
            if (idleStartTimeMs == RecordQueue.UNKNOWN) {
                idleStartTimeMs = wallClockTime;
            }

            if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
                return true;

                // idleStartTimeMs is not reset to default, RecordQueue.UNKNOWN, value, 
                // therefore the next time when the check for all buffered partitions is done, `true` is returned, meaning that the task is ready to be processed.  
            } else {
                return false;
            }
        } else {
            // there's no data in any of the topics; we should reset the enforced
            // processing timer
            idleStartTimeMs = RecordQueue.UNKNOWN;
            return false;
        }
    }
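
To see the timer behavior in isolation, here is a stripped-down sketch of the same branching. It is an assumption-laden model, not the real class: the partition group is reduced to two boolean flags passed in by the caller, and `UNKNOWN` merely stands in for `RecordQueue.UNKNOWN`.

```java
class IdleTimerSketch {
    static final long UNKNOWN = -1L;  // stands in for RecordQueue.UNKNOWN

    final long maxTaskIdleMs;
    long idleStartTimeMs = UNKNOWN;

    IdleTimerSketch(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // Same branching as the quoted method, with the partition group
    // replaced by two flags supplied by the caller.
    boolean isProcessable(long wallClockTime, boolean allBuffered, boolean anyBuffered) {
        if (allBuffered) {
            idleStartTimeMs = UNKNOWN;
            return true;
        } else if (anyBuffered) {
            if (idleStartTimeMs == UNKNOWN) {
                idleStartTimeMs = wallClockTime;
            }
            // The timer is not reset here, so later calls keep returning true.
            return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
        } else {
            idleStartTimeMs = UNKNOWN;
            return false;
        }
    }

    public static void main(String[] args) {
        IdleTimerSketch task = new IdleTimerSketch(4000);
        // Only the command partition has buffered data in all three calls.
        System.out.println(task.isProcessable(0, false, true));     // false: timer starts
        System.out.println(task.isProcessable(4000, false, true));  // true: idle time elapsed
        System.out.println(task.isProcessable(4001, false, true));  // true: no new wait
    }
}
```

The third call is the interesting one: the second command is considered processable immediately, so no poll happens that could fetch the event produced by the first command.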

狄兴邦
2023-03-14

I think you missed the changelog restoration part of tables. Read this article to understand what happens during changelog restoration.

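For tables, it is more complex, because they must maintain additional information (their state) to allow for stateful processing such as joins and aggregations like COUNT() or SUM(). To achieve this while also ensuring high processing performance, tables (through their state stores) are materialized on local disk within a Kafka Streams application instance or a ksqlDB server. But machines and containers can be lost, along with any locally stored data. How can we make tables fault tolerant, too?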

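The answer is that any data stored in a table is also stored remotely in Kafka. Every table has its own change stream for this purpose, a built-in change data capture (CDC) setup, so to speak. So if we have a table of account balances by customer, every time an account balance is updated, a corresponding change event is recorded into the change stream of that table.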
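
As a rough illustration of that idea (a toy model with hypothetical names, not how Kafka Streams implements restoration), a lost table can be rebuilt by replaying its change stream from the beginning:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ChangelogRestoreSketch {
    // Hypothetical change record: a null value models a delete (tombstone).
    record Change(String key, Long value) {}

    // Rebuilds the table by replaying the change stream in order.
    static Map<String, Long> restore(List<Change> changelog) {
        Map<String, Long> table = new HashMap<>();
        for (Change change : changelog) {
            if (change.value() == null) {
                table.remove(change.key());
            } else {
                table.put(change.key(), change.value());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Account balances by customer, as in the quoted example.
        Map<String, Long> balances = restore(List.of(
                new Change("alice", 100L),
                new Change("alice", 150L),
                new Change("bob", 20L),
                new Change("bob", null)));
        System.out.println(balances); // {alice=150}
    }
}
```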

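Also keep in mind that restarting a Kafka Streams application should not reprocess events that were already processed. For that, you need to commit the offsets of messages after processing them.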
