当前位置: 首页 > 知识库问答 >
问题:

Kafka流中的自定义时间戳提取器

柳项明
2023-03-14

我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展TimestampExtractor类并将其传递到StreamsConfig中来解决这个问题。

我是这样做的-

class MyEventTimestampExtractor extends TimestampExtractor {
  override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long) = {
    record.value() match {
        case w: String => 1000L
        case _ => throw new RuntimeException(s"Called for $record")
    }
  }
}

我基于github上的这段代码

但是,当我运行sbt时,我会遇到这个错误

[error] /home/someuser/app/blahblah/src/main/scala/main.scala:34: class MyEventTimestampExtractor needs to be abstract, since method extract in trait TimestampExtractor of type (x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object], x$2: Long)Long is not defined
[error] (Note that Long does not match Long)
[error] class MyEventTimestampExtractor extends TimestampExtractor {
[error]       ^
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:35: method extract overrides nothing.
[error] Note: the super classes of class MyEventTimestampExtractor contain the following, non final members named extract:
[error] def extract(x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object],x$2: Long): Long
[error]   override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long): Long = {
[error]                ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed

我的身材。这是sbt文件吗--

name := "kafka streams experiment"
version := "1.0"
scalaVersion := "2.12.4"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-streams" % "1.0.0"
)

我真的不明白这个错误。特别是周围的部分,请注意Long与Long不匹配。我可能做错了什么?谢谢


共有2个答案

佟寒
2023-03-14

尝试(观察prev函数参数的类型:

  override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: java.lang.Long) = {
凤晨朗
2023-03-14

你需要java。因为这个API是用Java定义的,而您使用的是Scala Long

 类似资料:
  • 我正在构建一个非常简单的KafkaStreams演示应用程序,以测试用例。 我无法升级我正在使用的Kafka broker(当前版本为0.10.0),并且有几条消息是由0.10.0之前的生产者编写的,因此我使用自定义时间戳提取器,我将其作为默认值添加到主类开头的配置中: 当从我的源主题消费时,这非常好。但是当使用聚合运算符时,我遇到了一个异常,因为当从内部聚合主题消费时,使用了的实现而不是自定义实

  • 这个服务我已经测试了它,使用不同版本的Kafka(更高或等于0.10),它工作良好。 以下是我的配置: Spring:cloud:stream:kafka:streams:binder:brokers:${KAFKA_BROKERS}applicationid:email-MESSAGES-stream configuration:default.key.serde:org.apache.kafk

  • 我正在尝试将Flink作业部署到基于Flink:1.4.1-hadoop27-scala\u 2.11-alpine映像的集群中。作业使用的是Kafka连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与Flink Kafka连接器文档中的Scala示例非常相似。但FlinkKafkaConsumer011 这在从IDE本地运行时非常有效。但是,

  • 我们正在使用使用STREAM_TIME标点符号的自定义转换器。当我记录通过转换函数发送的消息时,来自context.timestamp()的流时间显示如预期的那样——基于使用时间戳提取器派生的数据的合理日期。 现在——在过去的某个时候,我们收到了一些恶意消息,将流时间提前到2036年。我们现在已经阻止了这些上游,重新启动了Kafka河。 当流启动时,标点符号会在受影响任务的启动时运行,但会显示20

  • 大家好,我有一个关于提取器和Kafka流的问题。。。。 在我们的应用程序中,有可能接收到无序事件,因此我喜欢根据负载中的业务日期来排序事件,而不是根据它们放置在主题中的时间点。 为此,我编程了一个定制的时间戳提取器,以便能够从有效负载中提取时间戳。我在这里所说的一切都非常有效,但当我构建这个主题的KTable时,我发现我收到的无序事件(从业务角度来看,它不是最后一个事件,而是在最后收到的)显示为对

  • 我使用这个库在我的Android应用程序中使用FFmpeg。我正在从视频中提取帧,然后将它们添加到裁剪查看器中。因此,每一帧都需要表示视频中的某个时间帧。下面是我当前提取帧的ffmpeg代码: 感谢任何帮助