Greenplum Streaming Server(GPSS)自定义客户端开发

汪栋
2023-12-01

1. GPSS服务定义

将以下内容复制到gpss.proto文件中,如下:

syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package api;

option java_multiple_files = true;

// Connect service request message: the Greenplum connection parameters that
// gpss uses to open a session on behalf of the client.
message ConnectRequest {
  string Host = 1;      // Host address of Greenplum master; must be accessible from gpss server system
  int32 Port = 2;       // Greenplum master port
  string Username = 3;  // User or role name that gpss uses to access Greenplum
  string Password = 4;  // User password
  string DB = 5;        // Database name
  bool UseSSL = 6;      // Use SSL or not; ignored, use the gpss config file to config SSL
}

// Connect service response message. It is the handle passed to Disconnect and
// embedded in every other request message of this service.
message Session {
  string ID = 1;  // Id of client connection to gpss
}

// Operation mode.
// NOTE(review): no message in this file references this enum; the write mode
// is actually selected by which member of OpenRequest.Option is set.
enum Operation {
  Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
  Merge = 1;   // Insert and Update
  Update = 2;  // Update the value of "UpdateColumns" if "MatchColumns" match
  Read = 3;    // Not supported
}

// Parameters of the Insert operation (set as OpenRequest.InsertOption).
// Field number 3 is not used in this definition.
message InsertOption {
  repeated string InsertColumns = 1;    // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
  bool TruncateTable = 2;               // Truncate table before loading?
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Parameters of the Update operation (set as OpenRequest.UpdateOption).
message UpdateOption {
  repeated string MatchColumns = 1;     // Names of the target table columns to compare when determining to update or not
  repeated string UpdateColumns = 2;    // Names of the target table columns to update if MatchColumns match
  string Condition = 3;                 // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Parameters of the Merge operation (insert and update; set as
// OpenRequest.MergeOption).
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
  repeated string InsertColumns = 1;    // Target columns written for inserted rows; presumably same meaning as InsertOption.InsertColumns
  repeated string MatchColumns = 2;     // Target columns compared to decide whether a row matches; presumably as in UpdateOption.MatchColumns
  repeated string UpdateColumns = 3;    // Target columns updated when MatchColumns match; presumably as in UpdateOption.UpdateColumns
  string Condition = 4;                 // Optional additional match condition; presumably as in UpdateOption.Condition
  int64 ErrorLimitCount = 5;            // Error limit count
  int32 ErrorLimitPercentage = 6;       // Error limit percentage
}

// Open service request message: prepares a target table for writing within a
// session. Subsequent Write calls on the same session load into this table.
message OpenRequest {
  Session Session = 1;      // Session ID returned by Connect
  string SchemaName = 2;    // Name of the Greenplum Database schema
  string TableName = 3;     // Name of the Greenplum Database table
  string PreSQL = 4;        // SQL to execute before gpss loads the data
  string PostSQL = 5;       // SQL to execute after gpss loads the data
  int32 Timeout = 6;        // Time to wait before aborting the operation (seconds); not supported
  string Encoding = 7;      // Encoding of text data; not supported
  string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table

  // Identify the type of write operation to perform. Set exactly one member;
  // NOTE(review): server behavior when none is set is not defined here.
  oneof Option {
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}

// A single column value. At most one DBType member is set; the member used
// must be compatible with the target column's Greenplum type.
// NOTE(review): field numbers 3, 4 and 9 are unused here; presumably they are
// reserved by the gpss server definition, so do not reuse them.
message DBValue {
  oneof DBType {
    int32 Int32Value = 1;
    int64 Int64Value = 2;
    float Float32Value = 5;
    double Float64Value = 6;
    string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
    bytes BytesValue = 8;
    google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezone
    google.protobuf.NullValue NullValue = 11;       // SQL NULL
  }
}

// One table row: one DBValue per loaded column, presumably in the same order
// as the columns being loaded (e.g. InsertOption.InsertColumns).
message Row {
  repeated DBValue Columns = 1;
}

// Wrapper carrying one serialized Row.
message RowData {
  bytes Data = 1;     // A single protobuf-encoded Row
}

// Write service Request message
message WriteRequest {
  Session Session = 1;           // Session whose table was prepared by Open
  repeated RowData Rows = 2;     // The data to load into the target table
}

// Close service response message: status of the data load operation.
message TransferStats {
  int64 SuccessCount = 1;        // Number of rows successfully loaded
  int64 ErrorCount = 2;          // Number of error rows, reported if the error limit is not reached
  repeated string ErrorRows = 3; // Rows with incorrectly-formatted data (a list, not a count); not supported, so presumably always empty
}

// Close service request message.
// NOTE: the field name is lower-case `session`, unlike the other requests;
// keep it unchanged so the definition stays identical to the gpss server's.
message CloseRequest {
  Session session = 1;
}

// ListSchema service request message
message ListSchemaRequest {
  Session Session = 1;
}

// One schema of the connected database.
message Schema {
  string Name = 1;   // Schema name
  string Owner = 2;  // Owner of the schema
}

// ListSchema service response message
message Schemas {
  repeated Schema Schemas = 1;
}

// ListTable service request message
message ListTableRequest {
  Session Session = 1;
  string Schema = 2;    // 'public' is the default if no Schema is provided
}

// DescribeTable service request message
message DescribeTableRequest {
  Session Session = 1;
  string SchemaName = 2;
  string TableName = 3;
}

// Kind of relation reported in TableInfo.Type.
// NOTE(review): Other (255) looks like a catch-all for unlisted kinds; confirm.
enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;
}

// One relation returned by ListTable.
message TableInfo {
  string Name = 1;        // Relation name
  RelationType Type = 2;  // Relation kind (table, view, ...)
}

// ListTable service response message
message Tables {
  repeated TableInfo Tables = 1;
}

// DescribeTable service response message
message Columns {
  repeated ColumnInfo Columns = 1;
}

// Metadata of one table column.
message ColumnInfo {
  string Name = 1;            // Column name
  string DatabaseType = 2;    // Greenplum data type

  bool HasLength = 3;         // Contains length information?
  int64 Length = 4;           // Length if HasLength is true

  bool HasPrecisionScale = 5; // Contains precision or scale information?
  int64 Precision = 6;        // Precision if HasPrecisionScale is true
  int64 Scale = 7;            // Scale if HasPrecisionScale is true

  bool HasNullable = 8;       // Contains Nullable constraint?
  bool Nullable = 9;          // Whether the column accepts NULL, if HasNullable is true
}

// GPSS client API. Typical write flow (as in the sample code): Connect, Open,
// Write, Close, then Disconnect to free the session.
service Gpss {
  // Establish a connection to Greenplum Database; returns a Session object
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write
  rpc Open(OpenRequest) returns(google.protobuf.Empty) {}

  // Write data to the table opened by Open
  rpc Write(WriteRequest) returns(google.protobuf.Empty) {}

  // Close a write operation; returns the load statistics
  rpc Close(CloseRequest) returns(TransferStats) {}

  // List all available schemas in a database
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Describe table metadata (column name and column type)
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}

特别提醒:以上文件中的package名、service名、RPC方法名以及message的字段编号和类型都不要修改,否则将无法与gpss通信。gRPC通过“package.服务名/方法名”定位服务,并按字段编号对消息进行编解码,因此客户端的定义必须与gpss服务端完全一致。

2. 设置GPSS Java开发环境

本文假设已安装好JDK和IntelliJ IDEA。

下面介绍如何在IDEA中配置GPSS开发环境:新建一个Maven项目,并在项目的pom.xml中添加如下内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gen.gpss.gpdb</groupId>
    <artifactId>gen-gpss-gpdb</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- gRPC runtime version; the grpc-java code generator below uses the same version -->
        <grpc.version>1.16.1</grpc.version>
        <!-- protobuf runtime version; the protoc compiler below uses the same version -->
        <protobuf.version>3.6.1</protobuf.version>
        <os.maven.plugin>1.6.2</os.maven.plugin>
        <protobuf.maven.plugin>0.6.1</protobuf.maven.plugin>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>${protobuf.version}</version>
            <exclusions>
                <!-- guava is pinned explicitly below -->
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-all</artifactId>
            <version>${grpc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.6-jre</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <!-- Provides ${os.detected.classifier} for the native protoc / plugin executables -->
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>${os.maven.plugin}</version>
            </extension>
        </extensions>
        <plugins>
            <!-- Compiles the .proto files under src/main/proto (the plugin's default
                 source root) into protobuf messages (goal "compile") and gRPC stubs
                 (goal "compile-custom"). Code generator versions are kept equal to the
                 runtime versions so the generated code matches the libraries on the classpath. -->
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>${protobuf.maven.plugin}</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>
                        io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
                    </pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

3. 利用IDEA自动生成GPSS 客户端所需的类

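特别注意:生成的GpssGrpc类中的SERVICE_NAME值必须为api.Gpss,否则调用时会报错“Unknown service name XXX”。SERVICE_NAME由proto文件中的package名(api)和service名(Gpss)拼接而成,因此不要修改package api;如需自定义生成类的Java包名,可以在proto中添加option java_package。该选项只影响生成代码的Java包路径,不影响服务名和消息的编解码。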

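简要步骤:将gpss.proto放到项目的src/main/proto目录下,执行mvn compile(或在IDEA的Maven面板中依次运行protobuf:compile和protobuf:compile-custom)。生成的消息类和GpssGrpc类位于target/generated-sources/protobuf目录下。详细生成方式参考:https://blog.csdn.net/lyjshen/article/details/52238234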

4. 编写GPSS客户端代码

GPSS定义了3个API,用于获取Greenplum数据库中schema和table的元数据信息,如下:

  • ListSchema:查询数据库中所有定义的schema信息;
  • ListTable:查询指定schema中定义的表信息;
  • DescribeTable:查询表中所有列的定义;

4.1 连接到GPSS Server

示例代码如下:

package gpss.gpdb.util;

import gpss.gpdb.protobuf.api.GpssGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.concurrent.TimeUnit;

/**
 * Manages the gRPC channel to a GPSS server and exposes a blocking stub for it.
 *
 * <p>Call {@link #establishGpssChannel()} before {@link #getGpssStub()}; the stub is
 * {@code null} until the channel has been established.
 *
 * @Author: zhh
 * @Date: 2019/8/7
 */
public class GpssServerConnector {
    /** GPSS server host; defaults to the sample address. */
    private String gpssHost = "192.168.11.11";
    /** GPSS server gRPC port; defaults to the sample port. */
    private int gpssPort = 50007;
    private ManagedChannel channel = null;
    private GpssGrpc.GpssBlockingStub bStub = null;

    /** Creates a connector that uses the default sample host and port. */
    public GpssServerConnector() {
    }

    /**
     * Creates a connector for the given GPSS server.
     *
     * @param gpssHost host name or IP address of the GPSS server
     * @param gpssPort gRPC port of the GPSS server
     */
    public GpssServerConnector(String gpssHost, int gpssPort) {
        this.gpssHost = gpssHost;
        this.gpssPort = gpssPort;
    }

    /**
     * Opens a plaintext (non-TLS) channel to the GPSS server and creates the blocking stub.
     */
    public void establishGpssChannel(){
        channel = ManagedChannelBuilder.forAddress(gpssHost, gpssPort)
                .usePlaintext()
                .build();
        bStub = GpssGrpc.newBlockingStub(channel);
    }

    /**
     * Shuts the channel down and waits up to {@code time} seconds for it to terminate.
     * If it has not terminated by then, in-flight calls are cancelled with shutdownNow().
     * Does nothing if no channel has been established.
     *
     * @param time maximum time to wait for termination, in seconds
     * @throws InterruptedException if interrupted while waiting
     */
    public void closeGpssChannel(int time) throws InterruptedException {
        if (channel == null) {
            return;
        }
        // The original ignored `time` and always waited 10 seconds.
        if (!channel.shutdown().awaitTermination(time, TimeUnit.SECONDS)) {
            channel.shutdownNow();
        }
    }

    /**
     * @return the blocking stub, or {@code null} if establishGpssChannel() has not been called
     */
    public GpssGrpc.GpssBlockingStub getGpssStub(){
        return bStub;
    }

}

4.2 连接Greenplum 数据库

示例代码如下:

package gpss.gpdb.util;

import gpss.gpdb.protobuf.api.ConnectRequest;
import gpss.gpdb.protobuf.api.GpssGrpc;
import gpss.gpdb.protobuf.api.Session;

import java.util.concurrent.TimeUnit;

/**
 * @Description:
 * @Author: zhh
 * @Date: 2019/8/7
 */
public class GpDBSession {
    private GpssGrpc.GpssBlockingStub bStub = null;
    private String gpMasterHost = "192.168.11.11";
    private int gpMasterPort = 5432;
    private String gpUser = "gpadmin";
    private String gpPwd = 123456";
    private String dbName = "test";
    
    public GpDBSession(GpssGrpc.GpssBlockingStub bStub){
        this.bStub = bStub;
    }

    /**
     * 建立session连接
     * @return
     */
    public Session getGpssConnection() {
        ConnectRequest connRequest = ConnectRequest.newBuilder()
                .setHost(gpMasterHost)
                .setPort(gpMasterPort)
                .setUsername(gpUser)
                .setPassword(gpPwd)
                .setDB(dbName)
                .setUseSSL(false)
                .build();
        return bStub.connect(connRequest); 
    }

    /**
     * 关闭session连接
     * @param session
     */
    public void closeGpssConnection(Session session){
        bStub.disconnect(session);
    }
}

GPSS客户端通过Connect建立到Greenplum数据库的会话(Session)之后,就可以调用相应的服务查询Greenplum数据库中的schema和table信息,并向Greenplum表写入数据。

4.3 查询Greenplum 数据库schema和table信息

常用API说明:

  • ListSchema:查询当前数据库中的所有schema,返回每个schema的名称及其owner;
  • ListTable:查询指定schema下的所有表;
  • DescribeTable:查询表的字段定义;

查询schema示例代码如下:

GpssServerConnector gpss = new GpssServerConnector();
gpss.establishGpssChannel();
// Get the blocking stub
GpssGrpc.GpssBlockingStub gpssStub = gpss.getGpssStub();
// Open a GPSS session to Greenplum (a constructor call needs `new`)
GpDBSession gpDBSession = new GpDBSession(gpssStub);
Session session = gpDBSession.getGpssConnection();
/*
 * List the schemas of the connected database
 */
ListSchemaRequest listSchemaReq = ListSchemaRequest.newBuilder()
        .setSession(session)
        .build();
List<Schema> schemaList = gpssStub.listSchema(listSchemaReq).getSchemasList();
for (Schema schema : schemaList) {
    System.out.println(schema.getName() + " Owner->" + schema.getOwner());
}

查询指定schema下的表,示例代码如下:

// Use the first schema returned by the ListSchema code excerpt
// (schemaList comes from that excerpt; assumes it is non-empty)
String schemaName = schemaList.get(0).getName();

// Create a ListTable request for that schema
ListTableRequest ltReq = ListTableRequest.newBuilder()
        .setSession(session)
        .setSchema(schemaName)
        .build();

// Use the blocking stub to call the ListTable service (returns tables and views)
List<TableInfo> tblList = gpssStub.listTable(ltReq).getTablesList();

// Keep only the names of ordinary tables, skipping views and other relation types
ArrayList<String> tblNameList = new ArrayList<String>();
for (TableInfo ti : tblList) {
    if (ti.getTypeValue() == RelationType.Table_VALUE) {
        tblNameList.add(ti.getName());
    }
}

查询表的字段定义示例代码:

// The name of the first table returned by the ListTable code excerpt
String tableName = tblNameList.get(0);

// Create a DescribeTable request; `session` and `gpssStub` are the variables
// created in the ListSchema excerpt (mSession/bStub were never defined)
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
        .setSession(session)
        .setSchemaName(schemaName)
        .setTableName(tableName)
        .build();

// Use the blocking stub to call the DescribeTable service
List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();

// Print the name and Greenplum type of each column
for (ColumnInfo ci : columnList) {
    String colname = ci.getName();
    String dbtype = ci.getDatabaseType();
    System.out.println( "column " + colname + " type: " + dbtype );
}

4.4 利用gpss向greenplum 表中写入数据

向表写入数据示例代码如下:

GpssServerConnector gpss = new GpssServerConnector();
gpss.establishGpssChannel();
// Get the blocking stub
GpssGrpc.GpssBlockingStub gpssStub = gpss.getGpssStub();
// Open a GPSS session to Greenplum. Sessions are created through GpDBSession;
// GpssServerConnector has no getGpssConnection() method.
GpDBSession gpDBSession = new GpDBSession(gpssStub);
Session session = gpDBSession.getGpssConnection();
String schemaName = "public";
String tableName = "message_trace_2019_07";
/*
 * Write data into the table
 */
// Maximum number of error rows tolerated
int errLimit = 20;
// Maximum percentage of error rows tolerated
int errPct = 20;
// Columns to insert into. They must match the target table's column names, and
// each row below must supply one value per column, in this order.
InsertOption insertOpt = InsertOption.newBuilder()
        .setErrorLimitCount(errLimit)
        .setErrorLimitPercentage(errPct)
        .setTruncateTable(false)
        .addInsertColumns("id")
        .addInsertColumns("name")
        .addInsertColumns("age")
        .addInsertColumns("create_time")
//                .addInsertColumns("update_time")
        .build();
// Create an open request
OpenRequest openReq = OpenRequest.newBuilder()
        .setSession(session)
        .setSchemaName(schemaName)
        .setTableName(tableName)
//                .setPreSQL("")
//                .setPostSQL("")
//                .setStagingSchema("")
        // Encoding and Timeout are marked "not supported" in gpss.proto
        .setEncoding("utf-8")
        .setTimeout(5)
        .setInsertOption(insertOpt)
        .build();
// Use the blocking stub to call the Open service; it returns nothing
gpssStub.open(openReq);
/*
 * Write the rows into the Greenplum cluster
 */
// Assemble one write batch
List<RowData> rows = new ArrayList<RowData>();
for (int i = 0; i < 1000; i++) {
    // Create a row builder
    Row.Builder builder = Row.newBuilder();
    // Add one value per insert column, in column order.
    // NOTE(review): every value here is a StringValue, including the one for
    // "age"; each value's type must match its column's Greenplum type (see the
    // type mapping in the appendix), so adjust to the real table definition.
    DBValue.Builder colBuilder1 = DBValue.newBuilder();
    colBuilder1.setStringValue(UUID.randomUUID().toString());
    builder.addColumns(colBuilder1);
    DBValue.Builder colBuilder2 = DBValue.newBuilder();
    colBuilder2.setStringValue("this is test trace infomation.");
    builder.addColumns(colBuilder2);
    DBValue.Builder colBuilder3 = DBValue.newBuilder();
    colBuilder3.setStringValue("this is test comment infomation.");
    builder.addColumns(colBuilder3);
    DBValue.Builder colBuilder4 = DBValue.newBuilder();
    colBuilder4.setStringValue(DateHelper.getDateTime());
    builder.addColumns(colBuilder4);
    // Serialize the row into a RowData wrapper and add it to the batch
    RowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());
    rows.add(rowBuilder.build());
}
// Create a write request
WriteRequest writeReq = WriteRequest.newBuilder()
        .setSession(session)
        .addAllRows(rows)
        .build();
// Use the blocking stub to call the Write service; it returns nothing
gpssStub.write(writeReq);

// Create a close request
CloseRequest closeReq = CloseRequest.newBuilder()
        .setSession(session)
        .build();
// Use the blocking stub to call the Close service; it returns the load statistics
TransferStats tStats = gpssStub.close(closeReq);
// Display the result to stdout
System.out.println("CloseRequest tStats: " + tStats.toString());
// Release the GPSS session. Afterwards call gpss.closeGpssChannel(...) to shut
// down the gRPC channel; it throws InterruptedException.
gpDBSession.closeGpssConnection(session);

注意:写入数据时为每列选用的DBValue类型必须与目标列在数据库中的类型一致(对照关系见下方附录),否则写入会失败。

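附录:目前支持的gRPC类型与Greenplum类型对照:

| gRPC Type      | Greenplum Type |
|----------------|----------------|
| Int32Value     | integer, serial |
| Int64Value     | bigint, bigserial |
| Float32Value   | real |
| Float64Value   | double precision |
| StringValue    | text(以及其他以字符串形式表示的类型,如 macaddr、time with time zone、box 等) |
| BytesValue     | bytea |
| TimeStampValue | time, timestamp (without time zone) |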

 

 

 类似资料: