6.1 Talos-Spark

优质
小牛编辑
135浏览
2023-12-01
  • 易于理解的并行处理:通过使用 DirectTalosInputDStream,Spark Streaming创建的RDD所包含的partitions数量和Talos topic的partitions数量是一一对应的关系,便于理解和使用。
  • Exactly-once语义:Offsets可以通过checkpoint保存到可靠存储。因此,通过开启checkpoint(How to configure Checkpointing),以及使用正确地方式输出到外部存储( Semantics of output operations),可以保证Exactly-once的语义(默认已经开启checkpointing)。

添加依赖

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>${spark.version}</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.xiaomi.infra.galaxy</groupId>
  <artifactId>galaxy-talos-spark</artifactId>
  <version>${galaxy.version}</version>
</dependency>

其中,spark.version需要和spark集群保持一致,galaxy.version一般选择最新即可; spark-streaming的scope设置与集群情况相关, 如果集群上包含了streaming依赖, 则设置为provided即可; 否则设置为compile.

常用API

import org.apache.spark.streaming.talos._

val streamingContext = TalosUtils.createStreamingContext( [spark conf], [batch duration], [map of Talos parameters], [Talos credential], [set of topics to consume]){[dstream function]}

参数说明:

  • [spark conf] 和 [batch duration] 是用来初始化StreamingContext的;
  • 对于[map of Talos parameters],必须通过TalosClientConfigKeys.GALAXY_TALOS_SERVICE_ENDPOINT(作为key)指定Talos集群地址(作为value); 默认情况下,生成的DStream会从每个Talos topic partition的最新(largest)offset开始消费。如果想要从最旧(smallest)offset开始消费,需要在[map of Talos parameters]中设置auto.offset.reset为smallest。
  • [Talos credential] 和 [ set of topics to consume] 都是Talos相关概念;
  • [dstream function] 是一个function参数, 需要用户传入一个类型为 InputDStream[(String, String)] => Unit 的function来构建DStream; 其中InputDStream[(String, String)] 的(String, String)分别对应Talos消息中的(Key, Message); 如果想要在DStream中处理其他格式的内容, 可以参考下面高阶用法中的自定义DStream中的数据格式

通过上述方式创建的StreamingContext, 默认是开启checkpointing的, 且checkpointing dir是app name; 这样当重新提交作业的时候, StreamingContext可以从checkpointing数据恢复, 从而可以从上次作业退出的位置(即保存的talos offset信息)继续消费; 如果想要禁止自动开启checkpointing, 可以配置SparkConf spark.streaming.talos.checkpointing.enable 为false.

示例代码

请查看galaxy-sdk-java/galaxy-talos-client/galaxy-talos-spark/src/main/scala/com/xiaomi/infra/galaxy/talos/spark/example

高阶用法

自定义DStream中的数据格式

TalosUtils提供如下API

import org.apache.spark.streaming.talos._

val streamingContext = TalosUtils.createStreamingContext( [spark conf], [batch duration], [map of Talos parameters], [Talos credential], [set of topics to consume], [custom message handler]){[dstream function]}

其中,[custom message handler]是类型为MessageAndOffset => T的参数,T为泛型参数。 比如,想要在DStream中的数据里添加message offset信息的话,可以自定义如下message handler:

val messageHandler = (mo: MessageAndOffset) => (mo.message.partitionKey, mo.messageOffset, new String(mo.message.message.array(), Charset.forName("UTF-8")))

对于message.array(), 除了转化为String, 也可以转化为其他自定义的内容, 或者直接返回array.

获取每个batch的offset range信息

以示例代码TalosSparkDemo为参照, 通过一下方式可以获取offset 信息:


  // Hold a reference to the current offset ranges, so it can be used downstream
  var offsetRanges = Array[OffsetRange]()

  val ssc = TalosUtils.createStreamingContext(
    sparkConf,
    Seconds(batchSec),
    Map(TalosClientConfigKeys.GALAXY_TALOS_SERVICE_ENDPOINT -> uri,
      "auto.offset.reset" -> "smallest"),
    credential,
    Set(topic)) { inputDstream => {
    inputDstream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD(rdd => {
      // scalastyle:off println
      rdd.foreach(println)
      // scalastyle:on println
    })
  }

注意两点

  • 为了获取batch的offset range信息, 上面的transform必须作为inputDStream第一个方法进行调用,而不是在调用了其他方法后再调用。
  • 如果调用了任何触发shuffle或repartition的方法,比如reduceByKey(),RDD partition和Talos partition的一一对应关系不再有效。

更多配置参数

以下配置都可以在调用TalosUtils.createDirectStream时,设置在中streamingContextsparkConf中的,设置在[map of Talos parameters]中是无效的。

configdefaultmeaning
spark.streaming.talos.maxRatePerPartition0每个partition消费数据的速率限制,message/seconds;如果每个batch数据量太大,导致spark job delay严重时,可以考虑限制每个batch的数据量。
spark.streaming.talos.maxRetries-1与Talos交互时,失败重试次数;网络不好时,可尝试配置多一些重试次数。默认-1为无限重试。
spark.streaming.talos.backoff.ms200与Talos交互时,失败重试次数之间的时间间隔。
spark.streaming.talos.checkpointing.enabletrue是否自动开启checkpointing,并设置目录为AppName;默认为true,会自动执行streamingContext.checkpoint(AppName);

有使用问题,请联系 wangjiasheng@xiaomi.com 。