当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams如何在scala中从kafka消息中获取TimeStamp

宗涵蓄
2023-03-14

我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。

E.g. 
  Producer = kafka.Producer
  KeyedMessage = kafka.KeyedMessage
  client = new kafka.KafkaClient()
  producer = new Producer(client)
  km = new KeyedMessage('key', 'message')
  kafka_message = JSON.stringify({ id: req.session.data.toString(), url: article.info })
  payloads = [
    { topic: 'eventTopic', messages: kafka_message,timestamp:timestampNow}
  ];
  producer.send(payloads, function (err, data) {
    console.log(data);
  });

还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。

我使用scala中的Kafka流来使用这些数据。

例如。

val builder = new StreamsBuilder

val stream = builder
    .stream[String, String]("TopicTest")
    .foreach((k:String, v:String) => {
     println(k)
     println(v) 
}

然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。

例如,如果我尝试做这样的事情

val stream = builder
    .stream[String, String,Long]("TopicTest")
    .foreach((k:String, v:String,timeStamp:Long) => {
     println(k)
     println(v) 
     println(timeStamp)
}

这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题。这里是我的拓扑结构和流的配置,仅供参考。val topology=builder。建筑

  import java.util.Properties
  val props = new Properties()
  import org.apache.kafka.streams.StreamsConfig

  val appId = this.getClass.getSimpleName.replace("$", "")
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
  props.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")


  // Step 4. Create Kafka Streams Client
  import org.apache.kafka.streams.KafkaStreams
  val ks = new KafkaStreams(topology, props)


  ks.start

共有1个答案

邢曦
2023-03-14

确实存在一个TimeStampExtractor(https://jaceklaskowski.gitbooks.io/mastering-kafka-streams/content/kafka-streams-TimestampExtractor.html)。但是,可以将时间戳作为任何常规kafka消息发送到。我更改的第一件事是我的NodeJS代码。

kafka_message = JSON.stringify({ id: req.session.information.toString(), url: article.info,timestamp:timestampNow.toString() })
  payloads = [
    { topic: 'eventTopic', messages: kafka_message}
  ];
 
  producer.send(payloads, function (err, data) {
    console.log(data);
  });

我发送的JSON消息中现在有一个时间戳字段。

最后,我们可以使用argonaut解析JSON消息。

 val streamEvents = builder
    .stream[String, String]("testTopic")
    .foreach((k:String, json:String) => {
      println(k)
      println(json)
      println(Parse.parse(json))
      val url:String = Parse.parseWith(json, _.field("url").flatMap(_.string).getOrElse("Error!"), msg => msg)
      val id:String = Parse.parseWith(json, _.field("id").flatMap(_.string).getOrElse("Error!"), msg => msg)
      val timestamp:String = Parse.parseWith(json, _.field("timestamp").flatMap(_.string).getOrElse("Error!"), msg => msg)
      val timeStampInt:Long = timestamp.toLong
 类似资料:
  • 我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。如何才能得到一个主题中的留言数量?

  • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 我正在使用弹性搜索Kafka连接在独立模式下。我不困惑使用哪种配置来启动Kafka连接并从最后一个故障点开始。 例如,生产者将继续推动记录进入Kafka和消费者,因为弹性搜索接收器连接器正在消费,现在我的由于某种原因我的消费者下降了,但我的骄傲将继续推动信息进入Kafka。现在,当我修复了ES sink连接器端的问题后,如果我重新启动ES sink连接器,它应该从上次故障中选择,而不是从开始或最近