当前位置: 首页 > 知识库问答 >
问题:

scala数据集的批处理

晋承嗣
2023-03-14

我试图在Spark中创建成批的Dataset行。为了保持发送到服务的记录数量,我想对项目进行批处理,这样我就可以保持数据发送的速率。对于,

case class Person(name:String, address: String)
case class PersonBatch(personBatch: List[Person])

对于给定的数据集[Person]我想创建数据集[PersonBatch]

例如,如果输入Dataset[Person]有100条记录,那么输出Dataset应该像Dataset[PersonBatch]一样,其中每个PersonBatch应该是n记录(Person)的列表。

我试过了,但没用。

object DataBatcher extends Logger {

  var batchList: ListBuffer[PersonBatch] = ListBuffer[PersonBatch]()
  var batchSize: Long = 500  //default batch size

  def addToBatchList(batch: PersonBatch): Unit = {
    batchList += batch
  }

  def clearBatchList(): Unit = {
    batchList.clear()
  }

  def createBatches(ds: Dataset[Person]): Dataset[PersonBatch] = {

    val dsCount = ds.count()
    logger.info(s"Count of dataset passed for creating batches : ${dsCount}")
    val batchElement = ListBuffer[Person]()
    val batch = PersonBatch(batchElement)
    ds.foreach(x => {
      batch.personBatch += x
      if(batch.personBatch.length == batchSize) {
        addToBatchList(batch)
        batch.requestBatch.clear()
      }
    })
    if(batch.personBatch.length > 0) {
      addToBatchList(batch)
      batch.personBatch.clear()
    }
    sparkSession.createDataset(batchList)
  }  
}

我想在Hadoop集群上运行此作业。有人能帮我吗?

共有2个答案

欧阳正德
2023-03-14
val tableHeader: String = dataFrame.columns.mkString(",")
dataFrame.foreachPartition((it: Iterator[Row]) => {
      println("partition index: " )
      val url = "jdbc:...+ "user=;password=;"
      val conn = DriverManager.getConnection(url)
      conn.setAutoCommit(true)
      val stmt = conn.createStatement()
      val batchSize = 10
      var i =0
      while (it.hasNext) {
        val row = it.next
        import java.sql.SQLException
        import java.sql.SQLIntegrityConstraintViolationException
        try {
          stmt.addBatch(" UPDATE TABLE SET STATUS = 0 , " +
            " DATE ='" + new java.sql.Timestamp(System.currentTimeMillis()) +"'" +
            " where id = " + row.getAs("IDNUM")  )
          i += 1
          if (  i  % batchSize == 0 ) {
            stmt.executeBatch
            conn.commit
          }
        } catch {
          case e: SQLIntegrityConstraintViolationException =>
          case e: SQLException =>
            e.printStackTrace()
        }
        finally {
             stmt.executeBatch
             conn.commit
        }

      }
      import java.util
      val ret = stmt.executeBatch
      System.out.println("Ret val: " + util.Arrays.toString(ret))
      System.out.println("Update count: " + stmt.getUpdateCount)
      conn.commit
      stmt.close
邓令雪
2023-03-14

rdd。迭代器的分组函数可能对您有用。

例如:

iter。分组(批量大小)

使用iter进行批插入的示例代码段。分组(batchsize)这里是1000,我正试图插入到数据库中

   df.repartition(numofpartitionsyouwant) // numPartitions ~ number of simultaneous DB connections you can planning to give...
def insertToTable(sqlDatabaseConnectionString: String,
                  sqlTableName: String): Unit = {

  val tableHeader: String = dataFrame.columns.mkString(",")
  dataFrame.foreachPartition { partition =>
    //NOTE : EACH PARTITION ONE CONNECTION (more better way is to use connection pools)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql
    partition.grouped(1000).foreach { group =>
      val insertString: scala.collection.mutable.StringBuilder =
        new scala.collection.mutable.StringBuilder()

      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }

      sqlExecutorConnection
        .createStatement()
        .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }

    sqlExecutorConnection.close() // close the connection so that connections wont exhaust.
  }
}
 类似资料:
  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 对于在时间序列数据上实现PyTorch数据管道的“最佳实践”,我有点困惑。 我有一个HD5文件,我使用自定义DataLoader读取。似乎我应该返回数据样本作为一个(特征,目标)元组,每个元组的形状是(L,C),其中L是seq_len,C是通道数-即不要在数据加载器中预制批处理,只需返回一个表。 PyTorch模块似乎需要一个批处理暗淡,即。Conv1D期望(N,C,L)。 我的印象是,类将预先处

  • 我需要访问两个数据源: Spring批处理存储库:在内存H2中 我的步骤需要访问。 我在那里看到了几个关于如何创建自定义

  • 我有一个特定的要求,其中,我需要检查空的数据文件。如果为空,则填充默认值。这是我尝试过但没有得到我想要的东西。 这个想法是,如果df不是空的,就得到它。如果为空,则填写默认值为零。这似乎不起作用。以下是我得到的。 请帮忙。

  • 我的数据库中有大约1000万个blob格式的文件,我需要转换并以pdf格式保存它们。每个文件大小约为0.5-10mb,组合文件大小约为20 TB。我正在尝试使用spring批处理实现该功能。然而,我的问题是,当我运行批处理时,服务器内存是否可以容纳那么多的数据?我正在尝试使用基于块的处理和线程池任务执行器。请建议运行作业的最佳方法是否可以在更短的时间内处理如此多的数据

  • 我在GCP数据流/Apache Beam中有一个PCollection。我需要将“按N”组合起来,而不是逐个处理它。类似于分组(N)。因此,在有界处理的情况下,它将按10个项目进行分组,最后一批是剩下的任何项目。这在Apache Beam中可能吗?