当前位置: 首页 > 知识库问答 >
问题:

KStream-KTable LeftJoin,KTable未完全加载时发生Join

闽焕
2023-03-14

我正在尝试使用KStream-KTable leftJoin来丰富主题A中的条目和主题B。主题A是我的KStream,主题B是我的KTtable,它有大约2300万条记录。这两个主题中的键都没有计算,所以我必须使用reducer将KStream(主题B)转换为KTable。

下面是我的代码:

KTable<String, String> ktable = streamsBuilder
     .stream("TopicB", Consumed.withTimestampExtractor(new customTimestampsExtractor()))
     .filter((key, value) -> {...})
     .transform(new KeyTransformer()) // generate new key
     .groupByKey()
     .reduce((aggValue, newValue) -> {...});

streamBuilder
     .stream("TopicA")
     .filter((key, value) -> {...})
     .transform(...)
     .leftJoin(ktable, new ValueJoiner({...}))
     .transform(...)
     .to("result")

1)KTable初始化速度慢。(2000 msg/s左右),这正常吗?我的主题是只有1个分区。有什么方法可以提高性能吗?我尝试设置以下内容以减少写入吞吐量,但似乎没有太大的提高。

CACHE_MAX_BYTES_BUFFERING_CONFIG = 10 * 1024 * 1024
COMMIT_INTERVAL_MS_CONFIG = 15 * 1000

2)当KTable未从主题B加载完成时发生连接。这里是连接发生时的偏移量(current-offset/log-end-offset)

   Topic A: 32725/32726 (Lag 1)
   Topic B: 1818686/23190390 (Lag 21371704)

更新:将时间戳设置为0时,我得到以下错误:

Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerID are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. 

我还尝试将max.task.idle.ms设置为>0(3秒30分钟),但仍然得到相同的错误。

更新:我通过将customTimestampsExtractor设置为6天前(仍早于主题A中的记录)修复了“UnknownProducerIdeXception”错误。我将thhink(不确定)设置为0触发了导致此错误的changelog上的保留。但是,在ktable完成加载之前,如果join仍然发生,则join仍然不起作用。这是为什么?

我使用的是Kafka Streams 2.3.0。

我在这里做错什么了吗?多谢。

共有1个答案

艾翼
2023-03-14

1.KTable初始化速度慢。(2000味精/秒左右),这正常吗?

这取决于您的网络,我认为限制是TopicB的消耗率,您使用的两个configcache_max_bytes_buffering_configcommit_interval_ms_config是为了在您希望生成多少KTable输出(因为KTable changelog是修订流)和您在将KTable更新到底层主题和下游处理器时接受多少延迟之间进行权衡。详细查看状态存储的Kafka流缓存配置和此blog部分表,而不是触发器

我认为提高TopicB的消耗率的好方法是增加更多的分区。

在您的示例中,此滞后是topicb的滞后,它并不意味着KTable未完全加载。当您的KTable处于state restore过程中时,它没有被完全加载,当它在实际运行您的stream app之前从KTable的底层changelog主题读取以恢复当前状态时,您就无法进行连接,因为stream app在state完全恢复之前才运行。

 类似资料:
  • 我有以下资料: streamB中的消息需要使用表A中的数据进行丰富。 示例数据: 在一个完美的世界里,我想做什么 不幸的是,这对我不起作用,因为我的数据是这样的:每次将消息写入主题a时,相应的消息也会写入主题B(源是单个DB事务)。现在,在这个初始“创建”事务之后,主题B将继续接收更多消息。有时,主题B上会出现每秒数个事件,但对于给定的键,也可能出现连续事件间隔数小时的情况。 简单的解决方案不起作

  • 我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么

  • 我正在尝试构建以下拓扑: > 使用Debezium连接器,我拉出2个表(我们称它们为表A和表DA)。根据DBZ,存储表行的主题具有{before:“...”,after:“...”}结构。 在我的拓扑中,第一步是从这两个“表”主题创建“干净的”KStreams。那里的子拓扑大致如下所示: 请注意,我显式地分配记录时间,因为表行将在它们最初发布后被CDC'ed“年”。该函数目前正在做的是伪造从201

  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 当尝试在Glion SceneBuilder中加载Fontawesomefx jar时,并非所有模块都已加载。 截图 我尝试过使用多个jar文件,但没有任何运气。 我使用的是Os X 10.12.6/Glion Scene Builder 10/OpenJDK 11.0.1。 无法在我的操作系统版本上安装最新版本的SceneBuilder。 有人知道我该怎么解决这个问题吗? Thanx公司

  • 我是Android Studio的新手,最后一天在我的mac上下载了它,但我永远无法使用它。组件下载即将结束,然后这个错误不断出现。请帮忙。我真的很想做我的项目。 警告:安装过程中发生错误:无法下载“https://dl.google.com/android/repository/android_m2repository_r30.zip”:读取超时,响应:200 OK 上述警告不断发生。