我正在尝试使用Kafka流来处理Kafka主题中的一些数据。数据来自Kafka0.11.0编写的Kafka主题,该主题没有嵌入时间戳。在网上读了一些书之后,我明白了我可以通过在自定义类中扩展TimestampExtractor
类并将其传递到StreamsConfig
中来解决这个问题。
我是这样做的-
class MyEventTimestampExtractor extends TimestampExtractor {
override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long) = {
record.value() match {
case w: String => 1000L
case _ => throw new RuntimeException(s"Called for $record")
}
}
}
我基于github上的这段代码
但是,当我运行sbt时,我会遇到这个错误
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:34: class MyEventTimestampExtractor needs to be abstract, since method extract in trait TimestampExtractor of type (x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object], x$2: Long)Long is not defined
[error] (Note that Long does not match Long)
[error] class MyEventTimestampExtractor extends TimestampExtractor {
[error] ^
[error] /home/someuser/app/blahblah/src/main/scala/main.scala:35: method extract overrides nothing.
[error] Note: the super classes of class MyEventTimestampExtractor contain the following, non final members named extract:
[error] def extract(x$1: org.apache.kafka.clients.consumer.ConsumerRecord[Object,Object],x$2: Long): Long
[error] override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: Long): Long = {
[error] ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
我的身材。这是sbt文件吗--
name := "kafka streams experiment"
version := "1.0"
scalaVersion := "2.12.4"
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams" % "1.0.0"
)
我真的不明白这个错误。特别是
周围的部分,请注意Long与Long不匹配。我可能做错了什么?谢谢
尝试(观察prev
函数参数的类型:
override def extract(record: ConsumerRecord[AnyRef, AnyRef], prev: java.lang.Long) = {
你需要java。因为这个API是用Java定义的,而您使用的是Scala Long
我正在构建一个非常简单的KafkaStreams演示应用程序,以测试用例。 我无法升级我正在使用的Kafka broker(当前版本为0.10.0),并且有几条消息是由0.10.0之前的生产者编写的,因此我使用自定义时间戳提取器,我将其作为默认值添加到主类开头的配置中: 当从我的源主题消费时,这非常好。但是当使用聚合运算符时,我遇到了一个异常,因为当从内部聚合主题消费时,使用了的实现而不是自定义实
这个服务我已经测试了它,使用不同版本的Kafka(更高或等于0.10),它工作良好。 以下是我的配置: Spring:cloud:stream:kafka:streams:binder:brokers:${KAFKA_BROKERS}applicationid:email-MESSAGES-stream configuration:default.key.serde:org.apache.kafk
我正在尝试将Flink作业部署到基于Flink:1.4.1-hadoop27-scala\u 2.11-alpine映像的集群中。作业使用的是Kafka连接器源(flink-connector-Kafka-0.11),我试图为其分配时间戳和水印。我的代码与Flink Kafka连接器文档中的Scala示例非常相似。但FlinkKafkaConsumer011 这在从IDE本地运行时非常有效。但是,
我们正在使用使用STREAM_TIME标点符号的自定义转换器。当我记录通过转换函数发送的消息时,来自context.timestamp()的流时间显示如预期的那样——基于使用时间戳提取器派生的数据的合理日期。 现在——在过去的某个时候,我们收到了一些恶意消息,将流时间提前到2036年。我们现在已经阻止了这些上游,重新启动了Kafka河。 当流启动时,标点符号会在受影响任务的启动时运行,但会显示20
大家好,我有一个关于提取器和Kafka流的问题。。。。 在我们的应用程序中,有可能接收到无序事件,因此我喜欢根据负载中的业务日期来排序事件,而不是根据它们放置在主题中的时间点。 为此,我编程了一个定制的时间戳提取器,以便能够从有效负载中提取时间戳。我在这里所说的一切都非常有效,但当我构建这个主题的KTable时,我发现我收到的无序事件(从业务角度来看,它不是最后一个事件,而是在最后收到的)显示为对
我使用这个库在我的Android应用程序中使用FFmpeg。我正在从视频中提取帧,然后将它们添加到裁剪查看器中。因此,每一帧都需要表示视频中的某个时间帧。下面是我当前提取帧的ffmpeg代码: 感谢任何帮助