当前位置: 首页 > 工具软件 > Apache Kudu > 使用案例 >

Apache Kudu的Java/Scala API操作

公西浩
2023-12-01

1. 使用Java/Scala API进行创建表、插入数据、修改表结构(alter)、删除行、扫描表(scan)、删除表

1.1 pom.xml

在pom.xml中添加如下依赖:

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>1.15.0</version>
        </dependency>

1.2 Kudu_test程序

import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
import org.apache.kudu.{ColumnSchema, Schema, Type}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

object Kudu_test {

  // Imported locally so this object does not require changes to the file-level imports.
  import scala.util.control.NonFatal

  // ====================================== Create table ======================================

  /**
   * Creates `tableName` with the columns user_id (INT32, primary key), user_name (nullable STRING)
   * and salary (nullable DOUBLE), hash-partitioned on user_id.
   *
   * @param client      connected Kudu client
   * @param tableName   name of the table to create
   * @param numBuckets  number of hash buckets on user_id (defaults to 8)
   * @param numReplicas replicas per tablet (defaults to 1). Kudu's own default is 3 and the
   *                    replica count must be odd; 1 gives no fault tolerance.
   */
  def createExampleTable(client: KuduClient,
                         tableName: String,
                         numBuckets: Int = 8,
                         numReplicas: Int = 1): Unit = {
    val keyCol: ColumnSchema = new ColumnSchema.ColumnSchemaBuilder("user_id", Type.INT32)
      .key(true)
      .comment("用户ID")
      .build()
    val valueCol1: ColumnSchema = new ColumnSchema.ColumnSchemaBuilder("user_name", Type.STRING)
      .nullable(true)
      .comment("用户名称")
      .build()
    // A DECIMAL column would additionally need .typeAttributes(ColumnTypeAttributes) to set
    // its precision and scale.
    val valueCol2: ColumnSchema = new ColumnSchema.ColumnSchemaBuilder("salary", Type.DOUBLE)
      .nullable(true)
      .comment("薪水")
      .build()
    // Key columns must come before all non-key columns.
    val schema: Schema = new Schema(List(keyCol, valueCol1, valueCol2).asJava)

    // Rows are hash-partitioned into tablets by user_id. Range and hash + range partitioning
    // are also supported.
    val cto: CreateTableOptions = new CreateTableOptions()
      .addHashPartitions(List("user_id").asJava, numBuckets)
      .setNumReplicas(numReplicas)

    client.createTable(tableName, schema, cto)
    println("Created table: " + tableName)
  }

  // ====================================== Insert rows ======================================

  /**
   * Upserts two sample rows into `tableName` and reports the outcome.
   *
   * @throws Exception if any row operation failed during the background flush
   */
  def insertRows(client: KuduClient, tableName: String): Unit = {
    val table: KuduTable = client.openTable(tableName)
    val session: KuduSession = client.newSession()
    // Operations are buffered and flushed by a background thread, so per-row failures must be
    // checked explicitly afterwards (see reportPendingErrors).
    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)

    try {
      // First row: user_name is NULL.
      val upsert1: Upsert = table.newUpsert()
      val row1: PartialRow = upsert1.getRow()
      row1.addInt("user_id", 1)
      row1.setNull("user_name")
      row1.addDouble("salary", 8.8)
      session.apply(upsert1)

      // Second row: salary is NULL.
      val upsert2: Upsert = table.newUpsert()
      val row2: PartialRow = upsert2.getRow()
      row2.addInt("user_id", 2)
      row2.addString("user_name", "zhang_san")
      row2.setNull("salary")
      session.apply(upsert2)

      // session.flush() could also be called explicitly here.
    } finally {
      // Closing flushes buffered operations; done in `finally` so the session is released even
      // if apply() throws.
      session.close()
    }

    reportPendingErrors(session, "插入")
  }

  // ====================================== Scan table ======================================

  /**
   * Prints every row whose user_id lies in [1, 2].
   *
   * Projects "add_column", so it must run after [[addTableColumn]].
   * NULL user_name is printed as "" and NULL salary as 0.0.
   */
  def scanTable(client: KuduClient, tableName: String): Unit = {
    val table: KuduTable = client.openTable(tableName)
    val schema: Schema = table.getSchema()

    // Columns to return.
    val projectColumns = List("user_id", "user_name", "salary", "add_column")

    // Lower and upper bounds on the key (both inclusive).
    val lowPredicate: KuduPredicate = KuduPredicate.newComparisonPredicate(
      schema.getColumn("user_id"), ComparisonOp.GREATER_EQUAL, 1
    )
    val upperPredicate: KuduPredicate = KuduPredicate.newComparisonPredicate(
      schema.getColumn("user_id"), ComparisonOp.LESS_EQUAL, 2
    )

    val scanner: KuduScanner = client.newScannerBuilder(table)
      .setProjectedColumnNames(projectColumns.asJava)
      .addPredicate(lowPredicate)
      .addPredicate(upperPredicate)
      .build()

    try {
      // For a hash-partitioned table, rows are not returned in key order.
      while (scanner.hasMoreRows()) {
        // Rows are fetched in batches.
        val results: RowResultIterator = scanner.nextRows()
        while (results.hasNext()) {
          val result: RowResult = results.next()

          val userId = result.getInt("user_id")
          val userName = if (result.isNull("user_name")) "" else result.getString("user_name")
          val salary = if (result.isNull("salary")) 0.0 else result.getDouble("salary")
          val addColumn: java.util.Date = result.getDate("add_column")

          println(s"user_id: ${userId}, user_name: ${userName}, salary: ${salary}, add_column: ${addColumn}")
        }
      }
    } finally {
      // Release the scanner even if iteration fails part-way.
      scanner.close()
    }
  }

  // ====================================== Alter table schema ======================================

  /**
   * Adds a DATE column "add_column" whose default is the date of 1648949364000 ms since the
   * epoch (2022-04-03 01:29:24 UTC).
   */
  def addTableColumn(client: KuduClient, tableName: String): Unit = {
    val alterTableOptions: AlterTableOptions = new AlterTableOptions()
    val defaultDate = new java.sql.Date(1648949364000L)
    alterTableOptions.addColumn("add_column", Type.DATE, defaultDate)
    // A KuduTable opened before this call keeps the old schema; call client.openTable(tableName)
    // again to see the new one.
    client.alterTable(tableName, alterTableOptions)
    println("Altered the table")
  }

  // ====================================== Delete rows ======================================

  /**
   * Deletes the row with user_id = 1 and reports the outcome.
   *
   * @throws Exception if the delete failed during the background flush
   */
  def deleteRows(client: KuduClient, tableName: String): Unit = {
    val table: KuduTable = client.openTable(tableName)
    val session: KuduSession = client.newSession()
    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)

    try {
      val delete1: Delete = table.newDelete()
      val row1: PartialRow = delete1.getRow()
      row1.addInt("user_id", 1)
      session.apply(delete1)

      // session.flush() could also be called explicitly here.
    } finally {
      // Closing flushes buffered operations; done in `finally` so the session is released even
      // if apply() throws.
      session.close()
    }

    reportPendingErrors(session, "删除")
  }

  /**
   * Prints the outcome of the operations applied through an already-closed `session`.
   *
   * With AUTO_FLUSH_BACKGROUND, the flush runs on a background thread, so row errors are
   * collected by the session rather than thrown at apply() time and must be checked here.
   *
   * @param session a session that has already been closed
   * @param action  verb inserted into the messages ("插入" = insert, "删除" = delete)
   * @throws Exception if any row operation failed
   */
  private def reportPendingErrors(session: KuduSession, action: String): Unit = {
    val errorNum: Int = session.countPendingErrors()
    if (errorNum != 0) {
      println(s"有${errorNum}个数据${action}异常")

      val rowErrorsAndOverflowStatus: RowErrorsAndOverflowStatus = session.getPendingErrors()
      rowErrorsAndOverflowStatus.getRowErrors().foreach(println)

      // When too many errors accumulate, the error buffer overflows and some are dropped.
      if (rowErrorsAndOverflowStatus.isOverflowed) {
        println("error buffer overflow,一些RowError已经被丢弃")
      }

      throw new Exception(s"${action}数据失败")
    } else {
      println(s"${action}数据成功")
    }
  }

  // ====================================== Delete table ======================================

  /** Deletes `tableName`. */
  def deleteExampleTable(client: KuduClient, tableName: String): Unit = {
    client.deleteTable(tableName)
    println("Deleted table: " + tableName)
  }

  /**
   * Runs the demo: create, insert, alter, delete a row, scan.
   *
   * @param args optional overrides: args(0) = comma-separated master addresses,
   *             args(1) = table name. The original hard-coded values are the defaults.
   */
  def main(args: Array[String]): Unit = {
    val kuduMasters = args.lift(0).getOrElse("192.168.8.112:7051,192.168.8.113:7051")
    val tableName = args.lift(1).getOrElse("kudu_java_test")
    val client: KuduClient = new KuduClient.KuduClientBuilder(kuduMasters).build()

    try {
      createExampleTable(client, tableName)
      insertRows(client, tableName)
      addTableColumn(client, tableName)
      deleteRows(client, tableName)
      scanTable(client, tableName)
    } catch {
      case NonFatal(e) =>
        // Report the original failure first so a failing cleanup cannot hide it.
        e.printStackTrace()
        // Best-effort cleanup: the table may not exist if creation itself failed.
        try deleteExampleTable(client, tableName)
        catch {
          case NonFatal(cleanupError) => cleanupError.printStackTrace()
        }
    } finally {
      // Shut the client down on both the success and the failure path.
      client.shutdown()
    }
  }

}

1.3 运行程序查看结果

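通过mvn clean package将程序打成jar包(由于下面的运行命令只把这一个jar加入classpath,需借助maven-shade-plugin或maven-assembly-plugin将kudu-client依赖一并打入jar包,否则运行时会找不到Kudu相关的类),上传到服务器上运行,结果如下: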

[root@bigdata001 ~]# scala -classpath /root/kudu_test-1.0-SNAPSHOT.jar Kudu_test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Created table: kudu_java_test
插入数据成功
Altered the table
删除数据成功
user_id: 2, user_name: zhang_san, salary: 0.0, add_column: 2022-04-03
[root@bigdata001 ~]#
 类似资料: