Greenplum Streaming Server(GPSS)自定义客户端开发


1. GPSS服务定义


syntax = "proto3";
import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

package api;

option java_multiple_files = true;

// Connect service Request message.
// Tells gpss which Greenplum master/database to connect to and as which role.
message ConnectRequest {
  string Host = 1;      // Host address of Greenplum master; must be accessible from gpss server system
  int32 Port = 2;       // Greenplum master port
  string Username = 3;  // User or role name that gpss uses to access Greenplum
  string Password = 4;  // User password
  string DB = 5;        // Database name
  bool UseSSL = 6;      // Use SSL or not; ignored, use the gpss config file to config SSL
}

// Connect service Response message.
// Identifies a client's gpss session; it is passed back in every later request
// (Open/Write/Close/List*/DescribeTable) and to Disconnect.
message Session {
  string ID = 1;  // Id of client connection to gpss
}

// Operation mode.
// Note: Insert is the proto3 zero value, so an Operation field that is never
// set reads as Insert.
enum Operation {
  Insert = 0;  // Insert all data into table; behavior of duplicate key or data depends upon the constraints of the target table.
  Merge = 1;   // Insert and Update
  Update = 2;  // Update the value of "UpdateColumns" if "MatchColumns" match
  Read = 3;    // Not supported
}

// Required parameters of the Insert operation (OpenRequest.InsertOption).
message InsertOption {
  repeated string InsertColumns = 1;    // Names of the target table columns the insert operation should update; used in 'INSERT INTO', useful for partial loading
  bool TruncateTable = 2;               // Truncate table before loading?
  // Field number 3 is not used by this message.
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Update operation (OpenRequest.UpdateOption).
message UpdateOption {
  repeated string MatchColumns = 1;     // Names of the target table columns to compare when determining to update or not
  repeated string UpdateColumns = 2;    // Names of the target table columns to update if MatchColumns match
  string Condition = 3;                 // Optional additional match condition; SQL syntax and used after the 'WHERE' clause
  int64 ErrorLimitCount = 4;            // Error limit count; used by external table
  int32 ErrorLimitPercentage = 5;       // Error limit percentage; used by external table
}

// Required parameters of the Merge operation (OpenRequest.MergeOption).
// Merge operation creates a session-level temp table in StagingSchema
message MergeOption {
  repeated string InsertColumns = 1;    // See InsertOption.InsertColumns
  repeated string MatchColumns = 2;     // See UpdateOption.MatchColumns
  repeated string UpdateColumns = 3;    // See UpdateOption.UpdateColumns
  string Condition = 4;                 // See UpdateOption.Condition
  int64 ErrorLimitCount = 5;            // Error limit count
  int32 ErrorLimitPercentage = 6;       // Error limit percentage
}

// Open service Request message.
// Prepares the target table for writing and selects the write operation;
// exactly one member of the Option oneof is set.
message OpenRequest {
  Session Session = 1;      // Session returned by Connect
  string SchemaName = 2;    // Name of the Greenplum Database schema
  string TableName = 3;     // Name of the Greenplum Database table
  string PreSQL = 4;        // SQL to execute before gpss loads the data
  string PostSQL = 5;       // SQL to execute after gpss loads the data
  int32 Timeout = 6;        // Time to wait before aborting the operation (seconds); not supported
  string Encoding = 7;      // Encoding of text data; not supported
  string StagingSchema = 8; // Schema in which gpss creates external and temp tables; default is to create these tables in the same schema as the target table

  oneof Option {            // Identify the type of write operation to perform
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}

// A single column value; exactly one member of the DBType oneof is set.
// Field numbers 3, 4 and 9 are not used by this message.
message DBValue {
  oneof DBType {
    int32 Int32Value = 1;
    int64 Int64Value = 2;
    float Float32Value = 5;
    double Float64Value = 6;
    string StringValue = 7;  // Includes types whose values are presented as string but are not a real string type in Greenplum; for example: macaddr, time with time zone, box, etc.
    bytes BytesValue = 8;
    google.protobuf.Timestamp TimeStampValue = 10;  // Time without timezone
    google.protobuf.NullValue NullValue = 11;       // SQL NULL
  }
}

// One table row, given as a list of column values.
message Row {
  repeated DBValue Columns = 1;
}

// Wrapper carrying one serialized Row.
message RowData {
  bytes Data = 1;     // A single protobuf-encoded Row
}

// Write service Request message.
message WriteRequest {
  Session Session = 1;           // Session returned by Connect
  repeated RowData Rows = 2;     // The data to load into the target table
}

// Close service Response message.
message TransferStats {          // Status of the data load operation
  int64 SuccessCount = 1;        // Number of rows successfully loaded
  int64 ErrorCount = 2;          // Number of error lines if Errorlimit is not reached
  repeated string ErrorRows = 3; // Rows with incorrectly-formatted data (the rows themselves, not a count); not supported
}

// Close service Request message.
message CloseRequest {
  // Session returned by Connect. Lower-case unlike the other requests; do not
  // rename it, because the proto3 JSON name ("session") would change.
  Session session = 1;
}

// ListSchema service request message.
message ListSchemaRequest {
  Session Session = 1;  // Session returned by Connect
}

// One database schema as reported by ListSchema.
message Schema {
  string Name = 1;   // Schema name
  string Owner = 2;  // Owner of the schema
}

// ListSchema service response message.
message Schemas {
  repeated Schema Schemas = 1;
}

// ListTable service request message.
message ListTableRequest {
  Session Session = 1;  // Session returned by Connect
  string Schema = 2;    // 'public' is the default if no Schema is provided
}

// DescribeTable service request message.
message DescribeTableRequest {
  Session Session = 1;    // Session returned by Connect
  string SchemaName = 2;  // Schema containing the table
  string TableName = 3;   // Table to describe
}

// Kind of relation reported in TableInfo.Type.
// Note: Table is the proto3 zero value, so an unset Type reads as Table.
enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;
}

// One relation as reported by ListTable.
message TableInfo {
  string Name = 1;        // Relation name
  RelationType Type = 2;  // Kind of relation (table, view, ...)
}

// ListTable service response message.
message Tables {
  repeated TableInfo Tables = 1;
}

// DescribeTable service response message.
message Columns {
  repeated ColumnInfo Columns = 1;
}

// Definition of one table column as reported by DescribeTable.
message ColumnInfo {
  string Name = 1;            // Column name
  string DatabaseType = 2;    // Greenplum data type

  bool HasLength = 3;         // Contains length information?
  int64 Length = 4;           // Length if HasLength is true

  bool HasPrecisionScale = 5; // Contains precision or scale information?
  int64 Precision = 6;        // Precision if HasPrecisionScale is true
  int64 Scale = 7;            // Scale if HasPrecisionScale is true

  bool HasNullable = 8;       // Contains Nullable constraint?
  bool Nullable = 9;          // Whether the column accepts NULL; meaningful only if HasNullable is true
}

// Greenplum Streaming Server service. A client calls Connect first and passes
// the returned Session to every other RPC. A load is Open -> Write (one or
// more times) -> Close.
service Gpss {
  // Establish a connection to Greenplum Database; returns a Session object
  rpc Connect(ConnectRequest) returns (Session) {}

  // Disconnect, freeing all resources allocated for a session
  rpc Disconnect(Session) returns (google.protobuf.Empty) {}

  // Prepare and open a table for write
  rpc Open(OpenRequest) returns(google.protobuf.Empty) {}

  // Write data to table
  rpc Write(WriteRequest) returns(google.protobuf.Empty) {}

  // Close a write operation; returns the load statistics
  rpc Close(CloseRequest) returns(TransferStats) {}

  // List all available schemas in a database
  rpc ListSchema(ListSchemaRequest) returns (Schemas) {}

  // List all tables and views in a schema
  rpc ListTable(ListTableRequest) returns (Tables) {}

  // Describe table metadata (column name and column type)
  rpc DescribeTable(DescribeTableRequest) returns (Columns) {}
}


2. 设置GPSS Java开发环境



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- NOTE(review): the rest of the original pom.xml was lost in extraction.
         The client code below needs grpc-java (io.grpc:grpc-protobuf, io.grpc:grpc-stub
         and a transport such as io.grpc:grpc-netty-shaded), protobuf-java, and a protoc
         build plugin (e.g. org.xolstice.maven.plugins:protobuf-maven-plugin) to generate
         the GpssGrpc and message classes from the .proto file above. -->
</project>




3. 利用IDEA自动生成GPSS 客户端所需的类

特别注意:生成的GpssGrpc类中的SERVICE_NAME值必须为api.Gpss,否则会报错“Unknown service name XXX”


4. 编写GPSS客户端代码


  • ListSchema:查询数据库中所有定义的schema信息;
  • ListTable:查询指定schema中定义的表信息;
  • DescribeTable:查询表中所有列的定义;

4.1 连接到GPSS Server


package gpss.gpdb.util;

import gpss.gpdb.protobuf.api.GpssGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

import java.util.concurrent.TimeUnit;

/**
 * @Description: Holds the gRPC channel and the blocking stub used to call the GPSS server.
 * @Author: zhh
 * @Date: 2019/8/7
 */
public class GpssServerConnector {
    private String gpssHost = "";
    private int gpssPort = 50007;
    private ManagedChannel channel = null;
    private GpssGrpc.GpssBlockingStub bStub = null;

    public void establishGpssChannel(){
        channel = ManagedChannelBuilder.forAddress(gpssHost, gpssPort)
        bStub = GpssGrpc.newBlockingStub(channel);

    public void closeGpssChannel(int time) throws InterruptedException {
        channel.shutdown().awaitTermination(10, TimeUnit.SECONDS);
    public GpssGrpc.GpssBlockingStub getGpssStub(){
        return bStub;

4.2 连接Greenplum 数据库


package gpss.gpdb.util;

import gpss.gpdb.protobuf.api.ConnectRequest;
import gpss.gpdb.protobuf.api.GpssGrpc;
import gpss.gpdb.protobuf.api.Session;

import java.util.concurrent.TimeUnit;

/**
 * @Description: Creates (Connect) and closes GPSS sessions to a Greenplum database.
 * @Author: zhh
 * @Date: 2019/8/7
 */
public class GpDBSession {
    private GpssGrpc.GpssBlockingStub bStub = null;
    private String gpMasterHost = "";
    private int gpMasterPort = 5432;
    private String gpUser = "gpadmin";
    private String gpPwd = 123456";
    private String dbName = "test";
    public GpDBSession(GpssGrpc.GpssBlockingStub bStub){
        this.bStub = bStub;

     * 建立session连接
     * @return
    public Session getGpssConnection() {
        ConnectRequest connRequest = ConnectRequest.newBuilder()
        return bStub.connect(connRequest); 

     * 关闭session连接
     * @param session
    public void closeGpssConnection(Session session){


4.3 查询Greenplum 数据库schema和table信息


  • ListSchema:查询数据库中所有的schema,返回每个schema的名称及其owner;
  • ListTable:查询指定schema下的所有表;
  • DescribeTable:查询表的字段定义;


// Open the gRPC channel to the gpss server and get the blocking stub.
GpssServerConnector gpss = new GpssServerConnector();
gpss.establishGpssChannel();
GpssGrpc.GpssBlockingStub gpssStub = gpss.getGpssStub();
// Connect to Greenplum through gpss; the returned Session goes into every request below.
GpDBSession gpDBSession = new GpDBSession(gpssStub);
Session session = gpDBSession.getGpssConnection();

// ListSchema: print every schema with its owner and remember the schema names.
ListSchemaRequest listSchemaReq = ListSchemaRequest.newBuilder()
        .setSession(session)
        .build();
List<Schema> schemaList = gpssStub.listSchema(listSchemaReq).getSchemasList();
List<String> schemaNameList = new ArrayList<String>();
for (Schema schema : schemaList) {
    System.out.println(schema.getName() + " Owner->" + schema.getOwner());
    schemaNameList.add(schema.getName());
}
if (schemaNameList.isEmpty()) {
    throw new IllegalStateException("ListSchema returned no schemas");
}

// ListTable: use the first schema name returned by ListSchema.
String schemaName = schemaNameList.get(0);
ListTableRequest ltReq = ListTableRequest.newBuilder()
        .setSession(session)
        .setSchema(schemaName)
        .build();
List<TableInfo> tblList = gpssStub.listTable(ltReq).getTablesList();

// Keep only ordinary tables; views, indexes, sequences, etc. are skipped.
List<String> tblNameList = new ArrayList<String>();
for (TableInfo ti : tblList) {
    if (ti.getTypeValue() == RelationType.Table_VALUE) {
        tblNameList.add(ti.getName());
    }
}
if (tblNameList.isEmpty()) {
    throw new IllegalStateException("schema " + schemaName + " contains no tables");
}

// DescribeTable: print the name and Greenplum type of each column of the first table.
String tableName = tblNameList.get(0);
DescribeTableRequest dtReq = DescribeTableRequest.newBuilder()
        .setSession(session)
        .setSchemaName(schemaName)
        .setTableName(tableName)
        .build();
List<ColumnInfo> columnList = gpssStub.describeTable(dtReq).getColumnsList();
for (ColumnInfo ci : columnList) {
    String colname = ci.getName();
    String dbtype = ci.getDatabaseType();
    System.out.println("column " + colname + " type: " + dbtype);
}

// Release the gpss session; close the channel with gpss.closeGpssChannel(...) when done.
gpDBSession.closeGpssConnection(session);

4.4 利用GPSS向Greenplum表中写入数据


// Open the gRPC channel to the gpss server and get the blocking stub.
GpssServerConnector gpss = new GpssServerConnector();
gpss.establishGpssChannel();
GpssGrpc.GpssBlockingStub gpssStub = gpss.getGpssStub();
// Obtain a gpss session. Connect is issued by GpDBSession; GpssServerConnector has
// no getGpssConnection method.
GpDBSession gpDBSession = new GpDBSession(gpssStub);
Session session = gpDBSession.getGpssConnection();
String schemaName = "public";
String tableName = "message_trace_2019_07";

// ---- Open: prepare the target table for an INSERT load ----
// Maximum number of error rows tolerated.
int errLimit = 20;
// Maximum percentage of error rows tolerated.
int errPct = 20;
// InsertColumns, if given, must correspond one-to-one to target table column names.
InsertOption insertOpt = InsertOption.newBuilder()
        .setErrorLimitCount(errLimit)
        .setErrorLimitPercentage(errPct)
        .setTruncateTable(false)
//                .addInsertColumns("update_time")
        .build();
OpenRequest openReq = OpenRequest.newBuilder()
        .setSession(session)
        .setSchemaName(schemaName)
        .setTableName(tableName)
//                .setPreSQL("")
//                .setPostSQL("")
//                .setStagingSchema("")
        .setInsertOption(insertOpt)
        .build();
// Open returns google.protobuf.Empty, so there is nothing to inspect.
gpssStub.open(openReq);

// ---- Write: send one batch of rows ----
List<RowData> rows = new ArrayList<RowData>();
for (int i = 0; i < 1000; i++) {
    Row.Builder builder = Row.newBuilder();
    // One DBValue per column, added in the table's column order.
    DBValue.Builder colBuilder1 = DBValue.newBuilder();
    // NOTE(review): the original value of column 1 was lost; set it to match that column's type.
    colBuilder1.setInt32Value(i);
    builder.addColumns(colBuilder1.build());
    DBValue.Builder colBuilder2 = DBValue.newBuilder();
    colBuilder2.setStringValue("this is test trace infomation.");
    builder.addColumns(colBuilder2.build());
    DBValue.Builder colBuilder3 = DBValue.newBuilder();
    colBuilder3.setStringValue("this is test comment infomation.");
    builder.addColumns(colBuilder3.build());
    DBValue.Builder colBuilder4 = DBValue.newBuilder();
    // NOTE(review): assumed to be the update_time timestamp column; verify against the table.
    long nowMillis = System.currentTimeMillis();
    colBuilder4.setTimeStampValue(com.google.protobuf.Timestamp.newBuilder()
            .setSeconds(nowMillis / 1000)
            .setNanos((int) ((nowMillis % 1000) * 1000000)));
    builder.addColumns(colBuilder4.build());
    // Each RowData carries one serialized Row.
    RowData.Builder rowBuilder = RowData.newBuilder().setData(builder.build().toByteString());
    rows.add(rowBuilder.build());
}
WriteRequest writeReq = WriteRequest.newBuilder()
        .setSession(session)
        .addAllRows(rows)
        .build();
// Write returns google.protobuf.Empty, so there is nothing to inspect.
gpssStub.write(writeReq);

// ---- Close: finish the load and fetch its statistics ----
CloseRequest closeReq = CloseRequest.newBuilder()
        .setSession(session)
        .build();
TransferStats tStats = gpssStub.close(closeReq);
System.out.println("CloseRequest tStats: " + tStats.toString());

// Release the gpss session; close the channel with gpss.closeGpssChannel(...) when done.
gpDBSession.closeGpssConnection(session);



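DBValue中各字段(gRPC类型)与Greenplum数据类型的对应关系如下:

gRPC Type        | Greenplum Type
---------------- | ------------------------------------
Int32Value       | integer, serial
Int64Value       | bigint, bigserial
Float32Value     | real
Float64Value     | double precision
StringValue      | text (any kind of data)
BytesValue       | bytea
TimeStampValue   | time, timestamp (without time zone)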


