当前位置: 首页 > 知识库问答 >
问题:

大容量插入在Azure SQL Server中工作不正常

姜松
2023-03-14

我无法使用C#webapi将大量数据插入AzureSQL服务器DB

考虑考虑

我想插入60K

我的方法:(都在本地sql server中工作,但不在Azure sql server中工作)

1) 尝试使用EF逐个插入记录(10000次约10分钟,大部分超时)

2) 尝试使用大容量插入扩展以及在SqlBulkCopy中尝试的EF 3)

4) 尝试增加连接字符串中的连接超时

5) 尝试在Dbcontext中增加命令超时。

异常StackTrace

Execution Timeout Expired.  The timeout period elapsed prior to completion of the operation or the server is not responding.
System.Data.SqlClient.SqlException (0x80131904): Execution Timeout Expired.  The timeout period elapsed prior to completion of the operation or the server is not responding. ---> System.ComponentModel.Win32Exception (0x80004005): The wait operation timed out
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
   at System.Data.SqlClient.TdsParserStateObject.ReadSniError(TdsParserStateObject stateObj, UInt32 error)
   at System.Data.SqlClient.TdsParserStateObject.ReadSniSyncOverAsync()
   at System.Data.SqlClient.TdsParserStateObject.TryReadNetworkPacket()
   at System.Data.SqlClient.TdsParserStateObject.TryPrepareBuffer()
   at System.Data.SqlClient.TdsParserStateObject.TryReadByte(Byte& value)
   at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
   at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
   at System.Data.SqlClient.SqlBulkCopy.RunParser(BulkCopySimpleResultSet bulkCopyHandler)
   at System.Data.SqlClient.SqlBulkCopy.CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internalResults, String updateBulkCommandText, CancellationToken cts, TaskCompletionSource`1 source)
   at System.Data.SqlClient.SqlBulkCopy.CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, String updateBulkCommandText, CancellationToken cts, TaskCompletionSource`1 source)
   at System.Data.SqlClient.SqlBulkCopy.CopyBatchesAsync(BulkCopySimpleResultSet internalResults, String updateBulkCommandText, CancellationToken cts, TaskCompletionSource`1 source)
   at System.Data.SqlClient.SqlBulkCopy.WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource`1 source)
   at System.Data.SqlClient.SqlBulkCopy.WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletionSource`1 source)
   at System.Data.SqlClient.SqlBulkCopy.WriteToServerInternalAsync(CancellationToken ctoken)
   at System.Data.SqlClient.SqlBulkCopy.WriteRowSourceToServerAsync(Int32 columnCount, CancellationToken ctoken)
   at System.Data.SqlClient.SqlBulkCopy.WriteToServer(DataTable table, DataRowState rowState)

是否有任何解决方案可以在Azure中更改它或任何配置?

使现代化

用于批量插入的代码

  using (var dbConnection = new DBModel().Database.Connection as SqlConnection)
                {
                    dbConnection?.Open();
                    using (var sqlBulkCopy = new SqlBulkCopy(dbConnection))
                    {
                        try
                        {
                            /* ColumnMapping
                             * Column is mapped to DB Column to DataTable Column
                             *
                             */
                            sqlBulkCopy.EnableStreaming = true;
                            sqlBulkCopy.BulkCopyTimeout = 500;
                            sqlBulkCopy.DestinationTableName = "LogTable";
                            //dt is object of the Datatable
                            sqlBulkCopy.WriteToServer(dt);
                        }
                        catch (Exception ex)
                        {

                        }
                    }


                }

共有3个答案

宣俊豪
2023-03-14

首先,若您只是在数据库中插入一条记录,请确保您是否可以连接到Azure SQL数据库并执行操作而不会出现超时错误。

其次,请检查是否使用ObjectContext上的CommandTimeout属性覆盖默认超时值,并且可以尝试为BulkCopyTimeout属性设置0(表示没有限制)。

此外,请确保在执行大容量插入时它是否达到Azure SQL数据库的限制,您可以尝试更改数据库的服务层和性能级别。

景俊拔
2023-03-14

在过去的两天中,我一直在处理批量插入,下面是一个通用的批量插入类,它允许您排除某些列,如果发现DbGeography属性,它会将其强制转换为SqlGeography:

 public class BulkInsert<T> where T : class
{
    #region Fields

    private readonly LoggingService _logger = new LoggingService(typeof(BulkInsert<T>));
    private string _connectionString;
    private string _tableName;
    private IEnumerable<string> _excludedPropertyNames;
    private int _batchSize;
    private IEnumerable<T> _data;
    private DataTable _dataTable;

    #endregion

    #region Constructor

    public BulkInsert(
        string connectionString,
        string tableName,
        IEnumerable<T> data,
        IEnumerable<string> excludedPropertyNames,
        int batchSize = 1000)
    {
        if (string.IsNullOrEmpty(connectionString)) throw new ArgumentNullException(nameof(connectionString));
        if (string.IsNullOrEmpty(tableName)) throw new ArgumentNullException(nameof(tableName));
        if (data == null) throw new ArgumentNullException(nameof(data));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        _connectionString = connectionString;
        _tableName = tableName;
        _batchSize = batchSize;
        _data = data;
        _excludedPropertyNames = excludedPropertyNames == null ? new List<string>() : excludedPropertyNames;
        _dataTable = CreateCustomDataTable();
    }

    #endregion

    #region Public Methods

    public void Insert()
    {
        using (var connection = new SqlConnection(_connectionString))
        {
            connection.Open();
            SqlTransaction transaction = connection.BeginTransaction();

            using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default | SqlBulkCopyOptions.KeepIdentity, transaction))
            {
                bulkCopy.BatchSize = _batchSize;
                bulkCopy.DestinationTableName = _tableName;

                // Let's fix tons of mapping issues by
                // Setting the column mapping in SqlBulkCopy instance:
                foreach (DataColumn dataColumn in _dataTable.Columns)
                {
                    bulkCopy.ColumnMappings.Add(dataColumn.ColumnName, dataColumn.ColumnName);
                }

                try
                {
                    bulkCopy.WriteToServer(_dataTable);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex.Message);
                    transaction.Rollback();
                    connection.Close();
                }
            }

            transaction.Commit();
        }
    }

    #endregion

    #region Private Helper Methods

    private DataTable CreateCustomDataTable()
    {
        PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
        var table = new DataTable();
        foreach (PropertyDescriptor prop in properties)
        {
            // Just include the not excluded columns
            if (_excludedPropertyNames.All(epn => epn != prop.Name))
            {                  
                if (prop.PropertyType.Name == "DbGeography")
                {
                    var type = typeof(SqlGeography);
                    table.Columns.Add(prop.Name, type);
                }
                else
                {
                    table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
                }
            }
        }
        foreach (T item in _data)
        {
            DataRow row = table.NewRow();
            foreach (PropertyDescriptor prop in properties)
            {
                // Just include the values in not excluded properties 
                if (_excludedPropertyNames.All(epn => epn != prop.Name))
                {
                    if (prop.PropertyType.Name == "DbGeography")
                    {                           
                        row[prop.Name] = SqlGeography.Parse(((DbGeography)prop.GetValue(item)).AsText()).MakeValid();
                    }
                    else
                    {
                        row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
                    }
                }
            }
            table.Rows.Add(row);
        }
        return table;
    }

    #endregion

}

那么,它必须像这样使用:

var myEntityBulk = new BulkInsert<MyEntity>(
    _mYConnectionString,
     "MyEntities", 
      myEntities, 
      new[] { "ObjectState","NavigationPropertyOne", "NavigationPropertyTwo" }
);
myEntityBulk.Insert();

我希望它能有所帮助,我很确定它会的...昨天我把它弄大了

令狐晟
2023-03-14

我建议您设置sqlBulkCopy。BatchSize的值,而不是将所有内容都插入一个批次中。根据要插入的数据,尝试从10.000开始,逐步向上或向下,直到对性能感到满意为止。

编辑一些额外的澄清:当你考虑你的批量大小,你需要考虑到SQLBulkCopras将不仅需要插入数据,而且还读取和发送它-最后一部分可能是为什么它在本地SQL Server上工作,而不是在Azure上-这也意味着,如果你正在使用一个大的数据集,您将需要使用较低的批处理大小或相当高的BulkCopyTimeout设置,以使每个批处理都有机会在达到超时限制之前完成。

您可以在这篇文章中阅读更多关于批量大小的信息。SqlBulkCopy的推荐批次大小是多少?

其他选项:
我正在阅读这篇文章,可能是因为您的插入达到了一个关键的DTU(数据库事务单元,基本上是服务器组合资源的度量)使用点。

性能级别经过校准和管理,以提供所需的资源来运行数据库工作负载,使其达到所选服务层/性能级别允许的最大限制。如果您的工作负载达到CPU/数据IO/日志IO限制之一的限制,您将继续以允许的最大级别接收资源,但您可能会看到查询的延迟增加。这些限制不会导致任何错误,只会降低您的工作负载,除非降低到查询超时的程度。

摘自此链接:https://azure.microsoft.com/da-dk/blog/azure-sql-database-introduces-new-near-real-time-performance-metrics/
尝试在监视DTU使用情况的同时再次启动拷贝,并查看它是否长时间(er)处于100%开启状态。如果是这种情况,您可能希望提高数据库的定价级别。

 类似资料:
  • 我试图配置Spring Boot和Spring Data JPA,以便在批处理中进行批量插入。 目标是:在执行操作时,提交每个n-记录,而不是每个记录。 从现在起,我在中尝试了: 以下是实体: 和存储库:

  • 就像有人说的第二种方式更慢,但我不确定,那么哪种方式更好呢?不能使数据库崩溃。

  • 我正在尝试将文件插入到现有表中。现有的表有3列,这些列是ID(在表中分配)、学生号和组号。 在我的中,我有下面的格式,但是每次插入它时,我都会得到一个错误

  • 我正在使用大容量插入并得到以下错误: 注意:加载文件中的数据不超过配置的列长度 从'C:\temp\dataload\load_file.txt'大容量插入load_data(firstrow=1,fieldterminator='0x09',rowterminator='\n',MAXERRORS=0,ERRORFILE='C:\temp\dataload\load_file') Msg 486

  • 我想在我的sql-server数据库中插入。txt文件数据。我在.txt文件中使用和不使用PK-Variable时都出现了错误。我的疑问: 使用 从“C:\user\test.txt”大容量插入db.schema.table(FIELDTERMINATOR=';')去吧 null null