当前位置: 首页 > 工具软件 > Npgsql > 使用案例 >

Npgsql实现Postgresql数据库表的批量导入及导出(Bulk Copy)

贾实
2023-12-01

我们知道在ADO.NET中有“System.Data.SqlClient.SqlBulkCopy类”专门针对SQL Server的批导入表数据操作(Bulk Copy),而针对Postgresql的Npgsql并无相应的类,Npgsql实现Bulk Copy是通过Copy指令实现的,详细可以参考Npgsql官网对Bulk Copy的说明https://www.npgsql.org/doc/copy.html  (如果想更多了解关于Copy指令的语法可以阅读https://www.postgresql.org/docs/current/sql-copy.html)。

以下是C#中实现批量导入表格数据的通用方法(导出方法请阅读官网相关说明,此处略过):

public void BulkCopy(string tableName, DataTable dt)
        {    
            List<string> lsColNames = new List<string>();
            for (int i = 0; i < dt.Columns.Count; i++)
            {
                lsColNames.Add($"\"{dt.Columns[i].ColumnName}\"");
            }
            string copyString = $"COPY \"{tableName}\" ( {string.Join(",", lsColNames) } ) FROM STDIN (FORMAT BINARY)";
            using (NpgsqlConnection conn = (NpgsqlConnection)_DB.CreateConnection())
            {
                if (conn.State == ConnectionState.Closed)
                    conn.Open();
                var writer = conn.BeginBinaryImport(copyString);
                foreach (DataRow row in dt.Rows)
                {
                    writer.StartRow();
                    IEnumerable<KeyValuePair<string, JToken>> JRowData = DataConvert.ToJObject(row);
                    foreach (var kvp in JRowData)
                    {
                        NpgsqlParameter colParam = GetParameter(tableName, kvp);
                        writer.Write(colParam.Value, colParam.NpgsqlDbType);
                    }                    
                }
                writer.Complete();
                conn.Close();
            }
        }

以下是上例中涉及的有关获取各字段作参数的类型与值的方法:

        public NpgsqlParameter GetParameter(string tableName, KeyValuePair<string,JToken> columnValuePair)
        {
            string columnDBypeName = _DBTableDefProvider.GetTableColumn(tableName, columnValuePair.Key).data_type.ToLower();           

            NpgsqlParameter p = new NpgsqlParameter("@" + columnValuePair.Key,
                    columnDBypeName == "timestamp" || columnDBypeName == "timestamp without time zone" ? NpgsqlDbType.Timestamp
                        : columnDBypeName == "timestamp with time zone" ? NpgsqlDbType.TimestampTz
                        : columnDBypeName == "date" ? NpgsqlDbType.Date
                        : columnDBypeName=="time" || columnDBypeName == "time without time zone" ? NpgsqlDbType.Time
                        : columnDBypeName == "time with time zone" ? NpgsqlDbType.TimeTz
                        : columnDBypeName == "smallint" ? NpgsqlDbType.Smallint
                        : columnDBypeName == "integer" || columnDBypeName == "serial" ? NpgsqlDbType.Integer
                        : columnDBypeName == "bigint" || columnDBypeName == "bigserial" ? NpgsqlDbType.Bigint
                        : columnDBypeName == "double precision" ? NpgsqlDbType.Double
                        : columnDBypeName == "real" ? NpgsqlDbType.Real
                        : columnDBypeName == "boolean" ? NpgsqlDbType.Boolean
                        : columnDBypeName == "uuid" ? NpgsqlDbType.Uuid
                        : columnDBypeName == "bit" ? NpgsqlDbType.Bit                    //eg:0|1
                        : columnDBypeName == "json" ? NpgsqlDbType.Json
                        : columnDBypeName == "money" ? NpgsqlDbType.Money
                        : columnDBypeName == "numeric" ? NpgsqlDbType.Numeric
                        : columnDBypeName == "bit varying" ? NpgsqlDbType.Varbit        //eg:01010101
                        : columnDBypeName == "text" ? NpgsqlDbType.Text
                        : columnDBypeName == "character varying" ? NpgsqlDbType.Varchar    //NpgsqlDbType.Varchar可以直接用NpgsqlDbType.Text
                        : columnDBypeName == "\"char\"" || columnDBypeName == "character" ? NpgsqlDbType.Char   //NpgsqlDbType.Char可以直接用NpgsqlDbType.Text
                        //: columnDBypeName == "array" ? NpgsqlDbType.Array|NpgsqlDbType.Json //ARRAY需要匹配各个基础类型的Array,且不能直接以string传值,不常用不做处理
                        : columnDBypeName == "interval" ? NpgsqlDbType.Interval
                        //: NpgsqlDbType.Text); 
                        : NpgsqlDbType.Unknown);
            p.Value = columnValuePair.Value.Type == JTokenType.Null ? DBNull.Value
                    : columnDBypeName.StartsWith("timestamp") || columnDBypeName == "date" || columnDBypeName.StartsWith("time") ? Convert.ToDateTime(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "smallint" ? Convert.ToInt16(columnValuePair.Value)
                    : columnDBypeName == "integer" || columnDBypeName == "serial" ? Convert.ToInt32(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "bigint" || columnDBypeName == "bigserial" ? Convert.ToInt64(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "double precision" ? Convert.ToDouble(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "real" ? Convert.ToSingle(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "boolean" ? Convert.ToBoolean(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "uuid" ? Guid.Parse((string)columnValuePair.Value)
                    : columnDBypeName == "bit" ? Convert.ToString(Convert.ToInt32(columnValuePair.Value), 2).Last().ToString()
                    : columnDBypeName == "json" ? JObject.Parse((string)columnValuePair.Value).ToString()
                    : columnDBypeName == "money" || columnDBypeName == "numeric" ? Convert.ToDecimal(((JValue)columnValuePair.Value).Value)
                    : columnDBypeName == "text"|| columnDBypeName == "character varying" || columnDBypeName == "character" ? (string)columnValuePair.Value
                    : columnDBypeName == "interval" ? TimeSpan.Parse(Regex.Replace((string)columnValuePair.Value, "days?",".",RegexOptions.IgnoreCase).Replace(" ",""))
                    : (object)(string)((JValue)columnValuePair.Value).Value;
            return p;
        }

 以下是查询Postgresql表格与字段信息的SQL:

select
    --col.table_schema,
    col.table_name,
    col.column_name,
    (case when col.column_default like 'nextval(%' then true else false end) as is_serial,
    --col.is_generated,
    (case when col.is_nullable='NO' then false else true end) as is_nullable,
    (case when con.contype is not null then true else false end) as is_primarykey,
    --con.contype ,
    col.data_type,
    col.character_maximum_length
from information_schema.columns col 
inner join pg_class cls on col.table_name=cls.relname
left join pg_constraint con on cls.oid=con.conrelid and con.contype='p' and array_position(con.conkey::integer[],col.ordinal_position::integer)>0
where col.table_schema = 'public'
order by col.table_name,col.ordinal_position;

 类似资料: