当前位置: 首页 > 知识库问答 >
问题:

Spark 结构化流Kafka集成:微批处理执行分区关闭错误

令狐泓
2023-03-14

我正在使用Spark结构化流分别处理来自和来自Apache Kafka的传入和传出数据流,使用下面的scala代码。

我可以使用kafka源成功读取数据流,但是在尝试将stream写入Kafka接收器时,我收到以下错误:

ERROR MicroBatchExecution:91 - Query [id = 234750ca-d416-4182-b3cc-4e2c1f922724, runId = 4c4b0931-9876-456f-8d56-752623803332] terminated with error java.lang.IllegalArgumentException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1 {"path":"file:///path/to/file.csv","timestamp":1536564701000,"batchId":0}
at org.apache.spark.sql.kafka010.JsonUtils$.partitionOffsets(JsonUtils.scala:74)
    at org.apache.spark.sql.kafka010.KafkaSourceOffset$.apply(KafkaSourceOffset.scala:64)
    at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:124)
    at org.apache.spark.sql.kafka010.KafkaSource$$anon$1.deserialize(KafkaSource.scala:99)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:198)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:129)
    at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:97)
    at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:207)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:216)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets$2.apply(MicroBatchExecution.scala:213)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Expected e.g. {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}, got 1
{"path":""file:///path/to/file.csv"","timestamp":1536564701000,"batchId":0}
=== Streaming Query ===
Identifier: [id = 234750ca-d416-4182-b3cc-4e2c1f922724, runId = 851d0cd7-aabe-45c8-8a14-94227f90e174]
Current Committed Offsets: {KafkaSource[Subscribe[t]]: {"logOffset":2}}
Current Available Offsets: {KafkaSource[Subscribe[t]]: {"logOffset":3}}

Scala代码:

object spark_kafka_attempt2 {

  def main(args: Array[String]) {

    val spark = SparkSession
      .builder
      .appName("spark_kafka_test")
      .getOrCreate()

    import spark.implicits._

    val input_lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094")
      .option("subscribe", "input_stream")
      .option("startingOffsets", "earliest")
      .load()

    val inputStreamSchema = new StructType()
      .add("input_id", "long")
      .add("timestamp", "timestamp")
      .add("type", "string")

    val lines = input_lines.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", inputStreamSchema).as("data"), $"timestamp".as("arrival_timestamp"))
      .select("data.*", "arrival_timestamp")


    val query = lines
      .selectExpr("CAST(input_id AS STRING) AS key", "to_json(struct(*)) AS value")
      .writeStream
      .format("kafka")
      .outputMode("update")
      .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094")
      .option("topic", "processed_stream")
      .option("checkpointLocation", "/home/local/directory")
      .start()

    query.awaitTermination()
  }
}

当输出发送到控制台时,代码工作正常,而在尝试将处理的流发送到Apache Kafka时出现错误。

我使用的是Apache结构化流2.3.1、Scala 2.11.8和Apache Kafka 2.0。

生成.sbt 文件如下所示:

name := "spark_kafka_test"    
version := "0.1"    
scalaVersion := "2.11.8"    
val sparkVersion = "2.3.1"    
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
) 

我正在提交我的工作以启动如下:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class spark_kafka_test --master local[4] /home/salman/Development/spark_kafka_attempt2/target/scala-2.11/spark_kafka_test_2.11-0.1.jar 

共有1个答案

甄飞飙
2023-03-14

经过大量调查和浏览,我找到了以下解决方案,将处理后的流写入kafka接收器:

创建以下KafkaSink类

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}    
import org.apache.spark.sql.ForeachWriter

class  KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)]
{
  val kafkaProperties = new Properties()
  kafkaProperties.put("bootstrap.servers", servers)
  kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val results = new scala.collection.mutable.HashMap[String, String]
  var producer: KafkaProducer[String, String] = _

  def open(partitionId: Long,version: Long): Boolean = {
    producer = new KafkaProducer(kafkaProperties)
    true
  }

  def process(value: (String, String)): Unit = {
    producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
  }

  def close(errorOrNull: Throwable): Unit = {
    producer.close()
  }
}

使用Foreach writer向kafkasink发送数据,如下所示:

val outputDf = lines.selectExpr("CAST(input_id AS STRING) AS key", "to_json(struct(*)) AS value").as[(String, String)]

val topic = "processed_stream"
val brokers = "localhost:9092,localhost:9093,localhost:9094"

val writer = new KafkaSink(topic, brokers)

val query = outputDf
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .start()
 类似资料:
  • 我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我正在使用Spark结构化流媒体阅读Kafka主题。 我错过什么了吗?

  • 我们正在接收来自Kafka的星火流数据。一旦在Spark Streaming中开始执行,它只执行一个批处理,其余的批处理开始在Kafka中排队。 我们的数据是独立的,可以并行处理。 我们尝试了多个配置,有多个执行器,核心,背压和其他配置,但到目前为止没有任何工作。排队的消息很多,每次只处理一个微批处理,其余的都留在队列中。 我们从差异实验中得到的统计数据: 实验1 100个文件处理时间48分钟 1

  • 我使用Spark 2.1.1。 我使用结构化流从2个Kafka分区读取消息。我正在向Spark Standalone集群提交我的应用程序,其中有一个工人和两个执行者(每个2个核心)。 我想要这样的功能,来自每个Kafka分区的消息应该由每个单独的执行器独立处理。但现在正在发生的是,执行器分别读取和映射分区数据,但在映射之后,形成的无边界表被普遍使用,并且具有来自两个分区的数据。 当我对表运行结构化