当前位置: 首页 > 知识库问答 >
问题:

Kafka的Spark流:如何从Kafka消费者数据流中获取主题名称?

郎永福
2023-03-14

我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息:

val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
val ssc = new StreamingContext(streamConf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" ->  properties.getProperty("broker_connection_str"), 
                      "zookeeper.connect"    ->  properties.getProperty("zookeeper_connection_str"), 
                      "group.id"             ->  properties.getProperty("group_id"), 
                      "auto.offset.reset"    ->  properties.getProperty("offset_reset")
                    )

// Kafka integration with receiver 
val msgStream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc, kafkaParams, Map(properties.getProperty("topic1") -> 1,
                      properties.getProperty("topic2") -> 2,
                      properties.getProperty("topic3") -> 3),
                      StorageLevel.MEMORY_ONLY_SER).map(_._2)

我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。

我提到了以下问题,但其中的答案对我没有帮助:

从spark中的Kafka消息获取主题

那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动?

对此任何帮助都将不胜感激。谢谢你。

共有1个答案

慕健
2023-03-14

请参阅下面的代码。

您可以通过foreachRDD获取主题名称和消息,在DStream上进行地图操作。

msgStream.foreachRDD(rdd => {
      val pairRdd = rdd.map(i => (i.topic(), i.value()))
})

下面的代码是我正在使用的createDirectStream的示例源代码。

val ssc = new StreamingContext(configLoader.sparkConfig, Seconds(conf.getInt(Conf.KAFKA_PULL_INTERVAL)))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString(Conf.KAFKA_BOOTSTRAP_SERVERS),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> conf.getString(Conf.KAFKA_CONSUMER_GID),
  "auto.offset.reset" -> conf.getString(Conf.KAFKA_AUTO_OFFSET_RESET),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics: Array[String] = conf.getString(Conf.KAFKA_TOPICS).split(",")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
 类似资料:
  • 目前,我有一个Flink集群,它想通过一个模式消费Kafka主题,通过使用这种方式,我们不需要维护一个硬代码Kafka主题列表。 --update--我需要知道主题信息的原因是我们需要这个主题名称作为参数,在即将到来的Flink sink部分中使用。

  • 我是pyflink的新手。我正在尝试编写一个python程序来从kafka主题读取数据并将数据打印到标准输出。我按照链接Flink Python Datastream API Kafka Producer Sink Serializaion进行了操作。但由于版本不匹配,我一直看到NoSuchMethod odError。我添加了https://repo.maven.apache.org/maven

  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • 我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?