1. GPSS服务定义
将以下内容复制到gpss.proto文件中,如下:
syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
package api;
option java_multiple_files = true;
// Connect service Request message.
// Carries the Greenplum connection parameters passed to the Connect RPC,
// which returns a Session.
message ConnectRequest {
string Host = 1; // Host address of Greenplum master; must be accessible from gpss server system
int32 Port = 2; // Greenplum master port
string Username = 3; // User or role name that gpss uses to access Greenplum
string Password = 4; // User password
string DB = 5; // Database name
bool UseSSL = 6; // Use SSL or not; ignored, use the gpss config file to config SSL
}
// Connect service Response message.
// The same message is the request of the Disconnect RPC and is embedded in the
// other request messages of this file to identify the session they belong to.
message Session {
string ID = 1; // Id of client connection to gpss
}
// Operation mode.
// NOTE(review): no message or RPC in this file references this enum; the kind
// of write operation is selected through the OpenRequest.Option oneof instead.
enum Operation {
Insert = 0; // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
Merge = 1; // Insert and Update
Update = 2; // Update the value of "UpdateColumns" if "MatchColumns" match
Read = 3; // Not supported
}
// Required parameters of the Insert operation.
// Selected by setting OpenRequest.Option to InsertOption.
message InsertOption {
repeated string InsertColumns = 1; // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
bool TruncateTable = 2; // Truncate table before loading?
// Field number 3 is not assigned in this message.
int64 ErrorLimitCount = 4; // Error limit count; used by external table
int32 ErrorLimitPercentage = 5; // Error limit percentage; used by external table
}
// Required parameters of the Update operation.
// Selected by setting OpenRequest.Option to UpdateOption.
message UpdateOption {
repeated string MatchColumns = 1; // Names of the target table columns to compare when determining to update or not
repeated string UpdateColumns = 2; // Names of the target table columns to update if MatchColumns match
string Condition = 3; // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
int64 ErrorLimitCount = 4; // Error limit count; used by external table
int32 ErrorLimitPercentage = 5; // Error limit percentage; used by external table
}
// Required parameters of the Merge operation.
// Merge operation creates a session-level temp table in StagingSchema.
// Selected by setting OpenRequest.Option to MergeOption.
// NOTE(review): the fields below presumably carry the same meaning as the
// same-named fields of InsertOption and UpdateOption - confirm against the
// gpss documentation. Their field numbers differ from those messages
// (e.g. ErrorLimitCount is 5 here but 4 there).
message MergeOption {
repeated string InsertColumns = 1;
repeated string MatchColumns = 2;
repeated string UpdateColumns = 3;
string Condition = 4;
int64 ErrorLimitCount = 5;
int32 ErrorLimitPercentage = 6;
}
// Open service Request message.
// Names the target table and the kind of write operation for the Open RPC.
message OpenRequest {
Session Session = 1; // Session ID returned by Connect
string SchemaName = 2; // Name of the Greenplum Database schema
string TableName = 3; // Name of the Greenplum Database table
string PreSQL = 4; // SQL to execute before gpss loads the data
string PostSQL = 5; // SQL to execute after gpss loads the data
int32 Timeout = 6; // Time to wait before aborting the operation (seconds); not supported
string Encoding = 7; // Encoding of text data; not supported
string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table
// At most one member of this oneof is set at a time.
oneof Option { // Identify the type of write operation to perform
InsertOption InsertOption = 100; // Parameters of an Insert operation
UpdateOption UpdateOption = 101; // Parameters of an Update operation
MergeOption MergeOption = 102; // Parameters of a Merge operation
}
}
// A single column value of a Row; at most one member of the DBType oneof is
// set at a time. Field numbers 3, 4 and 9 are not assigned in this message.
message DBValue {
oneof DBType {
int32 Int32Value = 1;
int64 Int64Value = 2;
float Float32Value = 5;
double Float64Value = 6;
string StringValue = 7; // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
bytes BytesValue = 8;
google.protobuf.Timestamp TimeStampValue = 10; // Time without timezone
google.protobuf.NullValue NullValue = 11; // Type comes from google/protobuf/struct.proto; presumably marks a SQL NULL - confirm
}
}
// One row of data, expressed as a list of column values.
message Row {
repeated DBValue Columns = 1; // Column values of the row
}
// Wrapper that carries one Row in serialized form; WriteRequest holds a list
// of these.
message RowData {
bytes Data = 1; // A single protobuf-encoded Row
}
// Write service Request message.
// Carries a batch of rows for the Write RPC.
message WriteRequest {
Session Session = 1; // Session the rows are written under
repeated RowData Rows = 2; // The data to load into the target table
}
// Close service Response message.
// Returned by the Close RPC.
message TransferStats { // Status of the data load operation
int64 SuccessCount = 1; // Number of rows successfully loaded
int64 ErrorCount = 2; // Number of error lines if Errorlimit is not reached
// NOTE(review): this field is a list of strings, although it was originally
// described as a "number of rows" - presumably it holds the
// incorrectly-formatted rows themselves; confirm against the gpss docs.
repeated string ErrorRows = 3; // Rows with incorrectly-formatted data; not supported
}
// Close service Request message.
// Passed to the Close RPC, which returns TransferStats.
message CloseRequest {
// Note: this field name is lower-case, unlike the "Session" field of the
// other request messages in this file.
Session session = 1;
}
// ListSchema service request message.
message ListSchemaRequest {
Session Session = 1; // Session whose database schemas are listed
}
// One entry of the Schemas list returned by the ListSchema RPC.
message Schema {
string Name = 1; // Schema name
string Owner = 2; // Schema owner
}
// ListSchema service response message.
message Schemas {
repeated Schema Schemas = 1; // One entry per schema
}
// ListTable service request message.
message ListTableRequest {
Session Session = 1; // Session the request is issued under
string Schema = 2; // 'public' is the default if no Schema is provided
}
// DescribeTable service request message.
message DescribeTableRequest {
Session Session = 1; // Session the request is issued under
string SchemaName = 2; // Schema that contains the table
string TableName = 3; // Table to describe
}
// Kind of relation reported in TableInfo.Type.
// Table is the zero value and therefore the proto3 default.
enum RelationType {
Table = 0;
View = 1;
Index = 2;
Sequence = 3;
Special = 4;
Other = 255;
}
// One entry of the Tables list returned by the ListTable RPC.
message TableInfo {
string Name = 1; // Relation name
RelationType Type = 2; // Kind of relation (table, view, ...)
}
// ListTable service response message.
message Tables {
repeated TableInfo Tables = 1; // One entry per relation
}
// DescribeTable service response message.
message Columns {
repeated ColumnInfo Columns = 1; // One entry per column
}
// Metadata of one table column, as returned in Columns by DescribeTable.
message ColumnInfo {
string Name = 1; // Column name
string DatabaseType = 2; // Greenplum data type
bool HasLength = 3; // Contains length information?
int64 Length = 4; // Length if HasLength is true
bool HasPrecisionScale = 5; // Contains precision or scale information?
int64 Precision = 6; // Presumably meaningful only if HasPrecisionScale is true - confirm
int64 Scale = 7; // Presumably meaningful only if HasPrecisionScale is true - confirm
bool HasNullable = 8; // Contains Nullable constraint?
bool Nullable = 9; // Presumably meaningful only if HasNullable is true - confirm
}
// The gpss gRPC service. Together with the package declared in this file its
// fully-qualified name is api.Gpss. All RPCs are unary (no streaming).
service Gpss {
// Establish a connection to Greenplum Database; returns a Session object
rpc Connect(ConnectRequest) returns (Session) {}
// Disconnect, freeing all resources allocated for a session
rpc Disconnect(Session) returns (google.protobuf.Empty) {}
// Prepare and open a table for write
rpc Open(OpenRequest) returns(google.protobuf.Empty) {}
// Write data to table
rpc Write(WriteRequest) returns(google.protobuf.Empty) {}
// Close a write operation
rpc Close(CloseRequest) returns(TransferStats) {}
// List all available schemas in a database
rpc ListSchema(ListSchemaRequest) returns (Schemas) {}
// List all tables and views in a schema
rpc ListTable(ListTableRequest) returns (Tables) {}
// Describe table metadata (column name and column type)
rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}
特别提醒:以上文件中的内容不要进行修改,否则无法与GPSS进行通信(package名、服务名、消息字段的类型和编号都是gRPC通信协议的一部分,客户端与服务端必须完全一致;具体原因可以了解一下gRPC原理)。
2. 设置GPSS Java开发环境
jdk和idea默认已安装完成。
以下介绍在idea中配置GPSS开发环境,新建一个maven项目。并在项目pom.xml中添加如下内容:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the gpss Java client.
  The protobuf-maven-plugin compiles the .proto file into the protobuf message
  classes (goal "compile") and the gRPC stub classes (goal "compile-custom").
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gen.gpss.gpdb</groupId>
    <artifactId>gen-gpss-gpdb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Single source of truth for the runtime AND the code-generator versions below. -->
        <grpc.version>1.16.1</grpc.version>
        <protobuf.version>3.6.1</protobuf.version>
        <os.maven.plugin>1.6.2</os.maven.plugin>
        <protobuf.maven.plugin>0.6.1</protobuf.maven.plugin>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>${protobuf.version}</version>
            <!-- Exclude the transitive guava; the version is pinned explicitly below. -->
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-all</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.6-jre</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <!-- Provides ${os.detected.classifier}, used to pick the native protoc binaries. -->
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>${os.maven.plugin}</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>${protobuf.maven.plugin}</version>
                <configuration>
                    <!-- The compiler version follows the protobuf-java runtime version. -->
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <!-- The stub generator version follows the grpc runtime version
                         (it was previously pinned to 1.11.0 while the runtime was 1.16.1). -->
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3. 利用IDEA自动生成GPSS 客户端所需的类
特别注意:生成的GpssGrpc类中的SERVICE_NAME值必须为api.Gpss,否则会报错“Unknown service name XXX”
详细生成方式参考:https://blog.csdn.net/lyjshen/article/details/52238234
4. 编写GPSS客户端代码
GPSS定义了3个API(ListSchema、ListTable、DescribeTable)用于获取Greenplum数据库中有关schema和table的元数据信息,如下:
4.1 连接到GPSS Server
示例代码如下:
package gpss.gpdb.util;
import gpss.gpdb.protobuf.api.GpssGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
/**
 * Owns the gRPC channel to a gpss server and the blocking stub built on it.
 *
 * <p>Call {@link #establishGpssChannel()} first; until then
 * {@link #getGpssStub()} returns {@code null}.
 *
 * @author zhh
 * @since 2019/8/7
 */
public class GpssServerConnector {
    private String gpssHost = "192.168.11.11";
    private int gpssPort = 50007;
    private ManagedChannel channel = null;
    private GpssGrpc.GpssBlockingStub bStub = null;

    /**
     * Builds a plaintext channel to the configured gpss host and port and
     * creates the blocking stub on top of it.
     */
    public void establishGpssChannel() {
        channel = ManagedChannelBuilder.forAddress(gpssHost, gpssPort)
                .usePlaintext()
                .build();
        bStub = GpssGrpc.newBlockingStub(channel);
    }

    /**
     * Shuts the channel down and waits for it to terminate.
     *
     * @param time maximum time to wait for termination, in seconds
     * @throws InterruptedException if the wait is interrupted
     */
    public void closeGpssChannel(int time) throws InterruptedException {
        // Nothing to close when establishGpssChannel() was never called.
        if (channel == null) {
            return;
        }
        // Honour the caller-supplied timeout; it used to be ignored in favour
        // of a hard-coded 10 seconds.
        channel.shutdown().awaitTermination(time, TimeUnit.SECONDS);
    }

    /**
     * @return the blocking stub, or {@code null} if the channel has not been
     *         established yet
     */
    public GpssGrpc.GpssBlockingStub getGpssStub() {
        return bStub;
    }
}
4.2 连接Greenplum 数据库
示例代码如下:
package gpss.gpdb.util;
import gpss.gpdb.protobuf.api.ConnectRequest;
import gpss.gpdb.protobuf.api.GpssGrpc;
import gpss.gpdb.protobuf.api.Session;
import java.util.concurrent.TimeUnit;
/**
* @Description:
* @Author: zhh
* @Date: 2019/8/7
*/
public class GpDBSession {
private GpssGrpc.GpssBlockingStub bStub = null;
private String gpMasterHost = "192.168.11.11";
private int gpMasterPort = 5432;
private String gpUser = "gpadmin";
private String gpPwd = 123456";
private String dbName = "test";
public GpDBSession(GpssGrpc.GpssBlockingStub bStub){
this.bStub = bStub;
}
/**
* 建立session连接
* @return
*/
public Session getGpssConnection() {
ConnectRequest connRequest = ConnectRequest.newBuilder()
.setHost(gpMasterHost)
.setPort(gpMasterPort)
.setUsername(gpUser)
.setPassword(gpPwd)
.setDB(dbName)
.setUseSSL(false)
.build();
return bStub.connect(connRequest);
}
/**
* 关闭session连接
* @param session
*/
public void closeGpssConnection(Session session){
bStub.disconnect(session);
}
}
在GPSS客户端连接到Greenplum数据库之后,客户端可以调用服务请求来查询greenplum数据库schema和table的信息,并向Greenplum表进行写入数据。
4.3 查询Greenplum 数据库schema和table信息
常用API说明:
查询schema示例代码如下:
// Open the gRPC channel to the gpss server.
GpssServerConnector gpss = new GpssServerConnector();
gpss.establishGpssChannel();
// Get the blocking stub.
GpssGrpc.GpssBlockingStub gpssStub = gpss.getGpssStub();
// Open a session ("new" was missing in front of the constructor call).
GpDBSession gpDBSession = new GpDBSession(gpssStub);
Session session = gpDBSession.getGpssConnection();
/**
 * Fetch the schema information of the database.
 */
ListSchemaRequest listSchemaReq = ListSchemaRequest.newBuilder()
        .setSession(session)
        .build();
List<Schema> schemaList = gpssStub.listSchema(listSchemaReq).getSchemasList();
// Print the name and owner of every schema.
for (Schema schema : schemaList) {
    System.out.println(schema.getName() + " Owner->" + schema.getOwner());
}
查询指定schema下的表(table)示例代码如下:
// Use the name of the first schema returned in the ListSchema code excerpt.
// (The excerpt defines schemaList; "schemaNameList" was never defined.)
String schemaName = schemaList.get(0).getName();
// create a list table request builder
ListTableRequest ltReq = ListTableRequest.newBuilder()
        .setSession(session)
        .setSchema(schemaName)
        .build();
// use the blocking stub to call the ListTable service
List<TableInfo> tblList = gpssStub.listTable(ltReq).getTablesList();
// Keep only the relations of type Table and save their names in a list;
// other relation types are skipped.
ArrayList<String> tblNameList = new ArrayList<String>();
for (TableInfo ti : tblList) {
    if (ti.getTypeValue() == RelationType.Table_VALUE) {
        tblNameList.add(ti.getName());
    }
}
查询表的字段定义示例代码:
// the name of the first table returned in the ListTable code excerpt
String tableName = tblNameList.get(0);
// Create a describe table request. It uses the "session" and "gpssStub"
// variables of the earlier excerpts ("mSession" and "bStub" were undefined).
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
        .setSession(session)
        .setSchemaName(schemaName)
        .setTableName(tableName)
        .build();
// use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
// print the name and type of each column
for (ColumnInfo ci : columnList) {
    String colname = ci.getName();
    String dbtype = ci.getDatabaseType();
    // display the column name and type to stdout
    System.out.println( "column " + colname + " type: " + dbtype );
}
4.4 利用gpss向greenplum 表中写入数据
向表写入数据示例代码如下:
// Open the gRPC channel to the gpss server.
GpssServerConnector gpss = new GpssServerConnector();
gpss.establishGpssChannel();
// Get the blocking stub.
GpssGrpc.GpssBlockingStub gpssStub = gpss.getGpssStub();
// Open a session. The session comes from GpDBSession; GpssServerConnector
// has no getGpssConnection() method.
GpDBSession gpDBSession = new GpDBSession(gpssStub);
Session session = gpDBSession.getGpssConnection();
String schemaName = "public";
String tableName = "message_trace_2019_07";
/**
 * Write data into the table.
 */
// Error limit count.
int errLimit = 20;
// Error limit percentage.
int errPct = 20;
// Columns to insert into; the names must match the table's column names and
// the values of every Row below must be added in this same order.
InsertOption insertOpt = InsertOption.newBuilder()
        .setErrorLimitCount(errLimit)
        .setErrorLimitPercentage(errPct)
        .setTruncateTable(false)
        .addInsertColumns("id")
        .addInsertColumns("name")
        .addInsertColumns("age")
        .addInsertColumns("create_time")
        // .addInsertColumns("update_time")
        .build();
// create an open request builder
OpenRequest openReq = OpenRequest.newBuilder()
        .setSession(session)
        .setSchemaName(schemaName)
        .setTableName(tableName)
        // .setPreSQL("")
        // .setPostSQL("")
        // .setStagingSchema("")
        .setEncoding("utf-8")
        .setTimeout(5)
        .setInsertOption(insertOpt)
        .build();
// use the blocking stub to call the open service: it returns nothing
gpssStub.open(openReq);
/**
 * Write the data to the Greenplum cluster.
 */
// Assemble the batch of rows to write.
List<RowData> rows = new ArrayList<RowData>();
for (int i = 0; i < 1000; i++) {
    // create a row builder
    Row.Builder builder = Row.newBuilder();
    // create a builder for each column, in order, and set its value
    DBValue.Builder colBuilder1 = DBValue.newBuilder();
    colBuilder1.setStringValue(UUID.randomUUID().toString());
    builder.addColumns(colBuilder1);
    DBValue.Builder colBuilder2 = DBValue.newBuilder();
    colBuilder2.setStringValue("this is test trace infomation.");
    builder.addColumns(colBuilder2);
    DBValue.Builder colBuilder3 = DBValue.newBuilder();
    colBuilder3.setStringValue("this is test comment infomation.");
    builder.addColumns(colBuilder3);
    DBValue.Builder colBuilder4 = DBValue.newBuilder();
    colBuilder4.setStringValue(DateHelper.getDateTime());
    builder.addColumns(colBuilder4);
    // RowData carries the Row in its serialized (protobuf-encoded) form
    RowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());
    // add the row
    rows.add(rowBuilder.build());
}
// create a write request builder
WriteRequest writeReq = WriteRequest.newBuilder()
        .setSession(session)
        .addAllRows(rows)
        .build();
// use the blocking stub to call the write service: it returns nothing
gpssStub.write(writeReq);
// create a close request builder
TransferStats tStats = null;
CloseRequest closeReq = CloseRequest.newBuilder()
        .setSession(session)
        .build();
// use the blocking stub to call the Close service
tStats = gpssStub.close(closeReq);
// display the result to stdout
System.out.println("ClosrRequest tStats: " + tStats.toString());
// Release the session now that the load is finished. The gRPC channel can
// then be shut down with gpss.closeGpssChannel(...), which throws
// InterruptedException.
gpDBSession.closeGpssConnection(session);
注意:插入数据指定数据类型必须要和数据库的类型保持一致,不然数据插入会失败。
附录目前支持的类型对照:
gRPC Type | Greenplum Type |
Int32Value | integer, serial |
Int64Value | bigint, bigserial |
Float32Value | real |
Float64Value | double |
StringValue | text (any kind of data) |
BytesValue | bytea |
TimeStampValue | time, timestamp (without time zone) |