
SparkStreaming & Kafka & ClickHouse

慕逸仙
2023-12-01

There are plenty of articles online about reading and writing ClickHouse from Spark, but I suspect few of them actually fit your use case. Perhaps this post can open up a new line of thinking for you.

1. Writing to ClickHouse after Spark consumes Kafka

Note that ClickHouse cluster deployment, Kafka cluster deployment, how the Spark-consumed CDC pipeline is implemented, and how to achieve exactly-once semantics are all outside the scope of this post. The goal here is simply to present one way of writing to ClickHouse.

2. Reference code

...... (some unrelated code is omitted here)

  val topic = "testdbserver.cdc_test_db.t_cdc_test"
  val topics = Set(topic)

  val groupId = KafkaGroupConstants.DEFAULT_KAFKA_GROUP
  val offsetMap: Map[TopicPartition, Long] =
    OffsetUtil.getMultipleOffsetFromRedis(topics, groupId)
  var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]

  val kafkaInputDStream: InputDStream[ConsumerRecord[String, String]] = getKafkaStream(topics, streamingContext, offsetMap, groupId)

  kafkaInputDStream.transform(
    (rdd: RDD[ConsumerRecord[String, String]]) => {
      //get the partitions and offsets of the topic read in the current batch
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      //print the offset info for inspection
      for (elem <- offsetRanges) {
        //note this style of string interpolation
        println(s"topic: ${elem.topic}, partition: ${elem.partition}, fromOffset: ${elem.fromOffset}, untilOffset: ${elem.untilOffset}")
      }

      rdd
    })
    .map(
      (record: ConsumerRecord[String, String]) => {
        // parse the record value into a JSON object
        val jsonObj: JSONObject = JSON.parseObject(record.value())

        //print(jsonObj)

        jsonObj
      }
    )
    .foreachRDD(rdd => {
      //process the data partition by partition
      rdd.foreachPartition(partition => {
        //buffer this partition's rows, then write them out in one batch
        val jsonObjects = new ListBuffer[JSONObject]

        partition.foreach(row => {
          val payload = row.getJSONObject("payload")
          val op: String = payload.getString("op")
          val afterData = payload.getJSONObject("after")
          jsonObjects.append(afterData)
        })

        if (jsonObjects.size > 0) {
          val fieldDefine =
            """
            Id UInt64,
            Name String
        """

          val clickhouse: Clickhouse = new Clickhouse()
          clickhouse.setInitParameter("testcdc", "ods_cdc_test", fieldDefine)
          clickhouse.insert(jsonObjects)
        }
      })
      //commit the offsets to Redis
      OffsetUtil.saveOffsetToRedis(topic, groupId, offsetRanges)
    })

  streamingContext.start()
  streamingContext.awaitTermination()
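
The snippet above relies on a getKafkaStream helper (presumably a thin wrapper around KafkaUtils.createDirectStream) and an OffsetUtil that keeps offsets in Redis, neither of which is shown. Below is a minimal sketch of what OffsetUtil might look like, assuming Jedis as the Redis client; the key layout (offset:<topic>:<groupId> mapped to a hash of partition -> offset) and the literal property keys are my assumptions, not necessarily the author's.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

object OffsetUtil {

  // key layout (assumed): offset:<topic>:<groupId> -> hash(partition -> offset)
  private def newJedis(): Jedis = new Jedis(
    PropertiesUtil.getProperty("redis.host"),
    PropertiesUtil.getProperty("redis.port").toInt)

  def getMultipleOffsetFromRedis(topics: Set[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis = newJedis()
    try {
      topics.flatMap { topic =>
        jedis.hgetAll(s"offset:$topic:$groupId").asScala.map { case (partition, offset) =>
          new TopicPartition(topic, partition.toInt) -> offset.toLong
        }
      }.toMap
    } finally jedis.close()
  }

  def saveOffsetToRedis(topic: String, groupId: String, offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = newJedis()
    try {
      val offsets = offsetRanges.map(r => r.partition.toString -> r.untilOffset.toString).toMap
      if (offsets.nonEmpty) jedis.hmset(s"offset:$topic:$groupId", offsets.asJava)
    } finally jedis.close()
  }
}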

What does the ClickHouse utility class look like?

class Clickhouse extends Serializable {

  //write at most 20,000 rows per batch
  val bulkSize = 20000

  var clickhouseUrl: String = "jdbc:clickhouse://%s:%d/%s"
  val host = PropertiesUtil.getProperty(CommonConstants.CLICKHOUSE_HOST)
  val port = PropertiesUtil.getProperty(CommonConstants.CLICKHOUSE_PORT).toInt

  val username: String = PropertiesUtil.getProperty(CommonConstants.CLICKHOUSE_USERNAME)
  val password: String = PropertiesUtil.getProperty(CommonConstants.CLICKHOUSE_PASSWORD)

  val properties: Properties = new Properties()

  var tableSchema: Map[String, String] = new HashMap[String, String]()

  var insertFields: java.util.List[String] = new ArrayList[String]

  var database = ""
  var writeTableName = ""

  val arrayPattern: Regex = "(Array.*)".r

  def log(line: String) {
    println("|============================================|" + line)
  }

  def insert(data: ListBuffer[JSONObject]): Unit = {
    val executorBalanced = new BalancedClickhouseDataSource(this.clickhouseUrl, this.properties)
    val dbConnect = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
    val sqlStr = prepareInsertSQL(database, writeTableName, insertFields)
    val statement = dbConnect.createClickHousePreparedStatement(sqlStr, ResultSet.FETCH_REVERSE)

    var length = 0
    for (d <- data) {
      length += 1
      renderStatement(d, insertFields, statement)
      statement.addBatch()

      // once the batch reaches bulkSize, flush it to ClickHouse
      if (length >= bulkSize) {
        try {
          statement.executeBatch()
        } catch {
          case ex: Throwable => log("Batch insert to clickhouse exception=" + ex)
        }
        length = 0
      }
    }

    // flush whatever is left over (fewer than bulkSize rows)
    if (length > 0) {
      try {
        statement.executeBatch()
      } catch {
        case ex: Throwable => log("Execute exception=" + ex)
      }
    }

    try {
      statement.close()
    } catch {
      case ex: Throwable => log("Close database exception=" + ex)
    }

    try {
      if (dbConnect != null) dbConnect.close();
    } catch {
      case ex: Throwable => log("Close dbconnection exception=" + ex)
    }
  }

  def select(): Unit = {

  }

  /** *
   *
   * @Author: name
   * @Description: set the target database/table and parse the field definitions
   * @Date: 2021/12/31
   * @param database
   * @param writeTableName
   * @param fieldDefine
   *
   * */
  def setInitParameter(database: String, writeTableName: String, fieldDefine: String): Unit = {
    this.database = database
    this.writeTableName = writeTableName

    this.clickhouseUrl = clickhouseUrl.format(this.host, this.port, this.database)
    //this.properties.put("user", this.username)
    //this.properties.put("password", this.password)

    var fieldName = ""
    val fieldDefineArray = fieldDefine.split(",")
    for (field <- fieldDefineArray) {
      val arrFD = field.trim().split(" ")
      if (arrFD.length == 2) {
        fieldName = arrFD(0).trim()
        insertFields.add(fieldName)
        tableSchema += (fieldName -> arrFD(1).trim())
      }
    }
  }

  // ============================== private methods ==========================

  private def getClickHouseSchema(conn: ClickHouseConnectionImpl, table: String): Map[String, String] = {
    val sql = String.format("desc %s", table)
    val resultSet = conn.createStatement.executeQuery(sql)
    var schema = new HashMap[String, String]()
    while (resultSet.next()) {
      schema += (resultSet.getString(1) -> resultSet.getString(2))
    }
    schema
  }

  /** *
   *
   * @Author: name
   * @Description: build the INSERT SQL statement
   * @Date: 2022/1/11
   * @param database
   * @param tableName
   * @param insertFields
   * @return java.lang.String
   * */
  private def prepareInsertSQL(database: String, tableName: String, insertFields: java.util.List[String]): String = {
    val placeholders = List.fill(insertFields.size)("?")
    val sql = String.format(
      "insert into %s.%s (%s) values (%s)",
      database,
      tableName,
      // String.join works on the java.util.List directly, no Scala conversion needed
      String.join(",", insertFields),
      placeholders.mkString(","))
    sql
  }

  /** *
   *
   * @Author: name
   * @Description: bind parameters & convert types
   * @Date: 2022/1/11
   * @param item
   * @param dsFields
   * @param statement
   *
   * */
  private def renderStatement(item: JSONObject,
                              dsFields: java.util.List[String],
                              statement: ClickHousePreparedStatement): Unit = {
    for (i <- 0 until insertFields.size()) {
      val field = insertFields.get(i)
      val fieldType = tableSchema(field)
      if (dsFields.indexOf(field) == -1) {
        renderDefaultStatement(i, fieldType, statement)
      } else {
        fieldType match {
          case "Map" =>
            statement.setString(i + 1, item.getJSONObject(field).toJSONString)
          case "DateTime" | "Date" | "String" =>
            statement.setString(i + 1, item.getString(field))
          case "Int8" | "UInt8" | "Int16" | "UInt16" | "Int32" =>
            statement.setInt(i + 1, item.getInteger(field))
          case "UInt32" | "UInt64" | "Int64" =>
            statement.setLong(i + 1, item.getLong(field))
          case "Float32" => statement.setFloat(i + 1, item.getFloat(field))
          case "Float64" => statement.setDouble(i + 1, item.getDouble(field))
          case _ => statement.setString(i + 1, item.getString(field))
        }
      }
    }
  }

  /** *
   *
   * @Author: name
   * @Description: fields missing from the data are set to a default value of the matching type
   * @Date: 2022/1/7
   * @param index
   * @param fieldType
   * @param statement
   *
   * */
  private def renderDefaultStatement(index: Int, fieldType: String, statement: ClickHousePreparedStatement): Unit = {
    fieldType match {
      case "DateTime" | "Date" | "String" =>
        statement.setString(index + 1, renderStringDefault(fieldType))
      case "Int8" | "UInt8" | "Int16" | "Int32" | "UInt32" | "UInt16" =>
        statement.setInt(index + 1, 0)
      case "UInt64" | "Int64" =>
        statement.setLong(index + 1, 0)
      case "Float32" => statement.setFloat(index + 1, 0)
      case "Float64" => statement.setDouble(index + 1, 0)
      // Scala's List() is not a java.util.Collection; pass an empty Java list instead
      // (assumes the driver's setArray(int, Collection) overload)
      case arrayPattern(_) => statement.setArray(index + 1, new ArrayList[AnyRef]())
      case _ => statement.setString(index + 1, "")
    }
  }

  private def renderStringDefault(fieldType: String): String = {
    fieldType match {
      case "DateTime" =>
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        dateFormat.format(System.currentTimeMillis())
      case "Date" =>
        val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        dateFormat.format(System.currentTimeMillis())
      case "String" =>
        ""
    }
  }
}
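
For a quick standalone smoke test of this utility outside the streaming job, usage might look like the sketch below. It assumes the target table testcdc.ods_cdc_test already exists and that the ClickHouse connection settings are in place; the sample row values are made up.

import com.alibaba.fastjson.JSONObject
import scala.collection.mutable.ListBuffer

object ClickhouseSmokeTest {
  def main(args: Array[String]): Unit = {
    val fieldDefine =
      """
        Id UInt64,
        Name String
      """

    val clickhouse = new Clickhouse()
    clickhouse.setInitParameter("testcdc", "ods_cdc_test", fieldDefine)

    // build one fake row matching the field definition
    val row = new JSONObject()
    row.put("Id", 1L)
    row.put("Name", "hello-clickhouse")

    val rows = new ListBuffer[JSONObject]
    rows.append(row)

    clickhouse.insert(rows)
  }
}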

Configuration file reference

# Kafka
kafka.broker.list=192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092

# Kafka Producer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
enable.idempotence=true

# Redis
redis.host=192.168.1.1
redis.port=6379
redis.password=111111
redis.timeout=1000

# ZK
zk.host=192.168.1.1,192.168.1.2,192.168.1.3:2181

# Mysql
# mysql.nsbigdata.url="jdbc:mysql://192.168.1.1/test?characterEncoding=utf-8&useSSL=false"

# ClickHouse
clickhouse.driver=ru.yandex.clickhouse.ClickHouseDriver
clickhouse.host=192.168.1.1
clickhouse.port=8123
clickhouse.url=jdbc:clickhouse://192.168.1.1:8123/test
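
The PropertiesUtil and CommonConstants referenced in the code are not shown either. Here is a minimal sketch of what they might look like, assuming the properties above live in a config.properties file on the classpath and that the constant names simply mirror the keys:

import java.util.Properties

object CommonConstants {
  val CLICKHOUSE_HOST = "clickhouse.host"
  val CLICKHOUSE_PORT = "clickhouse.port"
  // username/password keys are not in the sample config above; add them if authentication is enabled
  val CLICKHOUSE_USERNAME = "clickhouse.username"
  val CLICKHOUSE_PASSWORD = "clickhouse.password"
}

object PropertiesUtil {
  // load config.properties from the classpath once
  private val properties: Properties = {
    val props = new Properties()
    val in = Thread.currentThread().getContextClassLoader.getResourceAsStream("config.properties")
    props.load(in)
    props
  }

  def getProperty(key: String): String = properties.getProperty(key)
}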

Where it still falls short

1) Why not use DataFrame/Dataset?

Because in my scenario, every way I tried to create a DataFrame failed to run. I'm still digging into the cause, so if you know why, please share. For example, the code below runs fine when placed at the outermost (driver) level, but stops working inside rdd.foreachPartition.

val url = "jdbc:clickhouse://192.168.1.1:9000"
val driver = "com.github.housepower.jdbc.ClickHouseDriver"
// the rest of the DataFrame setup is omitted...
val df = spark.read
  .format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", "testcdc.ods_cdc_test")
  .load()
df.show()

2) Writes currently go to a single node only. Whether to write to local tables or to a distributed table, and which distribution strategy to use, are questions I'd like to discuss with you in later posts; a small starting point is sketched below.
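
For what it's worth, the BalancedClickhouseDataSource already used in the utility class accepts a comma-separated host list and balances connections across the listed nodes. A minimal sketch (the addresses are placeholders for your own cluster):

import java.util.concurrent.TimeUnit
import ru.yandex.clickhouse.BalancedClickhouseDataSource

val url = "jdbc:clickhouse://192.168.1.1:8123,192.168.1.2:8123,192.168.1.3:8123/testcdc"
val dataSource = new BalancedClickhouseDataSource(url)

// ping the nodes periodically so unreachable replicas are dropped from rotation
dataSource.scheduleActualization(10, TimeUnit.SECONDS)

val connection = dataSource.getConnection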

With that, the CDC-to-ClickHouse pipeline is working end to end. This was also my first time using ClickHouse, so let's learn from each other.
