当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流:JDBC接收器中的主键

谭思博
2023-03-14

我正在使用带更新模式的结构化流媒体读取Kafka主题中的数据流。,然后做一些改变。

然后我创建了一个jdbc接收器,用追加模式在mysql接收器中推送数据。问题是我如何告诉我的接收器让它知道这是我的主键,并基于它进行更新,这样我的表就不会有任何重复的行。

   val df: DataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<List-here>")
  .option("subscribe", "emp-topic")
  .load()


  import spark.implicits._
  // value in kafka is bytes so cast it to String
  val empList: Dataset[Employee] = df.
  selectExpr("CAST(value AS STRING)")
  .map(row => Employee(row.getString(0)))

  // window aggregations on 1 min windows
  val aggregatedDf= ......

  // How to tell here that id is my primary key and do the update
  // based on id column
  aggregatedDf
  .writeStream
  .trigger(Trigger.ProcessingTime(60.seconds))
  .outputMode(OutputMode.Update)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      batchDF
      .select("id", "name","salary","dept")
      .write.format("jdbc")
      .option("url", "jdbc:mysql://localhost/empDb")
      .option("driver","com.mysql.cj.jdbc.Driver")
      .option("dbtable", "empDf")
      .option("user", "root")
      .option("password", "root")
      .mode(SaveMode.Append)
      .save()
     }

共有1个答案

齐晟
2023-03-14

一种方法是,您可以使用上的重复密钥更新foreachPartition来实现这一目的

下面是伪代码片段

/**
    * Insert in to database using foreach partition.
    * @param dataframe : DataFrame
    * @param sqlDatabaseConnectionString
    * @param sqlTableName
    */
  def insertToTable(dataframe: DataFrame, sqlDatabaseConnectionString: String, sqlTableName: String): Unit = {

//numPartitions = number of simultaneous DB connections you can planning to give
datframe.repartition(numofpartitionsyouwant)

    val tableHeader: String = dataFrame.columns.mkString(",")
    dataFrame.foreachPartition { partition =>
      // Note : Each partition one connection (more better way is to use connection pools)
      val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString)
      //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
      partition.grouped(1000).foreach {
        group =>
          val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder()
          group.foreach {
            record => insertString.append("('" + record.mkString(",") + "'),")
          }

val sql =   s"""
               | INSERT INTO $sqlTableName  VALUES  
               | $tableHeader
               | ${insertString}
               | ON DUPLICATE KEY UPDATE 
               | yourprimarykeycolumn='${record.getAs[String]("key")}'
    sqlExecutorConnection.createStatement()
                .executeUpdate(sql)
          }
    sqlExecutorConnection.close() // close the connection
        }
      }

可以使用preparedstatement代替jdbc语句。

进一步阅读: SPARKSQL-使用DataFrames和JDBC更新MySql表

 类似资料:
  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?

  • 我正在使用Kafka和Spark 2.1结构化流。我有两个json格式的数据主题,例如: 我需要比较Spark中基于标记的两个流:name,当值相等时,执行一些额外的定义/函数。 如何使用Spark结构化流来做到这一点? 谢谢

  • 我正在尝试将连续触发器与 Spark 结构化流式处理查询结合使用。我得到的错误是,火花消费者在处理数据时找不到适当的偏移量。如果没有此触发器,查询将正常运行(如预期)。 我的工作: 从Kafka主题读取数据: 将数据写入Kafka主题: 所以我基本上没有做什么特别的事情——只是将输入数据传输到输出主题,而没有任何转换或无效操作。 我得到了什么: 在executor日志中,我看到很多这样的消息: 尽

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,