当前位置: 首页 > 知识库问答 >
问题:

同键主题未能加入Kafka流

井疏珂
2023-03-14

我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。

我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。

为简洁起见,应用程序的基本工作流程如下:

    null

问题本身是在步骤5中产生的。由于RAW_Event主题和Processing_Users主题之间的连接只有一些是按预期工作的,所以上面(事件充实)。

使用经过整个管道的24条记录的子集,主题中的24对中只有5对成功加入。那些起作用的似乎是相同的一致的,但我在数据中没有看到任何东西来说明为什么一个能起作用而另一个不行:

raw_event keys          processing_user keys
mawjuG0B9k3AiALz0_2S    0q0juG0B9k3AiALz8ApP 
xEEcv20B9k3AiALzEN0m    m60juG0B9k3AiALz5gU5 
zqwjuG0B9k3AiALzz_tg    ua0juG0B9k3AiALz7wqa 
v60juG0B9k3AiALz6Aal    xEEcv20B9k3AiALzEN0m 
0q0juG0B9k3AiALz8ApP    zqwjuG0B9k3AiALzz_tg 
RK0juG0B9k3AiALz5QUw    zK0juG0B9k3AiALz6Aal
0a0juG0B9k3AiALz6Aal    Ta0juG0B9k3AiALz5QUw 
8KwjuG0B9k3AiALz1v58    RKwjuG0B9k3AiALz1P7C 
c60juG0B9k3AiALz5gU4    -60juG0B9k3AiALz3gGn 
RKwjuG0B9k3AiALz1P7C    Va0juG0B9k3AiALz5QUw 
zK0juG0B9k3AiALz6Aal    560juG0B9k3AiALz3QGh 
Ta0juG0B9k3AiALz5QUw    mawjuG0B9k3AiALz0_2S 
Va0juG0B9k3AiALz5QUw    -K0juG0B9k3AiALz3QGh 
pK0juG0B9k3AiALz5gU5    zq0juG0B9k3AiALz6Aal 
Xa0juG0B9k3AiALz2QCh    RK0juG0B9k3AiALz5QUw 
560juG0B9k3AiALz3QGh    v60juG0B9k3AiALz6Aal 
-K0juG0B9k3AiALz3QGh    Xa0juG0B9k3AiALz2QCh 
-60juG0B9k3AiALz3gGn    P60juG0B9k3AiALz5QUw 
F60juG0B9k3AiALz3gKn    pK0juG0B9k3AiALz5gU5 
m60juG0B9k3AiALz5gU5    0a0juG0B9k3AiALz6Aal 
zq0juG0B9k3AiALz6Aal    3K0juG0B9k3AiALz3QGh 
ua0juG0B9k3AiALz7wqa    8KwjuG0B9k3AiALz1v58 
3K0juG0B9k3AiALz3QGh    F60juG0B9k3AiALz3gKn 
P60juG0B9k3AiALz5QUw    c60juG0B9k3AiALz5gU4 

我试过将主题连接为KStreams和KTables(以及我能想到的所有组合)的组合,但是在这个小子集的24条消息中,只有5条是成功的。

当前代码的当前示例(和略微简化):

val events = streams.createKTable<RawEvent>("raw_events)
val users = streams.createKStream<ProcessingUser>("processing_users)

val finalEvents = events
    .join(users, eventsProcessor::enrichWithUsers)
    .to("final_events")

鉴于RAW_EventsProcessing_Users主题中有相应的对(1:1),有没有解释为什么有些连接会成功而其他连接会失败?其中只有5个对能够一致地通过final_events主题(总是相同的对)。

欢迎任何其他建议!

    null

一直成功的五个连接看起来只是因为键位于每个主题的相同分区上:

successful events       raw_events partition  processing_users partition
RK0juG0B9k3AiALz5QUw    3                     3
m60juG0B9k3AiALz5gU5    7                     7
ua0juG0B9k3AiALz7wqa    7                     7
8KwjuG0B9k3AiALz1v58    8                     8
RKwjuG0B9k3AiALz1P7C    9                     9

尽管两个主题中都存在所有键,但它们似乎不是使用相同的策略进行分区的(即两个主题都包含具有相同键的所有消息,但有些消息可能出现在RAW_Events中的一个分区上,而出现在Processing_Users中的另一个分区上),如下面的分区/计数表示所示:

值得强调的是,出现在RAW_Events主题中的消息是在上面描述的streams应用程序工作流之外产生的,这使我相信需要回答以下问题:

  • 假设分区策略会导致跨分区的规范化分布,那么它是否可以允许分区策略的责任完全落在流工作流的入口点上?(例如,如果给定的键在RAW_Events的分区7中,并且您将具有相同键的记录发送到Preprocessing_Users,则该记录将落入分区7?
  • 如果是的话,这是一个合理的策略吗?或者,是否有一种方法可以强制执行此行为,而无需编写所有生产者和流应用程序都使用的自定义分区器?
  • 如果不是,是否可以使用现有主题(在本例中为raw_event并基本上重新分区整个主题,以便使用默认分区策略?

共有1个答案

曾飞沉
2023-03-14

正如在对原始文章的更新中详细介绍的那样,问题本身是.NET Producer应用程序之间分区策略不同的结果,后者默认使用consistent_random分区策略,而默认Java streams应用程序则使用murmur2random策略。

有几种方法可以解决这一问题,但在这种特殊情况下,最简单的方法是调整生产者使用适当的策略:

// Set the default partitioning strategy 
ProducerConfig.Partitioner = Partitioner.Murmur2Random;

另一种方法可能是编写创建一个custompartitioner类,它将实现您首选的分区策略,以模拟您的生产者。

 类似资料:
  • 我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时, 连接的结果被产生以输出Kafka主题(如果发生超时字段)。 (独立部署中的火花2.1.1,Kafka 10) Kafka在主题:X,Y,...输出主题结果将如下所示: 我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsre

  • 假设我有三个Kafka主题,其中充满了表示不同聚合中发生的业务事件的事件(事件源应用程序)。这些事件允许构建具有以下属性的聚合: 用户:usedId,名称 应用程序的模块:moduleId,name 授予用户应用程序模块:grantId、userId、moduleId、作用域 现在,我想创建一个包含所有授权的流,其中包含用户和产品的名称(而不是id)。我想这么做: 通过按用户ID对事件进行分组,为

  • 我有一个要求加入3个Kafka主题。前两个主题A和B将使用inner join添加,因为消息键相同,并且生成一个POJO与B相同的新Kafka流。现在,使用这个累积的流,我需要加入另一个主题C,并且我需要根据C中存在的字段对输出进行分组。 到目前为止,我有以下方法: 前两个主题(A和B)的KStream-KStream inner join是否可以不发布任何主题的累积输出,并且仍然可以在下面使用它

  • 我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从

  • 只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)

  • 如何以可伸缩的方式编写连接多个Kafka主题的使用者? 我有一个主题用一个键发布事件,第二个主题用相同的键发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题的使用者,并为出现在这两个主题中的子集执行一些额外的操作。 理想情况下,我需要将主题绑定在一起,以便以相同的方式对它们进行分区,并同步地将分区分配给使用者。我怎么能这么做? 我知道Kafka Streams将主题连接在一起,这样键