马上过年了,抢了两天的火车票都没抢到。果断叫黄牛了,希望明天能帮我抢到….
apache的kudu是Cloudera开源的存储引擎,可以结合impala做实时的一些查询,小米那边就用了kudu。具体的一些东西,自己去google或者baidu吧。
由于这边后面会用到kudu的一些东西,就看了下它的文档。在网上kudu java api这一部分貌似没看到别人给demo,只有官方的github上面有小demo,但是只有建表、查询、删表,所以自己弄了下。
注意事项:
1.kudu的表必须有一个或者多个(联合主键)主键,且主键不能修改
2.它的删除修改都是根据主键判断的,所以做删除或者修改的时候必须要设置主键,不然会抛异常
3.kudu的删除api,只能设置主键,如果设置了主键也设置了非主键(不管非主键是否和数据匹配),数据不会被删除
4.kudu的DDL语句: 对字段来说只能添加列、删除列、重命名列名;对字表来说只能删除表、重命名表名。不能修改字段类型,不能对主键进行修改。
5.当修改了表结构这些,impala和kudu的映射元数据不会改变,包括把一个有默认值的字段改成允许为null或者删除一个列或者修改了列名字等,通过impala再来做表查询就会报错。但是新加一个列没影响,只不过impala查询的时候不会显示那一列。
下面贴下DML以及DDL的语句:
主要是因为需要异构,通过canal拉取binlog到kudu里面,所以写了下面的东西测试了一下。
首先抽象两个东西,一个kudu的row对象,以及kudu的column对象:
kudu的column对象:KuduColumn
public class KuduColumn {
/**
* crud
*/
private String columnName;
private Type columnType;
private Object columnValue;
private boolean isUpdate;
private boolean isPrimaryKey = false;
/**
* alter
*/
private AlterColumnEnum alterColumnEnum = AlterColumnEnum.NONE;
private Object defaultValue;
private boolean isNullAble;
private String newColumnName;
/**
* select
*/
private KuduPredicate.ComparisonOp comparisonOp;
private Object comparisonValue;
private boolean isSelect;
public String getColumnName() {
return columnName;
}
public KuduColumn setColumnName(String columnName) {
this.columnName = columnName;
return this;
}
public Type getColumnType() {
return columnType;
}
public KuduColumn setColumnType(Type columnType) {
this.columnType = columnType;
return this;
}
public Object getColumnValue() {
return columnValue;
}
public KuduColumn setColumnValue(Object columnValue) {
this.columnValue = columnValue;
return this;
}
public boolean isUpdate() {
return isUpdate;
}
public KuduColumn setUpdate(boolean update) {
isUpdate = update;
return this;
}
public Object getDefaultValue() {
return defaultValue;
}
public KuduColumn setDefaultValue(Object defaultValue) {
this.defaultValue = defaultValue;
return this;
}
public boolean isNullAble() {
return isNullAble;
}
public KuduColumn setNullAble(boolean nullAble) {
isNullAble = nullAble;
return this;
}
public AlterColumnEnum getAlterColumnEnum() {
return alterColumnEnum;
}
public KuduColumn setAlterColumnEnum(AlterColumnEnum alterColumnEnum) {
this.alterColumnEnum = alterColumnEnum;
return this;
}
public String getNewColumnName() {
return newColumnName;
}
public KuduColumn setNewColumnName(String newColumnName) {
this.newColumnName = newColumnName;
return this;
}
public boolean isPrimaryKey() {
return isPrimaryKey;
}
public KuduColumn setPrimaryKey(boolean primaryKey) {
isPrimaryKey = primaryKey;
return this;
}
public KuduPredicate.ComparisonOp getComparisonOp() {
return comparisonOp;
}
public KuduColumn setComparisonOp(KuduPredicate.ComparisonOp comparisonOp) {
this.comparisonOp = comparisonOp;
return this;
}
public Object getComparisonValue() {
return comparisonValue;
}
public KuduColumn setComparisonValue(Object comparisonValue) {
this.comparisonValue = comparisonValue;
return this;
}
public boolean isSelect() {
return isSelect;
}
public KuduColumn setSelect(boolean select) {
isSelect = select;
return this;
}
@Override
public String toString() {
return "KuduColumn{" +
"columnName='" + columnName + '\'' +
", columnType=" + columnType +
", columnValue=" + columnValue +
", isUpdate=" + isUpdate +
", isPrimaryKey=" + isPrimaryKey +
", alterColumnEnum=" + alterColumnEnum +
", defaultValue=" + defaultValue +
", isNullAble=" + isNullAble +
", newColumnName='" + newColumnName + '\'' +
", comparisonOp=" + comparisonOp +
", comparisonValue=" + comparisonValue +
", isSelect=" + isSelect +
'}';
}
public enum AlterColumnEnum {
ADD_COLUMN("ADD_COLUMN", "添加列"),
DROP_COLUMN("DROP_COLUMN", "删除列"),
RENAME_COLUMN("RENAME_COLUMN", "重命名列"),
NONE("NONE", "不操作");
private String type;
private String desc;
AlterColumnEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
}
kudu的row对象:KuduRow
public class KuduRow {
private String tableName;
private String newTableName;
private List<KuduColumn> rows;
private AlterTableEnum alterTableEnum = AlterTableEnum.NONE;
public String getTableName() {
return tableName;
}
public KuduRow setTableName(String tableName) {
this.tableName = tableName;
return this;
}
public String getNewTableName() {
return newTableName;
}
public KuduRow setNewTableName(String newTableName) {
this.newTableName = newTableName;
return this;
}
public List<KuduColumn> getRows() {
return rows;
}
public void setRows(List<KuduColumn> rows) {
this.rows = rows;
}
public AlterTableEnum getAlterTableEnum() {
return alterTableEnum;
}
public void setAlterTableEnum(AlterTableEnum alterTableEnum) {
this.alterTableEnum = alterTableEnum;
}
@Override
public String toString() {
return "KuduRow{" +
"tableName='" + tableName + '\'' +
", newTableName='" + newTableName + '\'' +
", rows=" + rows +
", alterTableEnum=" + alterTableEnum +
'}';
}
public enum AlterTableEnum {
DROP_TABLE("DROP_TABLE", "删除表"),
RENAME_TABLE("RENAME_TABLE", "重命名表"),
NONE("NONE", "不做操作");
private String type;
private String desc;
AlterTableEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
}
kudu的一个工具类: KuduAgentUtils
public class KuduAgentUtils {
private static final Logger logger = LoggerFactory.getLogger(KuduAgentUtils.class);
public static Operation WrapperKuduOperation(KuduColumn entity, Operation operate) {
Type rowType = entity.getColumnType();
String columnName = entity.getColumnName();
Object columnValue = entity.getColumnValue();
logger.info("kudu操作对象包装,列名:{},列值:{}", columnName, columnValue);
if (rowType.equals(Type.BINARY)) {
}
if (rowType.equals(Type.STRING)) {
if (isSetLogic(entity, operate)) {
operate.getRow().addString(columnName, String.valueOf(columnValue));
}
}
if (rowType.equals(Type.BOOL)) {
}
if (rowType.equals(Type.DOUBLE)) {
}
if (rowType.equals(Type.FLOAT)) {
}
if (rowType.equals(Type.INT8)) {
}
if (rowType.equals(Type.INT16)) {
}
if (rowType.equals(Type.INT32)) {
}
if (rowType.equals(Type.INT64)) {
if (isSetLogic(entity, operate)) {
operate.getRow().addLong(columnName, (Integer) columnValue);
}
}
if (rowType.equals(Type.UNIXTIME_MICROS)) {
}
return operate;
}
/**
* 返回查询的一行 map
*
* @param row
* @param entitys
* @return
*/
public static Map<String, Object> getRowsResult(RowResult row, List<KuduColumn> entitys) {
Map<String, Object> result = new HashMap<>();
for (KuduColumn entity : entitys) {
if (entity.getColumnType() != null) {
switch (entity.getColumnType()) {
case BOOL:
result.put(entity.getColumnName(), row.getBoolean(entity.getColumnName()));
break;
case BINARY:
result.put(entity.getColumnName(), row.getBinary(entity.getColumnName()));
break;
case STRING:
result.put(entity.getColumnName(), row.getString(entity.getColumnName()));
break;
case INT8:
result.put(entity.getColumnName(), row.getByte(entity.getColumnName()));
break;
case INT16:
result.put(entity.getColumnName(), row.getShort(entity.getColumnName()));
break;
case INT32:
result.put(entity.getColumnName(), row.getInt(entity.getColumnName()));
break;
case INT64:
result.put(entity.getColumnName(), row.getLong(entity.getColumnName()));
break;
case DOUBLE:
result.put(entity.getColumnName(), row.getDouble(entity.getColumnName()));
break;
case FLOAT:
result.put(entity.getColumnName(), row.getFloat(entity.getColumnName()));
break;
case UNIXTIME_MICROS:
result.put(entity.getColumnName(), row.getLong(entity.getColumnName()));
break;
}
}
}
return result;
}
/**
* 通用方法
*
* @param entity
* @param operate
* @param session
* @return
* @throws KuduException
*/
public static OperationResponse operate(KuduRow entity, Operation operate, KuduSession session) throws KuduException {
for (KuduColumn column : entity.getRows()) {
KuduAgentUtils.WrapperKuduOperation(column, operate);
}
OperationResponse apply = session.apply(operate);
return apply;
}
/**
* 返回column的string list
*
* @param entitys
* @return
*/
public static List<String> getColumnNames(List<KuduColumn> entitys) {
List<String> result = new ArrayList<>();
for (KuduColumn entity : entitys) {
if (entity.isSelect()) {
result.add(entity.getColumnName());
}
}
return result;
}
/**
* 设置条件
*
* @param kuduTable
* @param entitys
* @param kuduScannerBuilder
*/
public static void setKuduPredicates(KuduTable kuduTable, List<KuduColumn> entitys, KuduScanner.KuduScannerBuilder kuduScannerBuilder) {
for (KuduColumn entity : entitys) {
if (entity.getComparisonOp() != null) {
KuduPredicate kuduPredicate = null;
switch (entity.getColumnType()) {
case BOOL:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (Boolean) entity.getComparisonValue());
break;
case FLOAT:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (Float) entity.getComparisonValue());
break;
case DOUBLE:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (Double) entity.getComparisonValue());
break;
case BINARY:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (byte[]) entity.getComparisonValue());
break;
case STRING:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (String) entity.getComparisonValue());
break;
case UNIXTIME_MICROS:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (Long) entity.getComparisonValue());
break;
default:
kuduPredicate = KuduPredicate.newComparisonPredicate(kuduTable.getSchema().getColumn(entity.getColumnName()), entity.getComparisonOp(), (Double) entity.getComparisonValue());
break;
}
kuduScannerBuilder.addPredicate(kuduPredicate);
}
}
}
/**
* 如果是update事件并且是更新字段就设置,如果非update事件都设置
* 如果是delete事件是主键就设置,不是主键就不设置
*
* @param entity
* @param operate
* @return
*/
public static boolean isSetLogic(KuduColumn entity, Operation operate) {
return ((operate instanceof Update && entity.isUpdate()) || (operate instanceof Update && entity.isPrimaryKey())) || (operate instanceof Delete && entity.isPrimaryKey()) || (!(operate instanceof Update) && !(operate instanceof Delete));
}
public static List<OperationResponse> close(KuduSession session, KuduClient client) {
if (null != session) {
try {
session.flush();
} catch (KuduException e) {
e.printStackTrace();
}
}
List<OperationResponse> responses = null;
if (null != session) {
try {
responses = session.close();
} catch (KuduException e) {
e.printStackTrace();
}
}
if (null != client) {
try {
client.close();
} catch (KuduException e) {
e.printStackTrace();
}
}
return responses;
}
public static void close(KuduScanner build, KuduClient client) {
if (null != build) {
try {
build.close();
} catch (KuduException e) {
e.printStackTrace();
}
}
if (null != client) {
try {
client.close();
} catch (KuduException e) {
e.printStackTrace();
}
}
}
}
kudu的dml以及ddl的api类: KuduAgent
public class KuduAgent {
private static final Logger logger = LoggerFactory.getLogger(KuduAgent.class);
private static KuduClient client;
private final static String master = "hadoop01";
private final static SessionConfiguration.FlushMode FLASH_MODE_MULT = SessionConfiguration.FlushMode.MANUAL_FLUSH;
private final static SessionConfiguration.FlushMode FLASH_MODE_SINGLE = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
private final static int BUFFER_SPACE = 1000;
static {
client = new KuduClient.KuduClientBuilder(master).build();
}
/**
* 查询 返回多条数据
*
* @param table
* @param client
* @param entitys
* @return
*/
public List<Map<String, Object>> select(String table, KuduClient client, List<KuduColumn> entitys) {
KuduTable kuduTable = null;
KuduScanner build = null;
KuduScanner.KuduScannerBuilder kuduScannerBuilder = null;
List<Map<String, Object>> resList = new ArrayList<>();
try {
kuduTable = client.openTable(table);
List<String> columnNames = KuduAgentUtils.getColumnNames(entitys);
kuduScannerBuilder = client.newScannerBuilder(kuduTable);
kuduScannerBuilder = kuduScannerBuilder.setProjectedColumnNames(columnNames);
KuduAgentUtils.setKuduPredicates(kuduTable, entitys, kuduScannerBuilder);
build = kuduScannerBuilder.build();
while (build.hasMoreRows()) {
RowResultIterator results = build.nextRows();
while (results.hasNext()) {
RowResult result = results.next();
Map<String, Object> rowsResult = KuduAgentUtils.getRowsResult(result, entitys);
resList.add(rowsResult);
}
}
} catch (KuduException e) {
e.printStackTrace();
logger.error("kudu执查询操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close(build, client);
}
return resList;
}
/**
* 批量插入
*
* @param table
* @param client
* @param entitys
* @throws KuduException
*/
public void insert(String table, KuduClient client, List<KuduRow> entitys) {
KuduSession session = null;
try {
KuduTable kuduTable = client.openTable(table);
session = client.newSession();
session.setFlushMode(FLASH_MODE_MULT);
session.setMutationBufferSpace(BUFFER_SPACE);
for (KuduRow entity : entitys) {
Insert insert = kuduTable.newInsert();
KuduAgentUtils.operate(entity, insert, session);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行插入操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
List<OperationResponse> res = KuduAgentUtils.close(session, client);
}
}
/**
* 单条插入
*
* @param table
* @param client
* @param entity
* @throws KuduException
*/
public void insert(String table, KuduClient client, KuduRow entity) throws KuduException {
KuduSession session = null;
try {
KuduTable kuduTable = client.openTable(table);
session = client.newSession();
session.setFlushMode(FLASH_MODE_SINGLE);
Insert insert = kuduTable.newInsert();
OperationResponse operate = KuduAgentUtils.operate(entity, insert, session);
logger.info("insert 插入数据:{}", operate.getRowError());
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行插入操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close(session, client);
}
}
/**
* 批量更新
*
* @param table
* @param client
* @param entitys
* @throws KuduException
*/
public void update(String table, KuduClient client, List<KuduRow> entitys) {
KuduSession session = null;
try {
KuduTable kuduTable = client.openTable(table);
session = client.newSession();
session.setFlushMode(FLASH_MODE_MULT);
session.setMutationBufferSpace(BUFFER_SPACE);
for (KuduRow entity : entitys) {
Update update = kuduTable.newUpdate();
KuduAgentUtils.operate(entity, update, session);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行更新操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
List<OperationResponse> res = KuduAgentUtils.close(session, client);
}
}
/**
* 单条更新
*
* @param table
* @param client
* @param entity
* @throws KuduException
*/
public void update(String table, KuduClient client, KuduRow entity) throws KuduException {
KuduSession session = null;
try {
KuduTable kuduTable = client.openTable(table);
session = client.newSession();
session.setFlushMode(FLASH_MODE_SINGLE);
Update update = kuduTable.newUpdate();
OperationResponse operate = KuduAgentUtils.operate(entity, update, session);
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行更新操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close(session, client);
}
}
/**
* 批量删除 删除只能是主键
*
* @param table
* @param client
* @param entitys
* @throws KuduException
*/
public void delete(String table, KuduClient client, List<KuduRow> entitys) throws KuduException {
KuduSession session = null;
try {
KuduTable kuduTable = client.openTable(table);
session = client.newSession();
session.setFlushMode(FLASH_MODE_MULT);
session.setMutationBufferSpace(BUFFER_SPACE);
for (KuduRow entity : entitys) {
Delete delete = kuduTable.newDelete();
KuduAgentUtils.operate(entity, delete, session);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行删除操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close(session, client);
}
}
/**
* 单条删除 删除只能是主键
*
* @param table
* @param client
* @param entity
* @throws KuduException
*/
public void delete(String table, KuduClient client, KuduRow entity) throws KuduException {
KuduSession session = null;
try {
KuduTable kuduTable = client.openTable(table);
session = client.newSession();
session.setFlushMode(FLASH_MODE_SINGLE);
Delete delete = kuduTable.newDelete();
OperationResponse operate = KuduAgentUtils.operate(entity, delete, session);
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行删除操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close(session, client);
}
}
/**
* 针对表的操作
* 修改表名 删除表 增加字段 删除字段
*
* @param client
* @param entitys
* @throws KuduException
*/
public void alter(KuduClient client, List<KuduRow> entitys) throws KuduException {
try {
for (KuduRow entity : entitys) {
if (entity.getAlterTableEnum().equals(KuduRow.AlterTableEnum.RENAME_TABLE)) {
AlterTableResponse alterTableResponse = renameTable(client, entity);
continue;
}
if (entity.getAlterTableEnum().equals(KuduRow.AlterTableEnum.DROP_TABLE)) {
DeleteTableResponse deleteTableResponse = dropTable(client, entity);
continue;
}
AlterTableResponse alterTableResponse = alterColumn(client, entity);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行表alter操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close((KuduSession) null, client);
}
}
/**
* 针对表的操作
* 修改表名 删除表 增加字段 删除字段
*
* @param client
* @param entity
* @throws KuduException
*/
public void alter(KuduClient client, KuduRow entity) throws KuduException {
try {
if (entity.getAlterTableEnum().equals(KuduRow.AlterTableEnum.RENAME_TABLE)) {
AlterTableResponse alterTableResponse = renameTable(client, entity);
return;
}
if (entity.getAlterTableEnum().equals(KuduRow.AlterTableEnum.DROP_TABLE)) {
DeleteTableResponse deleteTableResponse = dropTable(client, entity);
return;
}
AlterTableResponse alterTableResponse = alterColumn(client, entity);
} catch (Exception e) {
e.printStackTrace();
logger.error("kudu执行表alter操作失败,失败信息:cause-->{},message-->{}", e.getCause(), e.getMessage());
throw new CustomerException(ExceptionConstant.KUDU_ERROR_CODE, ExceptionConstant.AGENT_ERROR_SYS, e.getMessage());
} finally {
KuduAgentUtils.close((KuduSession) null, client);
}
}
/**
* 修改表名
*
* @param client
* @param entity
* @return
* @throws KuduException
*/
private AlterTableResponse renameTable(KuduClient client, KuduRow entity) throws KuduException {
AlterTableOptions ato = new AlterTableOptions();
ato.renameTable(entity.getNewTableName());
return client.alterTable(entity.getTableName(), ato);
}
/**
* 删除表
*
* @param client
* @param entity
* @return
* @throws KuduException
*/
private DeleteTableResponse dropTable(KuduClient client, KuduRow entity) throws KuduException {
return client.deleteTable(entity.getTableName());
}
/**
* 列级别的alter
*
* @param client
* @param entity
* @return
* @throws KuduException
*/
private AlterTableResponse alterColumn(KuduClient client, KuduRow entity) throws KuduException {
AlterTableOptions ato = new AlterTableOptions();
for (KuduColumn column : entity.getRows()) {
if (column.getAlterColumnEnum().equals(KuduColumn.AlterColumnEnum.ADD_COLUMN) && !column.isNullAble()) {
ato.addColumn(column.getColumnName(), column.getColumnType(), column.getDefaultValue());
} else if (column.getAlterColumnEnum().equals(KuduColumn.AlterColumnEnum.ADD_COLUMN) && column.isNullAble()) {
ato.addNullableColumn(column.getColumnName(), column.getColumnType());
} else if (column.getAlterColumnEnum().equals(KuduColumn.AlterColumnEnum.DROP_COLUMN)) {
ato.dropColumn(column.getColumnName());
} else if (column.getAlterColumnEnum().equals(KuduColumn.AlterColumnEnum.RENAME_COLUMN)) {
ato.renameColumn(column.getColumnName(), column.getNewColumnName());
} else {
continue;
}
}
AlterTableResponse alterTableResponse = client.alterTable(entity.getTableName(), ato);
return alterTableResponse;
}
}
kudu的测试代码:
public static void main(String[] args) throws KuduException {
// insertSingleTEST();
// insertMultTEST();
// updateMultTEST();
// updateSingleTEST();
// deleteMultTEST();
// deleteSingleTEST();
// renameTEST();
// alterColumnTEST();
selectTest();
}
public static void selectTest() {
KuduAgent agent = new KuduAgent();
KuduColumn column01 = new KuduColumn();
column01.setColumnName("name").setColumnType(Type.STRING).setSelect(true).setComparisonOp(KuduPredicate.ComparisonOp.EQUAL).setComparisonValue("lijie001");
KuduColumn column02 = new KuduColumn();
column02.setColumnName("id").setSelect(true).setColumnType(Type.INT64);
KuduColumn column03 = new KuduColumn();
column03.setColumnName("sex").setSelect(true).setColumnType(Type.STRING);
List<KuduColumn> list = new ArrayList<>();
list.add(column01);
list.add(column02);
list.add(column03);
List<Map<String, Object>> select = agent.select("impala::impala_kudu.my_first_table", client, list);
System.out.println("-----------------" + select);
}
public static void alterColumnTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
KuduRow myrows01 = new KuduRow();
myrows01.setTableName("impala::impala_kudu.my_first_table");
KuduColumn c01 = new KuduColumn();
c01.setColumnName("newsex").setNewColumnName("sex");
c01.setAlterColumnEnum(KuduColumn.AlterColumnEnum.RENAME_COLUMN);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("myadd").setAlterColumnEnum(KuduColumn.AlterColumnEnum.DROP_COLUMN);
List<KuduColumn> rows01 = new ArrayList<>();
rows01.add(c01);
rows01.add(c02);
myrows01.setRows(rows01);
KuduRow myrows11 = new KuduRow();
myrows11.setTableName("impala::impala_kudu.my_first_table");
KuduColumn c11 = new KuduColumn();
c11.setColumnName("newname").setNewColumnName("name");
c11.setAlterColumnEnum(KuduColumn.AlterColumnEnum.RENAME_COLUMN);
KuduColumn c12 = new KuduColumn();
c12.setColumnName("nickName").setAlterColumnEnum(KuduColumn.AlterColumnEnum.ADD_COLUMN).setNullAble(false).setColumnType(Type.STRING).setDefaultValue("aaa");
List<KuduColumn> rows11 = new ArrayList<>();
rows11.add(c11);
rows11.add(c12);
myrows11.setRows(rows11);
List<KuduRow> list = new ArrayList<>();
list.add(myrows01);
list.add(myrows11);
agent.alter(client, list);
}
public static void renameTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
KuduRow myrows01 = new KuduRow();
myrows01.setTableName("impala::impala_kudu.my_first_table");
myrows01.setNewTableName("impala::impala_kudu.my_first_table1");
myrows01.setAlterTableEnum(KuduRow.AlterTableEnum.RENAME_TABLE);
KuduRow myrows02 = new KuduRow();
myrows02.setTableName("impala::impala_kudu.my_first_table1");
myrows02.setNewTableName("impala::impala_kudu.my_first_table");
myrows02.setAlterTableEnum(KuduRow.AlterTableEnum.RENAME_TABLE);
List<KuduRow> list = new ArrayList<>();
list.add(myrows01);
list.add(myrows02);
agent.alter(client, list);
}
public static void deleteMultTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
//第一行
KuduColumn c01 = new KuduColumn();
c01.setColumnName("id").setColumnValue(1000001).setColumnType(Type.INT64).setUpdate(false).setPrimaryKey(true);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("name").setColumnValue("lijie123").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row01 = new ArrayList<>();
row01.add(c01);
// row01.add(c02);
KuduRow myrows01 = new KuduRow();
myrows01.setRows(row01);
//第一行
KuduColumn c11 = new KuduColumn();
c11.setColumnName("id").setColumnValue(1000002).setColumnType(Type.INT64).setUpdate(false).setPrimaryKey(true);
KuduColumn c12 = new KuduColumn();
c12.setColumnName("name").setColumnValue("lijie123").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row11 = new ArrayList<>();
row11.add(c11);
// row11.add(c12);
KuduRow myrows11 = new KuduRow();
myrows11.setRows(row11);
List<KuduRow> rows = new ArrayList<>();
rows.add(myrows01);
rows.add(myrows11);
agent.delete("impala::impala_kudu.my_first_table", client, rows);
}
public static void deleteSingleTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
//第一行
KuduColumn c01 = new KuduColumn();
c01.setColumnName("id").setColumnValue(1000003).setColumnType(Type.INT64).setUpdate(false).setPrimaryKey(true);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("name").setColumnValue("lijie789").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row01 = new ArrayList<>();
row01.add(c01);
// row01.add(c02);
KuduRow myrows01 = new KuduRow();
myrows01.setRows(row01);
agent.delete("impala::impala_kudu.my_first_table", client, myrows01);
}
public static void updateMultTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
//第一行
KuduColumn c01 = new KuduColumn();
c01.setColumnName("id").setColumnValue(1000001).setColumnType(Type.INT64).setUpdate(false).setPrimaryKey(true);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("name").setColumnValue("lijie123").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row01 = new ArrayList<>();
row01.add(c01);
row01.add(c02);
KuduRow myrows01 = new KuduRow();
myrows01.setRows(row01);
//第二行
KuduColumn c11 = new KuduColumn();
c11.setColumnName("id").setColumnValue(1000002).setColumnType(Type.INT64).setUpdate(false).setPrimaryKey(true);
KuduColumn c12 = new KuduColumn();
c12.setColumnName("name").setColumnValue("lijie456").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row11 = new ArrayList<>();
row11.add(c11);
row11.add(c12);
KuduRow myrows11 = new KuduRow();
myrows11.setRows(row11);
List<KuduRow> rows = new ArrayList<>();
rows.add(myrows01);
rows.add(myrows11);
agent.update("impala::impala_kudu.my_first_table", client, rows);
}
public static void updateSingleTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
//第一行
KuduColumn c01 = new KuduColumn();
c01.setColumnName("id").setColumnValue(12).setColumnType(Type.INT64).setUpdate(false).setPrimaryKey(true);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("name").setColumnValue("lijie789").setColumnType(Type.STRING).setUpdate(false);
KuduColumn c03 = new KuduColumn();
c03.setColumnName("sex").setColumnValue("lijie789").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row01 = new ArrayList<>();
row01.add(c01);
row01.add(c02);
row01.add(c03);
KuduRow myrows01 = new KuduRow();
myrows01.setRows(row01);
agent.update("impala::impala_kudu.my_first_table", client, myrows01);
}
public static void insertMultTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
//第一行
KuduColumn c01 = new KuduColumn();
c01.setColumnName("id").setColumnValue(1000001).setColumnType(Type.INT64).setUpdate(false);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("name").setColumnValue("lijie001").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row01 = new ArrayList<>();
row01.add(c01);
row01.add(c02);
KuduRow myrows01 = new KuduRow();
myrows01.setRows(row01);
//第二行
KuduColumn c11 = new KuduColumn();
c11.setColumnName("id").setColumnValue(1000002).setColumnType(Type.INT64).setUpdate(false);
KuduColumn c12 = new KuduColumn();
c12.setColumnName("name").setColumnValue("lijie002").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row02 = new ArrayList<>();
row02.add(c11);
row02.add(c12);
KuduRow myrows02 = new KuduRow();
myrows02.setRows(row02);
List<KuduRow> rows = new ArrayList<>();
rows.add(myrows01);
rows.add(myrows02);
agent.insert("impala::impala_kudu.my_first_table", client, rows);
}
public static void insertSingleTEST() throws KuduException {
KuduAgent agent = new KuduAgent();
//第一行
KuduColumn c01 = new KuduColumn();
c01.setColumnName("id").setColumnValue(1000003).setColumnType(Type.INT64).setUpdate(false);
KuduColumn c02 = new KuduColumn();
c02.setColumnName("name").setColumnValue("lijie003").setColumnType(Type.STRING).setUpdate(false);
List<KuduColumn> row01 = new ArrayList<>();
row01.add(c01);
row01.add(c02);
KuduRow myrows01 = new KuduRow();
myrows01.setRows(row01);
agent.insert("impala::impala_kudu.my_first_table", client, myrows01);
}