I'm trying to implement the event sourcing pattern with Kafka Streams in the following way.
I work at a security services company and have two use cases:
RegisterUserCommand should produce a UserRegisteredEvent.
ChangeUserNameCommand should produce a UserNameChangedEvent.
I have two topics:
1. A command topic, "security-command". Each command is keyed by the user's email. For example:
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
2. An event topic, "security-event". Records are keyed by the user's email as well:
foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
foo@bar.com:{"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
Kafka Streams version 2.8.0
Kafka version 2.8
The idea of the implementation can be expressed in the following topology:
commandStream = builder.stream("security-command");

eventStream = builder.stream("security-event",
    Consumed.with(
        ...,
        new ZeroTimestampExtractor() /* always returns 0 to get the latest version of the snapshot */));

// build the snapshot to get the current state of the user.
userSnapshots = eventStream.groupByKey()
                           .aggregate(() -> new UserSnapshot(),
                                      (key /*email*/, event, currentSnapshot) -> currentSnapshot.apply(event));

// join commands with the latest snapshot at the time of the join
commandWithSnapshotStream = commandStream.leftJoin(
    userSnapshots,
    (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
    joinParams);

// handle the command given the current snapshot
resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
    var newEvents = commandHandler(commandWithSnapshot.command(), commandWithSnapshot.snapshot());
    return Arrays.stream(newEvents)
                 .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
                 .toList();
});

// append events to the events topic
resultingEventStream.to("security-event");
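The timestamp extractor used above could be as simple as the following (a sketch; the question does not show the actual class):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Pins every event record to timestamp 0, so in stream-time the event topic
// always looks "older" than the command topic and is drained first.
public class ZeroTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        return 0L;
    }
}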
For this topology, I'm using EOS with the exactly_once_beta processing guarantee.
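Presumably configured along these lines (application id and bootstrap servers are assumed; exactly_once_beta is the pre-3.0 name of what later became exactly_once_v2):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "security-app"); // assumed
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);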
A more explicit version of the same topology:
KStream<String, Command<DomainEvent[]>> commandStream =
builder.stream(
commandTopic,
Consumed.with(Serdes.String(), new SecurityCommandSerde()));
KStream<String, DomainEvent> eventStream =
builder.stream(
eventTopic,
Consumed.with(
Serdes.String(),
new DomainEventSerde(),
new LatestRecordTimestampExtractor() /* always returns 0 to get the latest version of the snapshot */));
// build the snapshots ktable by aggregating all the current events for a given user.
KTable<String, UserSnapshot> userSnapshots =
eventStream.groupByKey()
.aggregate(
() -> new UserSnapshot(),
(email, event, currentSnapshot) -> currentSnapshot.apply(event),
Materialized.with(
Serdes.String(),
new UserSnapshotSerde()));
// join command stream and snapshot table to get the stream of pairs <Command, UserSnapshot>
Joined<String, Command<DomainEvent[]>, UserSnapshot> commandWithSnapshotJoinParams =
Joined.with(
Serdes.String(),
new SecurityCommandSerde(),
new UserSnapshotSerde()
);
KStream<String, CommandWithUserSnapshot> commandWithSnapshotStream =
commandStream.leftJoin(
userSnapshots,
(command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
commandWithSnapshotJoinParams
);
var resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
var command = commandWithSnapshot.command();
if (command instanceof RegisterUserCommand registerUserCommand) {
var handler = new RegisterUserCommandHandler();
var events = handler.handle(registerUserCommand);
// multiple events might be produced when a command is handled.
return Arrays.stream(events)
.map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
.toList();
}
if (command instanceof ChangeUserNameCommand changeUserNameCommand) {
var handler = new ChangeUserNameCommandHandler();
var events = handler.handle(changeUserNameCommand, commandWithSnapshot.userSnapshot());
return Arrays.stream(events)
.map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
.toList();
}
throw new IllegalArgumentException("...");
});
resultingEventStream.to(eventTopic, Produced.with(Serdes.String(), new DomainEventSerde()));
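For completeness, a sketch of how UserSnapshot.apply could fold events and track the version (assumed; the question does not show this class):

public class UserSnapshot {
    private String email;
    private String name;
    private int version = -1; // no events applied yet

    // Folds one event into the snapshot; version counts applied events,
    // matching the "version" field in the event payloads above.
    public UserSnapshot apply(DomainEvent event) {
        if (event instanceof UserRegisteredEvent e) {
            this.email = e.email();
            this.name = e.name();
        } else if (event instanceof UserNameChangedEvent e) {
            this.name = e.name();
        }
        this.version++;
        return this;
    }

    public int version() { return version; }
}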
The problem I'm facing:
Producing the following commands to the command topic in quick succession:
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
Outcome:
1. Stream application fails when processing the ChangeUserNameCommand, because the snapshot is null.
2. The events topic has a record for successful registration, but nothing for changing the name:
/*OK*/foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
Thoughts:
When the ChangeUserNameCommand is processed, the snapshot is missing from the aggregated KTable, userSnapshots. Restarting the application successfully produces the following record:
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
I tried increasing max.task.idle.ms to 4 seconds, with no effect.
Producing:
// Produce to command topic
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
// event topic outcome
/*OK*/ foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
// Produce all at once to the command topic
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex2"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex3"}}
// event topic outcome
/*OK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
/*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":1}}
/*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":1}}
Thoughts:
The ChangeUserNameCommand records are joined with a stale version of the snapshot (note the version attribute).
The expected outcome would be:
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":2}}
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":3}}
I tried increasing max.task.idle.ms to 4 seconds and setting cache.max.bytes.buffering to 0; neither had any effect.
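For reference, the two knobs tried above would be set like this (names as of Kafka 2.8):

props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 4000L);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);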
What am I missing when building a topology like this? I expect every command to be processed on the latest version of the snapshot. If I produce the commands with a few seconds of delay between them, everything works as expected.
Found the root cause. I'm not sure whether it is by design or a bug, but a stream task waits for data in its other partitions only once per processing cycle. So, if the 2 records from the command topic are read first, the stream task will wait for max.task.idle.ms while processing the first command record, which allows the poll() phase to happen. After it is handled, while processing the second command, the stream task is not allowed to poll again to fetch the newly generated event that resulted from handling the first command.
In Kafka 2.8, the code responsible for this behavior lives in StreamTask.java. isProcessable() is called at the beginning of the processing phase. If it returns false, the poll phase is repeated.
public boolean isProcessable(final long wallClockTime) {
if (state() == State.CLOSED) {
return false;
}
if (hasPendingTxCommit) {
return false;
}
if (partitionGroup.allPartitionsBuffered()) {
idleStartTimeMs = RecordQueue.UNKNOWN;
return true;
} else if (partitionGroup.numBuffered() > 0) {
if (idleStartTimeMs == RecordQueue.UNKNOWN) {
idleStartTimeMs = wallClockTime;
}
if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
return true;
// idleStartTimeMs is not reset to default, RecordQueue.UNKNOWN, value,
// therefore the next time when the check for all buffered partitions is done, `true` is returned, meaning that the task is ready to be processed.
} else {
return false;
}
} else {
// there's no data in any of the topics; we should reset the enforced
// processing timer
idleStartTimeMs = RecordQueue.UNKNOWN;
return false;
}
}
I think you are missing the changelog restoration part for the tables. Read up on how changelog restoration works:
"For tables, it's more complex because they must maintain additional information (their state) to allow for stateful processing such as joins and aggregations like COUNT() or SUM(). To achieve this while also ensuring high processing performance, tables (through their state stores) are materialized on local disk within a Kafka Streams application instance or a ksqlDB server. But machines and containers can be lost, along with any locally stored data. How can we make tables fault tolerant, too?
The answer is that any data stored in a table is also stored remotely in Kafka. Every table has its own change stream for this purpose, a built-in change data capture (CDC) setup, we could say. So if we have a table of account balances by customer, every time an account balance is updated, a corresponding change event will be recorded into the change stream of that table."
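Applied to the topology in the question, naming the state store makes the backing changelog topic explicit; a sketch (store name assumed):

KTable<String, UserSnapshot> userSnapshots =
    eventStream.groupByKey()
               .aggregate(
                   UserSnapshot::new,
                   (email, event, snapshot) -> snapshot.apply(event),
                   // backed by the changelog topic <application.id>-user-snapshots-changelog
                   Materialized.<String, UserSnapshot, KeyValueStore<Bytes, byte[]>>as("user-snapshots")
                               .withKeySerde(Serdes.String())
                               .withValueSerde(new UserSnapshotSerde()));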
Also keep in mind that restarting a Kafka Streams application should not reprocess previously processed events. To achieve that, the offsets of the messages need to be committed after they are processed.
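In Kafka Streams the offsets are committed automatically; the cadence is controlled by commit.interval.ms (with exactly-once processing it defaults to 100 ms):

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);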