当前位置: 首页 > 知识库问答 >
问题:

结构化流媒体:同时阅读多个Kafka主题

黄昊英
2023-03-14
java.lang.IllegalStateException: Race while writing batch 0
def main(args: Array[String]): Unit = {


  val kafkaProps = Util.loadProperties(kafkaConfigFile).asScala
  val topic_list = ("topic1", "topic2", "topic3", "topic4")

  topic_list.foreach(x => {
kafkaProps.update("subscribe", x)

val source= Source.fromInputStream(Util.getInputStream("/schema/topics/" + x)).getLines.mkString
val schemaParser = new Schema.Parser
val schema = schemaParser.parse(source)
val sqlTypeSchema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

val kafkaStreamData = spark
  .readStream
  .format("kafka")
  .options(kafkaProps)
  .load()

val udfDeserialize = udf(deserialize(source), DataTypes.createStructType(sqlTypeSchema.fields))

val transformedDeserializedData = kafkaStreamData.select("value").as(Encoders.BINARY)
  .withColumn("rows", udfDeserialize(col("value")))
  .select("rows.*")

val query = transformedDeserializedData
  .writeStream
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/topics/" + x)
  .option("checkpointLocation", checkpointLocation + "//" + x)
  .start()  
})  
spark.streams.awaitAnyTermination()  
 }

共有1个答案

齐起运
2023-03-14

另类。您可以使用KAFKA Connect(来自Confluent)、NIFI、StreamSets等,因为您的用例似乎适合“dump/persisted to HDFS”。也就是说,您需要安装这些工具。您所说的小文件问题不是一个问题,所以就这样吧。

在Apache Kafka0.9或更高版本中,您可以使用Kafka Connect API for Kafka-->HDFS接收器(各种支持的HDFS格式)。不过,您需要一个KAFKA Connect集群,但无论如何,这都是基于现有集群的,所以没什么大不了的。但需要有人维护。

一些让你上路的链接:

    null
 类似资料:
  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 这是因为检查点只存储了其中一个数据流的偏移量吗?浏览Spark结构流文档,似乎可以在Spark 2.2或>中进行流源的联接/联合

  • 我有一个用于结构化流媒体的Kafka和Spark应用程序。特别是我的KafkaProducer具有以下配置: 然后我创建了一个ProducerRecord,如下所示: 其中,json。toString()表示一个JSON格式的字符串,这是我想在Spark中处理的值。现在,我主要做的是将Spark与Kafka主题联系起来,正如官方Spark结构化流媒体指南中所报道的那样: 然后 我有以下输出和异常: