Add the following Maven dependency:
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.15.0</version>
</dependency>
import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client.SessionConfiguration.FlushMode
import org.apache.kudu.client._
import org.apache.kudu.{ColumnSchema, Schema, Type}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
object Kudu_test {
  // ====================================== Create the table ======================================
  def createExampleTable(client: KuduClient, tableName: String) = {
    val keyCol: ColumnSchema = new ColumnSchema.ColumnSchemaBuilder("user_id", Type.INT32)
      .key(true)
      .comment("user ID")
      .build()
    val valueCol1: ColumnSchema = new ColumnSchema.ColumnSchemaBuilder("user_name", Type.STRING)
      .nullable(true)
      .comment("user name")
      .build()
    // For a DECIMAL column, you must also call .typeAttributes(ColumnTypeAttributes) to define precision and scale, e.g.:
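    // (hedged sketch -- the "bonus" column and DECIMAL(10, 2) are illustrative, not part of this example)
    //   new ColumnSchema.ColumnSchemaBuilder("bonus", Type.DECIMAL)
    //     .typeAttributes(new org.apache.kudu.ColumnTypeAttributes.ColumnTypeAttributesBuilder()
    //       .precision(10).scale(2).build())
    //     .nullable(true)
    //     .build()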
    val valueCol2: ColumnSchema = new ColumnSchema.ColumnSchemaBuilder("salary", Type.DOUBLE)
      .nullable(true)
      .comment("salary")
      .build()
    // The key column(s) must come first
    val columns: ArrayBuffer[ColumnSchema] = ArrayBuffer(keyCol, valueCol1, valueCol2)
    val schema: Schema = new Schema(columns.asJava)
    // Set up a partition schema: rows are hash-partitioned across tablets.
    // Range partitioning and hash + range partitioning are also supported (see the commented sketch below).
    val cto: CreateTableOptions = new CreateTableOptions()
    val hashKeys: ArrayBuffer[String] = ArrayBuffer("user_id")
    val numBuckets = 8
    cto.addHashPartitions(hashKeys.asJava, numBuckets)
    // Tables are created with 3 replicas by default; the replica count must be odd
    cto.setNumReplicas(1)
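    // Hedged sketch of hash + range partitioning (the bounds 0 and 1000 are illustrative, not part of this example):
    //   cto.setRangePartitionColumns(hashKeys.asJava)
    //   val lower = schema.newPartialRow()
    //   lower.addInt("user_id", 0)
    //   val upper = schema.newPartialRow()
    //   upper.addInt("user_id", 1000)
    //   cto.addRangePartition(lower, upper) // covers 0 <= user_id < 1000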
    client.createTable(tableName, schema, cto)
    println("Created table: " + tableName)
  }
  // ====================================== Insert rows ======================================
  def insertRows(client: KuduClient, tableName: String) = {
    val table: KuduTable = client.openTable(tableName)
    val session: KuduSession = client.newSession()
    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
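    // The other flush modes are AUTO_FLUSH_SYNC (each apply() blocks until the operation
    // is flushed) and MANUAL_FLUSH (operations are buffered until session.flush() is called).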
    // Insert the first row
    val upsert1: Upsert = table.newUpsert()
    val row1: PartialRow = upsert1.getRow()
    row1.addInt("user_id", 1)
    row1.setNull("user_name")
    row1.addDouble("salary", 8.8)
    session.apply(upsert1)
    // Insert the second row
    val upsert2: Upsert = table.newUpsert()
    val row2: PartialRow = upsert2.getRow()
    row2.addInt("user_id", 2)
    row2.addString("user_name", "zhang_san")
    row2.setNull("salary")
    session.apply(upsert2)
    // flush can also be called manually
    // session.flush()
    session.close()
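    // close() also flushes any operations still buffered in the session.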
    // The flush happens on background threads, so check for insert errors yourself
    val errorNum: Int = session.countPendingErrors()
    if (errorNum != 0) {
      println(s"${errorNum} row(s) failed to insert")
      val rowErrorsAndOverflowStatus: RowErrorsAndOverflowStatus = session.getPendingErrors()
      val errors: Seq[RowError] = rowErrorsAndOverflowStatus.getRowErrors().toSeq
      errors.foreach(println)
      // If there are too many errors, the error buffer overflows
      if (rowErrorsAndOverflowStatus.isOverflowed) {
        println("Error buffer overflowed; some RowErrors have been discarded")
      }
      throw new Exception("Failed to insert data")
    } else {
      println("Inserted data successfully")
    }
  }
  // ====================================== Scan data in the table ======================================
  def scanTable(client: KuduClient, tableName: String) = {
    val table: KuduTable = client.openTable(tableName)
    val schema: Schema = table.getSchema()
    // Define the columns to return; the key column is included here because it is read below
    val projectColumns: ArrayBuffer[String] = ArrayBuffer("user_id", "user_name", "salary", "add_column")
    // Define the lower and upper bounds of the key range to scan
    val lowPredicate: KuduPredicate = KuduPredicate.newComparisonPredicate(
      schema.getColumn("user_id"), ComparisonOp.GREATER_EQUAL, 1
    )
    val upperPredicate: KuduPredicate = KuduPredicate.newComparisonPredicate(
      schema.getColumn("user_id"), ComparisonOp.LESS_EQUAL, 2
    )
    // Define the scanner
    val scanner: KuduScanner = client.newScannerBuilder(table)
      .setProjectedColumnNames(projectColumns.asJava)
      .addPredicate(lowPredicate)
      .addPredicate(upperPredicate)
      .build()
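    // Multiple predicates are ANDed together, so this scans rows with 1 <= user_id <= 2.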
    // For a hash-partitioned table, the keys are returned in no particular order
    while (scanner.hasMoreRows()) {
      // Fetch the data in batches
      val results: RowResultIterator = scanner.nextRows()
      while (results.hasNext()) {
        val result: RowResult = results.next()
        val user_id = result.getInt("user_id")
        var user_name = ""
        if (!result.isNull("user_name")) {
          user_name = result.getString("user_name")
        }
        var salary = 0.0
        if (!result.isNull("salary")) {
          salary = result.getDouble("salary")
        }
        // getDate returns a java.sql.Date for DATE columns
        val add_column: java.sql.Date = result.getDate("add_column")
        println(s"user_id: ${user_id}, user_name: ${user_name}, salary: ${salary}, add_column: ${add_column}")
      }
    }
  }
  // ====================================== Alter the table schema ======================================
  def addTableColumn(client: KuduClient, tableName: String) = {
    val alterTableOptions: AlterTableOptions = new AlterTableOptions()
    val defaultDate = new java.sql.Date(1648949364000L)
    alterTableOptions.addColumn("add_column", org.apache.kudu.Type.DATE, defaultDate)
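    // Rows inserted before this ALTER also read back the default value, as the run output below shows.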
    // After altering the table, call client.openTable(tableName) again to see the new schema
    client.alterTable(tableName, alterTableOptions)
    println("Altered the table")
  }
  // ====================================== Delete rows from the table ======================================
  def deleteRows(client: KuduClient, tableName: String) = {
    val table: KuduTable = client.openTable(tableName)
    val session: KuduSession = client.newSession()
    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
    // Delete the row with user_id = 1
    val delete1: Delete = table.newDelete()
    val row1: PartialRow = delete1.getRow()
    row1.addInt("user_id", 1)
    session.apply(delete1)
    // flush can also be called manually
    // session.flush()
    session.close()
    // The flush happens on background threads, so check for delete errors yourself
    val errorNum: Int = session.countPendingErrors()
    if (errorNum != 0) {
      println(s"${errorNum} row(s) failed to delete")
      val rowErrorsAndOverflowStatus: RowErrorsAndOverflowStatus = session.getPendingErrors()
      val errors: Seq[RowError] = rowErrorsAndOverflowStatus.getRowErrors().toSeq
      errors.foreach(println)
      // If there are too many errors, the error buffer overflows
      if (rowErrorsAndOverflowStatus.isOverflowed) {
        println("Error buffer overflowed; some RowErrors have been discarded")
      }
      throw new Exception("Failed to delete data")
    } else {
      println("Deleted data successfully")
    }
  }
  // ====================================== Delete the table ======================================
  def deleteExampleTable(client: KuduClient, tableName: String) = {
    client.deleteTable(tableName)
    println("Deleted table: " + tableName)
  }
  def main(args: Array[String]): Unit = {
    val kuduMasters = "192.168.8.112:7051,192.168.8.113:7051"
    val tableName = "kudu_java_test"
    val client: KuduClient = new KuduClient.KuduClientBuilder(kuduMasters).build()
    try {
      createExampleTable(client, tableName)
      insertRows(client, tableName)
      addTableColumn(client, tableName)
      deleteRows(client, tableName)
      scanTable(client, tableName)
    } catch {
      case e: Exception =>
        // Clean up the example table on failure so the program can be re-run
        deleteExampleTable(client, tableName)
        e.printStackTrace()
    } finally {
      // Release client resources whether or not an exception occurred
      client.shutdown()
    }
  }
}
Package the program into a jar with mvn clean package, upload it to the server, and run it. The output is as follows:
[root@bigdata001 ~]# scala -classpath /root/kudu_test-1.0-SNAPSHOT.jar Kudu_test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Created table: kudu_java_test
Inserted data successfully
Altered the table
Deleted data successfully
user_id: 2, user_name: zhang_san, salary: 0.0, add_column: 2022-04-03
[root@bigdata001 ~]#
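A few notes on this run. Only the row with user_id = 2 is printed because the row with user_id = 1 was deleted before the scan, and its add_column value is the default set in addTableColumn. The SLF4J warnings mean no SLF4J binding is on the classpath, so the Kudu client's own log output is discarded; adding a binding such as org.slf4j:slf4j-simple next to kudu-client would surface it. Also, for the scala -classpath command to resolve the Kudu client classes, the jar is assumed here to bundle kudu-client and its dependencies (for example via the maven-shade-plugin); otherwise they would have to be added to the classpath separately.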