当前位置: 首页 > 知识库问答 >
问题:

按键对Ktable分组后的时间戳无效(负)

黎征
2023-03-14

我正在使用KakfkaStreams(2.3.0)使用流和Ktable之间的连接来丰富一些值(压缩主题)。问题是压缩主题是使用与流主题不同的分区器编写的,因此连接没有按预期工作(一些键不匹配,因为它们在不同的分区中)。

我开始研究通过使用groupBy()和reduce()来重新分区压缩的主题,但当它开始读取创建的重新分区主题时,它开始抛出一个StreamsException并显示消息

输入记录ConsumerRecord(topic=mappings-table-repartition,partition=18,leaderEpoch=null,offset=0,CreateTime=-1,serialized key size=37,serialized value size=20,headers=RecordHeaders(headers=[],isReadOnly=false),key=0CECDec3863208E57,value=(1126999878035640323<-null))的时间戳无效(负)。可能是因为使用了0.10之前的producer客户机将该记录写入Kafka而没有嵌入时间戳,或者是因为输入主题是在将Kafka集群升级到0.10+之前创建的。使用不同的TimestampExtractor处理此数据。

重新分区是由KafkaStreams自动创建和填充的,所以这看起来很奇怪,它会写一个无效的记录。我甚至尝试在阅读最初的压缩主题时提供一个自定义的时间戳提取器,但它没有起到任何作用。stacktrace似乎表明它正在使用一个内部TimestampExtractor(查看代码确实如此)

在t org.apache.kafka.streams.processor.internals.streamthread.run(streamthread.java:774)

代码如下所示:

KTable<String, String> table = streamsBuilder
                .table(mappingsTopic,
                       Consumed.with(Serdes.String(), Serdes.String(),
                                      null,
                                      Topology.AutoOffsetReset.EARLIEST))
    .groupBy(KeyValue::pair,
             Grouped.with("mappings-table", Serdes.String(), Serdes.String()))
   .reduce((value1, value2) -> value2, (value1, value2) -> value2)

共有1个答案

柴彬
2023-03-14

如错误消息所示,您需要升级代理使用的消息格式(参见.broker configlog.message.format.version)

从Kafka Streams1.0开始,需要0.10或更新的消息格式。

来源:https://kafka.apache.org/23/documentation/streams/upgrade-guide

Kafka Streams尝试在写入时设置记录时间戳,但是,旧的消息格式不支持时间戳(在写入时,当消息格式降级时,时间戳将丢失)。因此,在读取时,当消息格式转换回来时,将在消息中放入一个伪时间戳-1

不允许为重新分区主题设置不同的时间戳提取器,因为上游时间戳必须向下游转发以确保正确性。

 类似资料:
  • 问题内容: 我有一个用例,其中: 数据的格式为:Col1,Col2,Col3和时间戳。 现在,我只想获取行数与时间戳箱的数量。 也就是说,对于每半小时的存储桶(甚至没有对应行的存储桶),我需要计算有多少行。 时间戳记分布在一年内,因此我无法将其划分为24个存储桶。 我必须每隔30分钟将它们装箱。 问题答案: 通过

  • 问题内容: 我需要将表分组为15分钟间隔。我可以这样做: 但是要在图表中显示返回的数据,我还需要插入没有任何数据且当前未出现在我的select语句中的间隔。我该如何插入这些? 问题答案: 用15分钟的增量创建一个带有所有可能时间戳的表,然后从该表向上面的查询进行LEFT JOIN。 如果您知道图表始终涵盖24小时,则只需创建一个数字为0-95的表格,然后为每个条目将其添加到图表的开始时间。

  • 问题内容: 在nodejs应用程序中,我有一组事件对象,其格式如下: eventsArray具有n个元素的可变长度,并且假设我选择时间参考为巴黎时间,我希望能够按天,周或月对元素进行分组: 我对操纵unix时间戳经验很少,我想知道如何做到这一点,或者是否有npm模块可以简化此过程。 问题答案: 这是我的解决方案。请记住,从纪元开始,天,周和月相对于原点:

  • 问题内容: 我有下面的简单表: 我想了平均,并且每一天为6个小时桶。例如00:00至06:00、06:00至12:00、12:00至18:00和18:00至00:00。 我可以使用以下查询按年,月,日和小时分组: 但是我无法将每天分为上述4个时段,非常欢迎您提供任何帮助。 问题答案: 我认为将(商的小时数/ 6)的商的整数值分组应该会有所帮助。试试看,看看是否有帮助。您的分组依据应该是 其背后的逻

  • 问题内容: 我有一个包含datetime列和一些其他列的表。datetime列表示发生的事件。它可以包含一个时间(事件在那个时间发生)或NULL(事件没有发生) 我现在想计算在特定时间间隔(15分钟)内发生的记录数,但是不知道该怎么做。 例子: 现在,我想创建一个查询,该查询将创建类似于以下内容的结果集: 这在SQL中可能吗,或者有人可以建议我可以使用哪些其他工具?(例如,将数据导出到电子表格程序

  • 问题内容: 我有以下sql create语句 给出以下错误 这是什么错误? 问题答案: 那是由于服务器SQL模式-NO_ZERO_DATE。 根据参考:-在严格模式下,不允许将其作为有效日期。您仍然可以使用 IGNORE 选项插入零日期。如果不在严格模式下,则接受日期,但会生成警告。