当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams:用于聚合的自定义时间戳提取器

赵君植
2023-03-14

我正在构建一个非常简单的KafkaStreams演示应用程序,以测试用例。

我无法升级我正在使用的Kafka broker(当前版本为0.10.0),并且有几条消息是由0.10.0之前的生产者编写的,因此我使用自定义时间戳提取器,我将其作为默认值添加到主类开头的配置中:

config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);

当从我的源主题消费时,这非常好。但是当使用聚合运算符时,我遇到了一个异常,因为当从内部聚合主题消费时,使用了TimestampExtractorFailOnInvalidTimestamp实现而不是自定义实现。

Streams应用程序的代码如下所示:

...

KStream<String, MyValueClass> clickStream = streamsBuilder
              .stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));

KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
              .map(((key, value) -> new KeyValue<>(value.getId(), value)))
              .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
              .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
              .count();
...

我遇到的例外情况如下:

    Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp. 
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

现在的问题是:有什么方法可以让Kafka Streams在从内部聚合主题读取时使用自定义的TimestampExtractor(最佳情况下仍然使用Streams DSL)?

共有3个答案

鞠隐水
2023-03-14

读完马蒂亚斯的回答后,我仔细检查了一切,问题的原因是Kafka经纪人和Kafka流应用程序之间的不兼容版本。 <罢工> 我愚蠢到使用Kafka Streams 1.0.0与0.10.1.1Broker,这在Kafka Wiki中被明确声明为不兼容 这里。

编辑(thx to Matthias):问题的实际原因是我们的0.10.1使用的日志格式。x broker仍然是0.9.0。x、 与Kafka流不兼容。

陆浩博
2023-03-14

这是众所周知的问题:-)。我对项目中的老客户机也有同样的问题,这些客户机仍然使用老Kafka客户机,比如0.9,并且在与一些“未经认证”的客户机通信时也存在同样的问题。网络客户端。

所以我写了一篇专题文章:

public class MyTimestampExtractor implements TimestampExtractor {

    private static final Logger LOG = LogManager.getLogger( MyTimestampExtractor.class );

    @Override
    public long extract ( ConsumerRecord<Object, Object> consumerRecord, long previousTimestamp ) {
        final long timestamp = consumerRecord.timestamp();

        if ( timestamp < 0 ) {
            final String msg = consumerRecord.toString().trim();
            LOG.warn( "Record has wrong Kafka timestamp: {}. It will be patched with local timestamp. Details: {}", timestamp, msg );
            return System.currentTimeMillis();
        }

        return timestamp;
    }
}

当有许多消息时,您可能会跳过日志记录,因为它可能会泛滥。

程和畅
2023-03-14

您不能更改时间戳提取器(如v1.0.0)。出于正确性的原因,这是不允许的。

但我真的很想知道,一开始是如何将时间戳为-1的记录写入这个主题的。Kafka Streams在写入记录时使用自定义提取器提供的时间戳。还要注意,KafkaProducer不允许写入时间戳为负的记录。

因此,我能想到的唯一解释是,其他制作人确实在重新划分主题中写了文章——这是不允许的。。。只有Kafka流应该写入重新分区主题。

我想,你需要删除这个主题,让Kafka流重新创建它,回到一个干净的状态。

根据对另一个答案的讨论/评论:

使用Kafka Streams需要0.10格式。如果升级代理并保持0.9或更旧的格式,Kafka Streams可能无法按预期工作。

 类似资料:
  • 我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展类并将其传递到中来解决这个问题。 我是这样做的- 我基于github上的这段代码 但是,当我运行

  • 我有一个Java应用程序,它使用Prometheus库,以便在执行期间收集度量。稍后,我将Prometheus服务器链接到Grafana,以便可视化这些度量。我想知道是否可以让格拉法纳为这些度量显示一个自定义的X轴?通常的X轴是在当地时间。我能让它显示带有GPS/UTC时间戳的数据吗?有可能吗?如果是,需要什么?保存时间戳的附加度量参数? 我这样声明度量变量: 并添加如下所示的数据: 如有任何帮助

  • 我正在为一个项目试验Apache Flink。我正在使用 Flink 来聚合一系列传感器捕获的环境数据。为了计算空气质量指数,我正在尝试实现一个自定义聚合函数,以便在带有窗口的分组选择中使用,但我对类型提示有问题。下面是带有 DataTypeHint 注释的函数代码: 但我有以下例外: 我做错了什么?

  • 问题内容: 我正在尝试在Oracle中编写一个自定义聚合函数,并将该函数与其他一些函数一起分组在一个包中。作为一个示例(为了模拟我遇到的问题),假设我的自定义聚合对数字进行求和看起来像: 如果我编写以下函数定义: 和相应的类型声明进行测试: 这个说法: 给出正确的结果70。但是,使用函数定义创建一个包: 并通过以下方式调用: 与爆炸 是否可以在包声明中嵌套自定义聚合函数? 问题答案: Oracle

  • 我想在我的表中为过期的“竞赛”设置一个未来的时间戳。我可以毫无问题地输入时间,除了当我检索输入时,它似乎不会返回一个碳实例,而只是一个带时间的字符串? 这就是我用来创建新竞赛的内容,表中的时间格式与创建的和更新的字段完全相同。当我尝试以下方法时,它们似乎返回了一个碳实例: 为什么我没有得到一个碳实例返回? 我的迁移文件如下所示:

  • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些