当前位置: 首页 > 知识库问答 >
问题:

Kafka KStream-KTable加入竞赛条件

郎魁
2023-03-14

我有以下资料:

KTable<Integer, A> tableA = builder.table("A");
KStream<Integer, B> streamB = builder.stream("B");  

streamB中的消息需要使用表A中的数据进行丰富。

示例数据:

Topic A: (1, {name=john})
Topic B: (1, {type=create,...}), (1, {type=update,...}), (1, {type=update...})

在一个完美的世界里,我想做什么

streamB.join(tableA, (b, a) -> { b.name = a.name; return b; })
       .selectKey((k,b) -> b.name)
       .to("C");

不幸的是,这对我不起作用,因为我的数据是这样的:每次将消息写入主题a时,相应的消息也会写入主题B(源是单个DB事务)。现在,在这个初始“创建”事务之后,主题B将继续接收更多消息。有时,主题B上会出现每秒数个事件,但对于给定的键,也可能出现连续事件间隔数小时的情况。

简单的解决方案不起作用的原因是原始的“创建”事务会导致竞争条件:主题A和主题B几乎同时获取它们的消息,如果主题B的消息首先到达拓扑的“连接”部分(比方说在主题A之前几毫秒消息到达那里)的tableA将不包含相应的条目。此时事件丢失。我可以在主题C上看到这种情况:一些事件显示出来,一些没有(如果我使用leftJoin,所有事件都显示出来,但有些事件有空键,这相当于丢失)。这只是初始‘创造’交易的问题。之后,每当一个事件到达主题B时,相应的条目就存在于表A中。

所以我的问题是:你如何解决这个问题?

我目前的解决方案很难看。我所做的是创建一个“B集合”,并使用

B.groupByKey()
 .aggregate(() -> new CollectionOfB(), (id, b, agg) -> agg.add(b));
 .join(tableA, ...);

现在我们有一个KTable-KTable连接,它不容易受到这种竞争条件的影响。我认为这是“丑陋的”的原因是因为在每次连接后,我必须向主题B发送一条特殊的消息,基本上说“从集合中删除我刚刚处理的事件”。如果此特殊消息没有发送到主题B,则集合将继续增长,并且集合中的每个事件将在每个连接上报告。

目前我正在研究窗口联接是否有效(将a和B读入KStreams并使用窗口联接)。我也不确定这是否有效,因为窗口大小没有上限。我想说,“窗口在‘之前’1秒开始,在‘之后’无限秒结束”。即使我能以某种方式做到这一点,我还是有点担心拥有一个无界窗口的空间需求。

如有任何建议,将不胜感激。

共有1个答案

齐昆
2023-03-14

不确定您使用的是哪个版本,但最新的Kafka 2.1改进了流表联接。即使在第2.1节之前,以下内容仍然适用:

  • 流表连接基于事件时间
  • Kafka Streams基于事件时间处理消息,但是,以偏移顺序(对于两个输入流,首先处理具有较小记录时间戳的流)
  • 如果要确保表首先更新,表更新记录的时间戳应该小于流记录

自2.1起:

  • 为了允许一些延迟,您可以配置max.task。闲置的ms用于在只有一个输入主题具有输入数据的情况下延迟处理的配置

事件时处理顺序在2.0和更早版本中实现为最佳努力,这可能导致您描述的竞争条件。在2.1中,处理顺序是有保证的,并且只有当max.task.idle.ms命中时才可能被违反。

有关详细信息,请参阅https://cwiki.apache.org/confluence/display/KAFKA/KIP-353:改进Kafka Streams时间戳同步

 类似资料:
  • 问题内容: 我正在尝试编写一个查询,仅当用户打开的活动声明不超过两个时,才将“声明”表中的行更新为活动状态。因此,它是用于数据的完整性非常重要的是,用户 永远 有两个以上的主动要求在任何给定的时间打开。 我在并发环境中运行此查询,因此两个进程可能同时执行此查询。我也在默认隔离级别下运行它。 我想知道是否由于子选择和更新子句之间的竞争条件而在某个时候打开一个用户有两个以上的主动声明的可能性。 值得一

  • 问题内容: 如何停止MySQL中的竞争条件?当前的问题是由一个简单的算法引起的: 从表中选择一行 如果不存在,将其插入 然后会得到重复的行,或者如果您通过唯一/主键阻止它,则会出现错误。 现在,通常我认为事务在这里有所帮助,但是由于该行不存在,所以事务实际上并没有帮助(或者我是否错过了什么?)。 LOCK TABLE听起来有些矫kill过正,尤其是如果该表每秒更新多次。 我唯一想到的其他解决方案是

  • 9.1. 竞争条件 在一个线性(就是说只有一个goroutine的)的程序中,程序的执行顺序只由程序的逻辑来决定。例如,我们有一段语句序列,第一个在第二个之前(废话),以此类推。在有两个或更多goroutine的程序中,每一个goroutine内的语句也是按照既定的顺序去执行的,但是一般情况下我们没法去知道分别位于两个goroutine的事件x和y的执行顺序,x是在y之前还是之后还是同时发生是没法

  • 1 竞态条件漏洞 下面的代码段属于某个特权程序(即 Set-UID 程序),它使用 Root 权限运行。 1: if (!access("/tmp/X", W_OK)) { 2: /* the real user ID has access right */ 3: f = open("/tmp/X", O_WRITE); 4 : write_to_file(f); 5: } 6: else

  • 竞态条件是由和事件时间相关的意料之外的依赖所导致的反常行为。 换句话说,一个程序员不正确的假设一个特殊的事件总是在另一个事件之前发生。 一些通常的导致竞态条件的原因是信号,存取检查和打开文件操作。 由于信号生来就是异步事件所以在处理他们时要特别当心。存取检查中使 用access(2)然后使用open(2) 是很明显的非原子操作。用户可以在两次调用中移走文件。换言之,有特 权的程序应该使用seteu

  • 本书主要面向 CTF Pwn 初学者,专注于 Linux 二进制安全。全书包含12章,从二进制底层讲起,结合源码详细分析了常见二进制安全漏洞、缓解机制以及漏洞利用方法,并辅以分析工具和环境搭建的讲解,循序渐进,让读者可以进行系统性的学习。