当前位置: 首页 > 知识库问答 >
问题:

使用kafka的spark streaming时,无法迭代从将数据流转换为列表检索到的密钥列表

孔山
2023-03-14

下面是Kafka的spark streaming代码。在这里,我试图获取批处理的密钥作为Dstream,然后将其转换为列表。以便对其进行迭代,并将与每个键相关的数据放入以该键命名的hdfs文件夹中。

关键基本上是-模式。表\u名称

val ssc = new StreamingContext(sparkConf, Seconds(args{7}.toLong)) // configured to run for every 60 seconds
val warehouseLocation="Spark-warehouse"
val spark = SparkSession.builder.config(sparkConf).getOrCreate() 
import spark.implicits._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> conf.getString("kafka.brokers"),
  "zookeeper.connect" -> conf.getString("kafka.zookeeper"),
  "group.id" -> conf.getString("kafka.consumergroups"),
  "auto.offset.reset" -> args { 1 },
  "enable.auto.commit" -> (conf.getString("kafka.autoCommit").toBoolean: java.lang.Boolean),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_PLAINTEXT",
  "session.timeout.ms" -> args { 2 },
  "max.poll.records" -> args { 3 },
  "request.timeout.ms" -> args { 4 },
  "fetch.max.wait.ms" -> args { 5 })

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.
  Subscribe[String, String](topicsSet, kafkaParams))

正在提取密钥,但其类型为DStream[字符串]

 val keys = messages.map(x=>(x.key()))

var final_list_of_keys = List[String]()

将其转换为列表并更新var final\u list\u of\u键

keys.foreachRDD( rdd => {

val  df_keys = spark.read.json(rdd).distinct().toDF().persist(StorageLevel.MEMORY_ONLY)
df_keys.show()
val comma_separated_keys= df_keys.distinct().collect().mkString("").replace("[","").replace("]",",")

final_list_of_keys= comma_separated_keys.split(",").toList

现在尝试遍历列表。

 for ( i <- final_list_of_keys)
 {
  println(i)

val message1 = messages.filter(x =>  x.key().toString().equals(i)).map(x=>x.value()).persist(StorageLevel.MEMORY_ONLY) //.toString())

 message1.foreachRDD((rdd, batchTime) => {

 if (!rdd.isEmpty())
 {


   val df1 = spark.read.json(rdd).persist(StorageLevel.MEMORY_ONLY)  //.withColumn("pharmacy_location",lit(args{6}))

   val df2=df1.withColumn("message",struct( struct($"message.data.*",lit(args{6}).as("pharmacy_location")).alias("data"), struct($"message.headers.*").as("headers"))).persist(StorageLevel.MEMORY_ONLY)

   val df3= df2.drop("headers").drop("messageSchema").drop("messageSchemaId").persist(StorageLevel.MEMORY_ONLY)

   df3.coalesce(1).write.json(conf.getString("hdfs.streamoutpath1")+ PATH_SEPERATOR + i + PATH_SEPERATOR + args{6}+ PATH_SEPERATOR+ date_today.format(System.currentTimeMillis())
        + PATH_SEPERATOR + date_today_hour.format(System.currentTimeMillis()) + PATH_SEPERATOR + System.currentTimeMillis())

   df1.unpersist
   df2.unpersist()
   df3.unpersist()

 }



})

try
{
messages.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)                            // push it back 
}
}
catch
{
  case e: BlockMissingException => e.printStackTrace()
 case e: IOException => e.printStackTrace()
 case e:Throwable => e.printStackTrace()
}

}
 ssc.start()
 ssc.awaitTermination()

但我遇到了一个错误-不支持在启动上下文后添加新的输入、转换和输出操作

当我试图将for循环保持在键之外的列表上时。foreachRdd则列表不会更新,并保持为空。

有人可以建议我如何实际重做此代码,以使密钥位于列表中,然后再检查它们以将数据放入正确的目录中。

从我的研究中我看到了这篇文章-

类似帖子,但无法从中收集任何解决方案

此外,当我使用map时,在foreachRdd中过滤,然后在它里面的另一个foreachRdd可能会导致问题。使用类似的代码引用post-Post

共有1个答案

易京
2023-03-14

以下是问题的代码-

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.
  Subscribe[String, String](topicsSet, kafkaParams)).persist(StorageLevel.MEMORY_ONLY)

 messages.foreachRDD((rdd,batchTime) =>          ///foreachRDD means go over each rdd parallelly , it gives the rdd and we will put the batch time also
{ 
  val table_list=rdd.map(x => x.key()).distinct().collect()  ////kafka sends data in key value pairs,
                                                           ///here rdd means key and values(key is tablename) and first we need to get all the distinct keys(this batch had 5 tables)

 val rddList = table_list.map(x=>(x,(rdd.filter(y=>y.key().equals(x)))))
 ///here x means table name and we are filtering data in the rdd which is equalent to current_table_name
  ///Now this table_list will contains the key(table) and values corresponding to each key
rddList.foreach(tuple =>  //here foreach not in parallal, we want to go one by one , touple is nothing but collection of key and multiple
   {

   val tableName= tuple._1.toString()   //tuple._1 will be the table name
  val tableRdd= tuple._2.map(x=>(x.value())).persist(StorageLevel.MEMORY_ONLY) // .toDF()


  ///tuple._2  will be the complete key value pair,we are putting the value in the hdfs


//   val tableRdd= messages.filter(x => x.key().toString().equals(tableName)).map(x=>x.value()).persist(StorageLevel.MEMORY_ONLY)
   println(tableName)

/* Your logic */
 类似资料:
  • 我有一个lambda表达式,我想在其中创建一个对象列表,但我得到了错误 错误 我希望作为lambda表达式的返回类型。我怎样才能做到呢?

  • 问题内容: 有一个对象,是否有比列表理解更快,更好或更正确的方法来获取迭代器返回的对象的列表? 问题答案:

  • 问题内容: List rateList = guestList.stream() .map(guest -> buildRate(ageRate, guestRate, guest)) .collect(Collectors.toList()); 在上面的代码中,可以通过内部方法的索引。我在构建时还需要传递索引,但无法通过获取索引。 问题答案: 您尚未提供的签名,但是我假设您希望首先传递元素的索引

  • 在上面的代码中,是否可以在方法内部传递的索引。在构建时,我还需要传递索引,但无法使用获取索引。

  • 我正在尝试学习如何使用Java 8的集合,我想知道是否有一种方法将我的列表转换成一个地图使用Java流。 我想知道如何使用流将上面的列表创建到一个映射中。我知道如何将foreach etc与puts一起使用,但我只是想知道是否有更优雅的方法使用流构建映射。(我知道下面的语法不正确,我是流的新手,不知道怎么写) 目标是代表以下内容。