Structured Streaming Kafka Source Code Analysis

韦欣德
2023-12-01

Structured Streaming supports four built-in input sources: File source, Kafka source, Socket source (for testing), and Rate source (for testing).

This post focuses on the Kafka source, which reads data from Kafka and is compatible with Kafka brokers version 0.10 and above. The Maven dependency is:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.1</version>
</dependency>
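
For sbt-based builds, an equivalent dependency (assuming scalaVersion 2.11 and the same Spark version) would be:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"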

The basic usage looks like this:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
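
Note that load only builds a streaming DataFrame; nothing is read from Kafka until a sink is attached and the query is started. A minimal sketch, not from the original post, using the console sink purely for illustration:

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()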

Source Code Analysis

Calling readStream on a SparkSession returns a DataStreamReader. DataStreamReader.format takes the name of the input data source; that name is defined by implementing the DataSourceRegister interface and overriding its shortName method:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def shortName(): String = "kafka"
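
Spark discovers such providers through Java's ServiceLoader, so a source jar also lists its provider class in META-INF/services. A sketch of how a custom source would register itself (the package and class name below are hypothetical, and a real provider would additionally mix in traits such as StreamSourceProvider, as KafkaSourceProvider does):

// src/main/scala/com/example/sources/MySourceProvider.scala  (hypothetical)
package com.example.sources

import org.apache.spark.sql.sources.DataSourceRegister

class MySourceProvider extends DataSourceRegister {
  // the name users pass to .format(...)
  override def shortName(): String = "mysource"
}

// src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
//   com.example.sources.MySourceProvider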

Next, option is called on the DataStreamReader to set the parameters specific to the chosen source. For Kafka, any option whose key starts with "kafka." is passed straight through to the Kafka consumer (see the Kafka consumer configuration docs); the remaining options are Spark's own Kafka-source options, listed below (a usage sketch follows the table):

startingOffsets
    value: "earliest", "latest" (streaming only), or a JSON string such as {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
    default: "latest" for streaming, "earliest" for batch
    query type: streaming and batch
    meaning: The start point when a query is started, either "earliest" (from the earliest offsets), "latest" (from the latest offsets), or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 can be used to refer to earliest and -1 to latest. Note: for batch queries, latest (either implicitly or by using -1 in JSON) is not allowed. For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off. Newly discovered partitions during a query start at earliest.

endingOffsets
    value: latest or a JSON string such as {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
    default: latest
    query type: batch query
    meaning: The end point when a batch query is ended, either "latest" (the latest offsets) or a JSON string specifying an ending offset for each TopicPartition. In the JSON, -1 can be used to refer to latest; -2 (earliest) is not allowed as an ending offset.

failOnDataLoss
    value: true or false
    default: true
    query type: streaming query
    meaning: Whether to fail the query when data may have been lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm; it can be disabled when it does not behave as expected. Batch queries will always fail if reading any data from the provided offsets fails due to lost data.

kafkaConsumer.pollTimeoutMs
    value: long
    default: 512
    query type: streaming and batch
    meaning: The timeout in milliseconds to poll data from Kafka in executors.

fetchOffset.numRetries
    value: int
    default: 3
    query type: streaming and batch
    meaning: Number of times to retry before giving up fetching Kafka offsets.

fetchOffset.retryIntervalMs
    value: long
    default: 10
    query type: streaming and batch
    meaning: Milliseconds to wait before retrying to fetch Kafka offsets.

maxOffsetsPerTrigger
    value: long
    default: none
    query type: streaming and batch
    meaning: Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across TopicPartitions of different volume.
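
Putting a few of these options together, a hedged usage sketch (the topic name and offsets are purely illustrative):

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")  // forwarded to the Kafka consumer
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")   // -2 = earliest, -1 = latest
  .option("maxOffsetsPerTrigger", "10000")                       // rate limit per trigger
  .option("failOnDataLoss", "false")
  .load()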

load is then called; this is where the core work happens. The DataFrame returned by load exposes each Kafka ConsumerRecord with the following columns:

Column          Type
key             binary
value           binary
topic           string
partition       int
offset          long
timestamp       long
timestampType   int

Since key and value are binary, code typically casts them, for example:

  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
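
If the value holds JSON, it is common to go one step further and parse it into typed columns. A minimal sketch under an assumed payload schema (the id and name fields are hypothetical):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// hypothetical schema of the JSON payload carried in the Kafka value
val payloadSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsed = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), payloadSchema).as("data"))
  .select("data.*")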

The load method looks like this:

def load(): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
    val options = new DataSourceOptions(extraOptions.asJava)
    /*
     * Build the V1 data source here; at this point it is not yet known whether the
     * query will be continuous, so the V2 path is not committed to yet.
     */
    val v1DataSource = DataSource(
      sparkSession,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap)
    val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }
    ds match {
      case s: MicroBatchReadSupport =>
        val tempReader = s.createMicroBatchReader(
          Optional.ofNullable(userSpecifiedSchema.orNull),
          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
          options)
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, extraOptions.toMap,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
      case s: ContinuousReadSupport =>
        /* KafkaContinuousReader implements ContinuousReader */
        val tempReader = s.createContinuousReader(
          Optional.ofNullable(userSpecifiedSchema.orNull),
          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
          options)
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, extraOptions.toMap,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))
      case _ =>
        // Code path for data source v1.
        Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
    }
  }

The key line in load is:

val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()

lookupDataSource is called first; it resolves the provider name (here source is "kafka") and returns the corresponding Class object:

  def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    /*
     * File source: reads data from a given directory; currently supports text, csv, json and parquet. Fault-tolerant.
     * Kafka source: pulls data from Kafka. Only compatible with Kafka 0.10.0 or higher. Fault-tolerant.
     * Socket source (for testing): reads UTF-8 text from a socket connection. Not fault-tolerant.
     *
     * backwardCompatibilityMap only maps built-in providers such as jdbc, csv and json; it does not
     * contain external data sources, so for "kafka" we simply get provider1 = "kafka".
     */
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case name => name
    }
    // provider2=kafka.DefaultSource   provider1=kafka
    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    /*
     * KafkaSourceProvider extends DataSourceRegister and overrides shortName to return "kafka".
     * serviceLoader is a ServiceLoader[DataSourceRegister].
     */
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)

    try {
      /* Among the DataSourceRegister implementations, match the one whose shortName is "kafka"; this yields KafkaSourceProvider */
      serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {

          /* No matching implementation was found */
        case Nil =>
          try {
            Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
              case Success(dataSource) =>
                // The class loaded by its fully qualified name is returned here
                dataSource
              case Failure(error) =>
                if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
                  throw new AnalysisException(
                    "Hive built-in ORC data source must be used with Hive support enabled. " +
                    "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
                    "'native'")
                } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
                  provider1 == "com.databricks.spark.avro") {
                  throw new AnalysisException(
                    s"Failed to find data source: ${provider1.toLowerCase(Locale.ROOT)}. " +
                    "Please find an Avro package at " +
                    "http://spark.apache.org/third-party-projects.html")
                } else {
                  throw new ClassNotFoundException(
                    s"Failed to find data source: $provider1. Please find packages at " +
                      "http://spark.apache.org/third-party-projects.html",
                    error)
                }
            }
          } catch {
            case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
              // NoClassDefFoundError's class name uses "/" rather than "." for packages
              val className = e.getMessage.replaceAll("/", ".")
              if (spark2RemovedClasses.contains(className)) {
                throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
                  "Please check if your library is compatible with Spark 2.0", e)
              } else {
                throw e
              }
          }
        /* Exactly one implementation matched */
        case head :: Nil =>
          // KafkaSourceProvider matched here; return its Class object
          head.getClass
          /* If multiple implementations match, prefer the internal one; otherwise throw an exception */
        case sources =>
          val sourceNames = sources.map(_.getClass.getName)
          val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
          if (internalSources.size == 1) {
            logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
              s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
            internalSources.head.getClass
          } else {
            throw new AnalysisException(s"Multiple sources found for $provider1 " +
              s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
          }
      }
    } catch {
      case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
        // NoClassDefFoundError's class name uses "/" rather than "." for packages
        val className = e.getCause.getMessage.replaceAll("/", ".")
        if (spark2RemovedClasses.contains(className)) {
          throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
            "Please remove the incompatible library from classpath or upgrade it. " +
            s"Error: ${e.getMessage}", e)
        } else {
          throw e
        }
    }
  }
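
The same ServiceLoader mechanism can be exercised directly to see which short names are registered on the classpath; a small sketch (output depends on the jars present):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

val loader = Thread.currentThread().getContextClassLoader
ServiceLoader.load(classOf[DataSourceRegister], loader)
  .asScala
  .foreach(r => println(r.shortName()))   // e.g. csv, json, parquet, kafka, ...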

Back in load, ds (the instance created from the class returned by lookupDataSource) is pattern-matched:

  val v1Relation = ds match {
      case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
      case _ => None
    }

KafkaSourceProvider also mixes in StreamSourceProvider (not just DataSourceRegister), so the match returns Some(StreamingRelation(v1DataSource)). StreamingRelation is defined as follows:

/*
 * Links a streaming DataSource into the logical plan (LogicalPlan).
 */
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
  extends LeafNode with MultiInstanceRelation {

  override def isStreaming: Boolean = true
  override def toString: String = sourceName
  // ...
}

The LeafNode class in turn extends LogicalPlan, and StreamingRelation overrides its isStreaming method to return true.
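
Because of this override, the flag surfaces on the Dataset API itself; a quick illustration (the parquet path is a made-up batch example, shown only for contrast):

df.isStreaming                                   // true for the Kafka stream built above
spark.read.parquet("/some/path").isStreaming     // false for an ordinary batch DataFrame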

ds is then matched again to determine whether it is a MicroBatchReadSupport or a ContinuousReadSupport. The Kafka provider matches the ContinuousReadSupport case:

    case s: ContinuousReadSupport =>
        /*
         * KafkaSourceProvider implements ContinuousReadSupport.
         * KafkaContinuousReader implements ContinuousReader; tempReader holds an instance of it.
         */
        val tempReader = s.createContinuousReader(
          Optional.ofNullable(userSpecifiedSchema.orNull),
          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
          options)
        Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, extraOptions.toMap,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))

ContinuousReadSupport is declared as public interface ContinuousReadSupport extends DataSourceV2. Its implementation KafkaSourceProvider provides createContinuousReader, which returns a KafkaContinuousReader:


  override def createContinuousReader(
      schema: Optional[StructType],
      metadataPath: String,
      options: DataSourceOptions): KafkaContinuousReader = {

    val parameters = options.asMap().asScala.toMap
    validateStreamOptions(parameters)
    // Each running query should use its own consumer group id; otherwise Kafka would assign
    // partitions across consumers sharing the same group id. Hence, generate a unique id per query.
    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

    /* Normalize the option keys and pull out the Kafka-specific parameters */
    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    val specifiedKafkaParams =
      parameters
        .keySet
        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
        .map { k => k.drop(6).toString -> parameters(k) }
        .toMap

    /*
     * Possible values: EarliestOffsetRangeLimit,
     * LatestOffsetRangeLimit (the default), or
     * SpecificOffsetRangeLimit built from a user-supplied Map[TopicPartition, Long].
     */
    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

    /* KafkaOffsetReader reads offsets using Kafka's own consumer API */
    val kafkaOffsetReader = new KafkaOffsetReader(
      /* Pick the subscription strategy: assign, subscribe, or subscribePattern */
      strategy(caseInsensitiveParams),
      /* Fill in the driver-side consumer parameters (ConsumerConfig values) */
      kafkaParamsForDriver(specifiedKafkaParams),
      parameters,
      driverGroupIdPrefix = s"$uniqueGroupId-driver")

    new KafkaContinuousReader(
      kafkaOffsetReader,
      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
      parameters,
      metadataPath,
      startingStreamOffsets,
      failOnDataLoss(caseInsensitiveParams))
  }
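
The "kafka." prefix handling above is worth calling out: the prefix is 6 characters long, hence drop(6). A standalone sketch of the same logic with made-up option values:

import java.util.Locale

val parameters = Map(
  "kafka.bootstrap.servers" -> "host1:port1",  // forwarded to the Kafka consumer
  "subscribe"               -> "topic1",       // interpreted by Spark itself
  "startingoffsets"         -> "latest")

val specifiedKafkaParams = parameters
  .keySet
  .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
  .map { k => k.drop(6) -> parameters(k) }
  .toMap
// specifiedKafkaParams == Map("bootstrap.servers" -> "host1:port1")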

Once the reader is created (ContinuousReader extends BaseStreamingSource and DataSourceReader), a Dataset is built:

Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, extraOptions.toMap,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))

The Dataset is created from the two objects obtained by matching ds twice above.

v1Relation: used to link the streaming DataSource into the logical plan; its scaladoc reads:

/**
* Used to link a streaming [[DataSource]] into a
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
* a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]].
* It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when
* passing to [[StreamExecution]] to run a query.
*/

tempReader: a KafkaContinuousReader, which handles the actual Kafka consumption.

These two objects are then used to create the Dataset:

Dataset.ofRows(
          sparkSession,
          StreamingRelationV2(
            s, source, extraOptions.toMap,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession))

Let's look at the implementation of ofRows:

 def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

ofRows first obtains the SessionState belonging to this SparkSession via sparkSession.sessionState; this object holds the SQL configuration, temporary tables, registered functions, and everything else that accepts an org.apache.spark.sql.internal.SQLConf.
It then calls sessionState.executePlan(logicalPlan), where logicalPlan is the plan passed in earlier:

StreamingRelationV2(
            s, source, extraOptions.toMap,
            tempReader.readSchema().toAttributes, v1Relation)(sparkSession)

executePlan returns a QueryExecution; the implementation is simply:

 def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)

// So where does createQueryExecution come from? It is a constructor parameter of SessionState:

  private[sql] class SessionState(
    sharedState: SharedState,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods,
    val functionRegistry: FunctionRegistry,
    val udfRegistration: UDFRegistration,
    catalogBuilder: () => SessionCatalog,
    val sqlParser: ParserInterface,
    analyzerBuilder: () => Analyzer,
    optimizerBuilder: () => Optimizer,
    val planner: SparkPlanner,
    val streamingQueryManager: StreamingQueryManager,
    val listenerManager: ExecutionListenerManager,
    resourceLoaderBuilder: () => SessionResourceLoader,
    createQueryExecution: LogicalPlan => QueryExecution,
    createClone: (SparkSession, SessionState) => SessionState) {......}
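
For reference, the default createQueryExecution wired in by BaseSessionStateBuilder simply constructs a QueryExecution from the plan; the snippet below is reconstructed from memory, so treat it as a sketch rather than the exact source:

  protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
    new QueryExecution(session, plan)
  }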

The analysis work that ofRows performs for the query then runs through the following call chain:

qe.assertAnalyzed()
    ↓
    ↓
    ↓
    ↓
def assertAnalyzed(): Unit = analyzed
    ↓
    ↓ 
    ↓
    ↓
lazy val analyzed: LogicalPlan = {
    /*
     * Changes the SparkSession that will be returned in this thread and its children when
     * SparkSession.getOrCreate() is called, so the thread gets an isolated SparkSession
     * rather than the global (default) one.
     */
    SparkSession.setActiveSession(sparkSession)
    sparkSession.sessionState.analyzer.executeAndCheck(logical)
  }
    ↓
    ↓
    ↓
    ↓
def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
    val analyzed = execute(plan)
    try {
      checkAnalysis(analyzed)
      EliminateBarriers(analyzed)
    } catch {
      case e: AnalysisException =>
        val ae = new AnalysisException(e.message, e.line, e.startPosition, Option(analyzed))
        ae.setStackTrace(e.getStackTrace)
        throw ae
    }
  }

    ↓
    ↓
    ↓
    ↓
    // Run the analyzer's rule batches over the logical plan
  override def execute(plan: LogicalPlan): LogicalPlan = {
    AnalysisContext.reset()
    try {
      executeSameContext(plan)
    } finally {
      AnalysisContext.reset()
    }
  }
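
To see the result of this analysis on the streaming DataFrame built earlier, the plans can be printed; the exact output depends on the Spark version, but the analyzed logical plan should contain a StreamingRelationV2 node for the kafka source, since no query has been started yet:

df.explain(true)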