当前位置: 首页 > 知识库问答 >
问题:

Kafka流时间戳提取器在KGroupedStream处失败

郑声
2023-03-14

这个服务我已经测试了它,使用不同版本的Kafka(更高或等于0.10),它工作良好。

以下是我的配置

Spring:cloud:stream:kafka:streams:binder:brokers:${KAFKA_BROKERS}applicationid:email-MESSAGES-stream configuration:default.key.serde:org.apache.kafka.common.serialization.serdes$stringserde default.value.serde:org.apache.kafka.common.serialization.serdes$stringserde commit.interval.ms:1000 default.timestamp.extractor:

我代码的一部分:

    stream
        .mapValues(this::mapMessage)
        .groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
        .windowedBy(TimeWindows.of(WINDOW_TIME))
        .aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
        .toStream()
        .process(() -> new MailMessagesProcessor(emailService));

它向我抛出了这个错误:org.apache.kafka.streams.errors.streamsException:输入记录ConsumerRecord(主题=.....)使用不同的TimestampExtractor处理此数据。

共有1个答案

段干飞翮
2023-03-14

Kafka Streams需要brokers 0.10.0或更新版本。它不适合较老的经纪人。

>

  • Kafka Streams 0.10.0仅与0.10.0(或更新)代理兼容。

    Kafka Streams 0.10.1和更新版本,向后兼容0.10.1(但不兼容较旧的代理),并兼容较新的代理。

    此外,由于Kafka Streams1.0,因此需要0.10(或更高)的消息格式。因此,即使您将代理升级到0.10.0(或更高),如果您的消息格式没有升级,它也不会工作。

    为了使用“精确一次”特性,需要0.11.0(或更高)的broker版本。

    有关更多详细信息,请参阅:https://docs.confluent.io/current/streams/upgrade-guide.html#兼容性

  •  类似资料:
    • 我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展类并将其传递到中来解决这个问题。 我是这样做的- 我基于github上的这段代码 但是,当我运行

    • 我正在尝试将Flink作业部署到基于Flink:1.4.1-hadoop27-scala\u 2.11-alpine映像的集群中。作业使用的是Kafka连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与Flink Kafka连接器文档中的Scala示例非常相似。但FlinkKafkaConsumer011 这在从IDE本地运行时非常有效。但是,

    • 大家好,我有一个关于提取器和Kafka流的问题。。。。 在我们的应用程序中,有可能接收到无序事件,因此我喜欢根据负载中的业务日期来排序事件,而不是根据它们放置在主题中的时间点。 为此,我编程了一个定制的时间戳提取器,以便能够从有效负载中提取时间戳。我在这里所说的一切都非常有效,但当我构建这个主题的KTable时,我发现我收到的无序事件(从业务角度来看,它不是最后一个事件,而是在最后收到的)显示为对

    • 我正在构建一个非常简单的KafkaStreams演示应用程序,以测试用例。 我无法升级我正在使用的Kafka broker(当前版本为0.10.0),并且有几条消息是由0.10.0之前的生产者编写的,因此我使用自定义时间戳提取器,我将其作为默认值添加到主类开头的配置中: 当从我的源主题消费时,这非常好。但是当使用聚合运算符时,我遇到了一个异常,因为当从内部聚合主题消费时,使用了的实现而不是自定义实

    • 问题内容: 我可以看到例如在这里进行了几次讨论,但是我认为由于Elasticsearch中的重大更改,解决方案已过时。 我正在尝试将我在Kafka主题中的Json中的long / epoch字段转换为通过连接器推送的Elasticsearch日期类型。 当我尝试添加动态映射时,我的Kafka连接更新失败,因为我试图将两个映射应用于字段_doc和kafkaconnect。我认为这是关于版本6的重大更

    • 我们正在使用使用STREAM_TIME标点符号的自定义转换器。当我记录通过转换函数发送的消息时,来自context.timestamp()的流时间显示如预期的那样——基于使用时间戳提取器派生的数据的合理日期。 现在——在过去的某个时候,我们收到了一些恶意消息,将流时间提前到2036年。我们现在已经阻止了这些上游,重新启动了Kafka河。 当流启动时,标点符号会在受影响任务的启动时运行,但会显示20