
A simple wrapper around SqlBulkCopy, the SQL Server bulk insert scheme, to make batch inserts more convenient

2020-12-08 10:53:58 itread01

# I. Introduction to SQL Server insert schemes

There are three common ways to batch insert into `SqlServer`: `Insert`, `BatchInsert`, and `SqlBulkCopy`. Let's compare the speed of the three schemes.

## 1. Plain `Insert`

```csharp
public static void Insert(IEnumerable<Person> persons)
{
    using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
    {
        con.Open();
        foreach (var person in persons)
        {
            using (var com = new SqlCommand(
                "INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)", con))
            {
                com.Parameters.AddRange(new[]
                {
                    new SqlParameter("@Id", SqlDbType.BigInt) {Value = person.Id},
                    new SqlParameter("@Name", SqlDbType.VarChar, 64) {Value = person.Name},
                    new SqlParameter("@Age", SqlDbType.Int) {Value = person.Age},
                    new SqlParameter("@CreateTime", SqlDbType.DateTime) {Value = person.CreateTime ?? (object) DBNull.Value},
                    new SqlParameter("@Sex", SqlDbType.Int) {Value = (int) person.Sex},
                });
                com.ExecuteNonQuery();
            }
        }
    }
}
```

## 2. Concatenated `BatchInsert` statements

```csharp
public static void BatchInsert(Person[] persons)
{
    using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
    {
        con.Open();
        var pageCount = (persons.Length - 1) / 1000 + 1;
        for (int i = 0; i < pageCount; i++)
        {
            var personList = persons.Skip(i * 1000).Take(1000).ToArray();
            var values = personList.Select(p =>
                $"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})");
            var insertSql = $"INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(",", values)}";
            using (var com = new SqlCommand(insertSql, con))
            {
                com.ExecuteNonQuery();
            }
        }
    }
}
```

## 3. `SqlBulkCopy`

```csharp
public static void BulkCopy(IEnumerable<Person> persons)
{
    using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
    {
        con.Open();
        var table = new DataTable();
        table.Columns.AddRange(new[]
        {
            new DataColumn("Id", typeof(long)),
            new DataColumn("Name", typeof(string)),
            new DataColumn("Age", typeof(int)),
            new DataColumn("CreateTime", typeof(DateTime)),
            new DataColumn("Sex", typeof(int)),
        });
        foreach (var p in persons)
        {
            table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex});
        }
        using (var copy = new SqlBulkCopy(con))
        {
            copy.DestinationTableName = "Person";
            copy.WriteToServer(table);
        }
    }
}
```

## 4. Speed comparison of the three schemes

| Scheme | Rows | Time |
|--|--|--|
| Insert | 1,000 | 145.4351 ms |
| BatchInsert | 1,000 | 103.9061 ms |
| SqlBulkCopy | 1,000 | 7.021 ms |
| Insert | 10,000 | 1501.326 ms |
| BatchInsert | 10,000 | 850.6274 ms |
| SqlBulkCopy | 10,000 | 30.5129 ms |
| Insert | 100,000 | 13875.4934 ms |
| BatchInsert | 100,000 | 8278.9056 ms |
| SqlBulkCopy | 100,000 | 314.8402 ms |

Comparing insert efficiency, `Insert` is far slower than `SqlBulkCopy`: the gap is roughly 20 to 40 times. So next we wrap `SqlBulkCopy` to make batch inserts more convenient.
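The article does not show the harness behind these timings; a minimal `Stopwatch` sketch along the following lines would reproduce them, assuming the three methods above (clear `dbo.Person` between runs so every scheme starts from an empty table):

```csharp
using System;
using System.Diagnostics;

// Minimal, hypothetical timing harness for the table above.
static class InsertBenchmark
{
    public static void Run(string name, int count, Action<Person[]> insert)
    {
        // Generate test data, like the test code later in this article.
        var random = new Random();
        var persons = new Person[count];
        for (int i = 0; i < count; i++)
            persons[i] = new Person { Id = i + 1, Name = "P" + i, Age = random.Next(1, 128), Sex = (Gender) random.Next(2) };

        var sw = Stopwatch.StartNew();
        insert(persons);                 // one of Insert / BatchInsert / BulkCopy
        sw.Stop();
        Console.WriteLine($"{name} x {count}: {sw.Elapsed.TotalMilliseconds}ms");
    }
}

// Usage, for example: InsertBenchmark.Run("SqlBulkCopy", 100000, BulkCopy);
```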
# II. SqlBulkCopy wrapper code

## 1. Method introduction

**Signatures of the bulk insert extension methods**

| Method | Parameter | Description |
|--|--|--|
| BulkCopy | | Synchronous bulk insert |
| | `SqlConnection connection` | SQL Server connection object |
| | `IEnumerable<TModel> source` | Data source to bulk insert |
| | `string tableName = null` | Target table name (when null, defaults to the entity type name) |
| | `int bulkCopyTimeout = 30` | Bulk insert timeout, in seconds |
| | `int batchSize = 0` | Number of rows in each batch sent to the database (0 sends everything in a single batch). The best value depends on your environment, especially the row count and the network latency. Personally, I start by setting BatchSize to 1000 and see how it performs; if that works, I keep doubling the row count (to 2000, 4000, and so on) until performance drops or a timeout occurs. Conversely, if 1000 already times out, I halve the row count (to 500, say) until it works. |
| | `SqlBulkCopyOptions options = SqlBulkCopyOptions.Default` | Bulk copy options |
| | `SqlTransaction externalTransaction = null` | Transaction to execute within |
| BulkCopyAsync | (same parameters as BulkCopy) | Asynchronous bulk insert |

* These methods mainly solve two problems:
  * You no longer build a `DataTable` or an `IDataReader` implementation by hand. Hand-built conversion code is hard to maintain: change one field and you must change every one of those places, and enums in particular need special treatment (conversion to their underlying type, `int` by default).
  * You no longer construct the `SqlBulkCopy` object yourself, configure its column mappings to the database, or set its various properties.
* This scheme is also what our company uses to cover its bulk insert needs, for example third-party reconciliation data.
* The implementation uses `Expression` to dynamically generate the data conversion function, which is nearly as efficient as hand-written code; the extra conversion overhead is small (the biggest cost is the boxing of value types).
* What sets this project apart from other solutions online: it does not convert the `List` into a `DataTable` first and then hand that to `SqlBulkCopy`. Instead it wraps the `List` in a reader implementing `IDataReader`, so each row is converted at the moment `SqlBulkCopy` inserts it.
* **Advantages of the `IDataReader` scheme over the `DataTable` scheme** (see the usage sketch after this list):
  * Faster: the `DataTable` scheme must finish converting everything before `SqlBulkCopy` writes it to the database, while the `IDataReader` scheme converts as it writes (for example, inserting 100,000 rows can be about 30% faster).
  * Less memory: the `DataTable` scheme materializes the fully converted table in memory before `SqlBulkCopy` writes it, while the `IDataReader` scheme converts as it writes and never holds much data in memory.
  * More flexible: because conversion happens while writing, and `EnumerableReader` is fed an iterator, data can effectively be inserted as a continuous stream.
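As referenced in the list above, here is a minimal usage sketch of the `BulkCopy`/`BulkCopyAsync` extension methods defined in the full code below. It assumes the `Person` entity and connection string used throughout this article; `persons` is a `Person[]` like the one built in the test section, and `MorePersons` is a hypothetical generator added here only to illustrate streaming:

```csharp
using System;
using System.Collections.Generic;
using System.Data.SqlClient;

// Sketch only; not part of the component.
using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
    conn.Open();

    // Tune batchSize/bulkCopyTimeout for your environment (see the table above).
    var inserted = conn.BulkCopy(persons, tableName: "Person",
                                 bulkCopyTimeout: 60, batchSize: 1000);

    // Asynchronous variant:
    var insertedAsync = await conn.BulkCopyAsync(persons, batchSize: 1000);

    // Because the source is IEnumerable<TModel>, an iterator can stream rows
    // into SqlBulkCopy without materializing them all in memory first.
    conn.BulkCopy(MorePersons());
}

// Hypothetical generator used for the streaming example above.
static IEnumerable<Person> MorePersons()
{
    for (var i = 200001; i <= 300000; i++)   // ids chosen to avoid PK conflicts
        yield return new Person { Id = i, Name = "P" + i, Age = 30, Sex = Gender.Man };
}
```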
## 2. Implementation principles

### ① Mapping the entity model to the table

* Table creation script:

```sql
CREATE TABLE [dbo].[Person](
    [Id] [BIGINT] NOT NULL,
    [Name] [VARCHAR](64) NOT NULL,
    [Age] [INT] NOT NULL,
    [CreateTime] [DATETIME] NULL,
    [Sex] [INT] NOT NULL,
    PRIMARY KEY CLUSTERED
    (
        [Id] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
```

* Entity class:

```csharp
public class Person
{
    public long Id { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public DateTime? CreateTime { get; set; }
    public Gender Sex { get; set; }
}

public enum Gender
{
    Man = 0,
    Woman = 1
}
```

* Create the column mappings. Without them, data lands in the wrong columns, because by default rows are inserted by column ordinal, and a type mismatch then raises an error.
* Mappings are created through the `ColumnMappings` property of `SqlBulkCopy`, mapping the entity's columns to the database columns:

```csharp
// Create the bulk insert object
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
    foreach (var column in ModelToDataTable<TModel>.Columns)
    {
        // Create the column mapping
        copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
    }
}
```

### ② Converting an entity to a row

* Converting data to a row uses reflection plus `Expression`.
* Reflection obtains the type and property information that the `Expression` code needs.
* `Expression` generates the efficient conversion function.
* `ModelToDataTable<TModel>` exploits the fact that a static generic class caches its state per generic argument.
* In the static constructor of `ModelToDataTable<TModel>`, the conversion function is generated and the property information to convert is collected, both stored in static read-only fields, which completes the cache.

### ③ Inserting data via the IDataReader overload

* `EnumerableReader<TModel>` is a reader class implementing `IDataReader`. It receives model objects, reads them from the iterator, and converts each one into a row that `SqlBulkCopy` can read.
* `SqlBulkCopy` calls only three of its methods: `GetOrdinal`, `Read`, and `GetValue` (a sketch of this call pattern follows below).
* `GetOrdinal` is only called before the first row, to find the ordinal of each column (which is why the `ColumnMappings` property of `SqlBulkCopy` must be filled in).
* `Read` advances to the next row and calls `ModelToDataTable<TModel>.ToRowData.Invoke()` to convert the model object into an `object[]` row.
* `GetValue` returns the value at the given index in the current row.
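To make ② and ③ concrete, the sketch below shows, for the `Person` entity, a hand-written equivalent of the delegate that `ModelToDataTable<TModel>` compiles, followed by an illustration of the three-call pattern in which `SqlBulkCopy` drives the reader. This is illustrative code, not part of the component:

```csharp
using System;
using System.Data;

// Hand-written equivalent of ModelToDataTable<Person>.ToRowData: every readable
// property is read in declaration order, enums are converted to their underlying
// type, and each value is boxed into the object[] row (that boxing of value
// types is the main conversion cost mentioned above).
Func<Person, object[]> toRowData = p => new object[]
{
    p.Id,
    p.Name,
    p.Age,
    p.CreateTime,     // Nullable<DateTime> boxes to null when it has no value
    (int) p.Sex       // enum lowered to its underlying int
};

// Rough illustration of how SqlBulkCopy consumes the reader: ordinals are
// resolved once before the first row, then Read/GetValue are called per row.
static void Consume(IDataReader reader)
{
    var idOrdinal = reader.GetOrdinal("Id");   // once, via ColumnMappings
    while (reader.Read())                      // converts the next model to object[]
    {
        var id = reader.GetValue(idOrdinal);   // fetch each column by ordinal
        // ... the value is written into the destination column ...
    }
}
```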
## 3. Full code

**Extension method class**

```csharp
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading.Tasks;

public static class SqlConnectionExtension
{
    /// <summary>
    /// Bulk copy.
    /// </summary>
    /// <typeparam name="TModel">Type of the inserted model object</typeparam>
    /// <param name="connection">Database connection object</param>
    /// <param name="source">Data source to bulk insert</param>
    /// <param name="tableName">Target table name (when null, defaults to the entity type name)</param>
    /// <param name="bulkCopyTimeout">Insert timeout, in seconds</param>
    /// <param name="batchSize">Number of rows per batch sent to the database (0 sends everything in a single batch). Tune it for your environment: start at 1000 and double it until performance drops or a timeout occurs; if 1000 already times out, halve it until it works.</param>
    /// <param name="options">Bulk copy options</param>
    /// <param name="externalTransaction">Transaction to execute within</param>
    /// <returns>Number of rows inserted</returns>
    public static int BulkCopy<TModel>(this SqlConnection connection,
        IEnumerable<TModel> source,
        string tableName = null,
        int bulkCopyTimeout = 30,
        int batchSize = 0,
        SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
        SqlTransaction externalTransaction = null)
    {
        // Set up the reader
        using (var reader = new EnumerableReader<TModel>(source))
        {
            // Create the bulk insert object
            using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
            {
                // Target table
                copy.DestinationTableName = tableName ?? typeof(TModel).Name;
                // Number of rows per batch
                copy.BatchSize = batchSize;
                // Timeout
                copy.BulkCopyTimeout = bulkCopyTimeout;
                // Create the column mappings (without them, data lands in the wrong
                // columns, because rows are inserted by ordinal by default, and a
                // type mismatch raises an error)
                foreach (var column in ModelToDataTable<TModel>.Columns)
                {
                    copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }
                // Bulk write the data to the database
                copy.WriteToServer(reader);
                // Return the number of rows inserted
                return reader.Depth;
            }
        }
    }

    /// <summary>
    /// Bulk copy, asynchronous. Same parameters and behavior as BulkCopy.
    /// </summary>
    /// <returns>Number of rows inserted</returns>
    public static async Task<int> BulkCopyAsync<TModel>(this SqlConnection connection,
        IEnumerable<TModel> source,
        string tableName = null,
        int bulkCopyTimeout = 30,
        int batchSize = 0,
        SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
        SqlTransaction externalTransaction = null)
    {
        // Set up the reader
        using (var reader = new EnumerableReader<TModel>(source))
        {
            // Create the bulk insert object
            using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
            {
                // Target table
                copy.DestinationTableName = tableName ?? typeof(TModel).Name;
                // Number of rows per batch
                copy.BatchSize = batchSize;
                // Timeout
                copy.BulkCopyTimeout = bulkCopyTimeout;
                // Create the column mappings
                foreach (var column in ModelToDataTable<TModel>.Columns)
                {
                    copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }
                // Bulk write the data to the database
                await copy.WriteToServerAsync(reader);
                // Return the number of rows inserted
                return reader.Depth;
            }
        }
    }
}
```
**Wrapped iterator data reader**

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;

/// <summary>
/// Iterator data reader.
/// </summary>
/// <typeparam name="TModel">Model type</typeparam>
public class EnumerableReader<TModel> : IDataReader
{
    /// <summary>
    /// Instantiates the iterator reader.
    /// </summary>
    /// <param name="source">Model source</param>
    public EnumerableReader(IEnumerable<TModel> source)
    {
        _source = source ?? throw new ArgumentNullException(nameof(source));
        _enumerable = source.GetEnumerator();
    }

    private readonly IEnumerable<TModel> _source;
    private readonly IEnumerator<TModel> _enumerable;
    private object[] _currentDataRow = Array.Empty<object>();
    private int _depth;
    private bool _release;

    public void Dispose()
    {
        _release = true;
        _enumerable.Dispose();
    }

    public int GetValues(object[] values)
    {
        if (values == null) throw new ArgumentNullException(nameof(values));
        var length = Math.Min(_currentDataRow.Length, values.Length);
        Array.Copy(_currentDataRow, values, length);
        return length;
    }

    public int GetOrdinal(string name)
    {
        for (int i = 0; i < ModelToDataTable<TModel>.Columns.Count; i++)
        {
            if (ModelToDataTable<TModel>.Columns[i].ColumnName == name) return i;
        }
        return -1;
    }

    public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length)
    {
        if (dataIndex < 0) throw new Exception("The start index cannot be less than 0!");
        if (bufferIndex < 0) throw new Exception("The target buffer start index cannot be less than 0!");
        if (length < 0) throw new Exception("The read length cannot be less than 0!");
        var byteArray = (byte[]) GetValue(ordinal);
        if (buffer == null) return byteArray.Length;
        if (buffer.Length <= bufferIndex) throw new Exception("The target buffer start index cannot exceed the target buffer range!");
        var freeLength = Math.Min(byteArray.Length - bufferIndex, length);
        if (freeLength <= 0) return 0;
        // Copy freeLength (not length) items to avoid overrunning the buffer
        Array.Copy(byteArray, dataIndex, buffer, bufferIndex, freeLength);
        return freeLength;
    }

    public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length)
    {
        if (dataIndex < 0) throw new Exception("The start index cannot be less than 0!");
        if (bufferIndex < 0) throw new Exception("The target buffer start index cannot be less than 0!");
        if (length < 0) throw new Exception("The read length cannot be less than 0!");
        var charArray = (char[]) GetValue(ordinal);
        if (buffer == null) return charArray.Length;
        if (buffer.Length <= bufferIndex) throw new Exception("The target buffer start index cannot exceed the target buffer range!");
        var freeLength = Math.Min(charArray.Length - bufferIndex, length);
        if (freeLength <= 0) return 0;
        // Copy freeLength (not length) items to avoid overrunning the buffer
        Array.Copy(charArray, dataIndex, buffer, bufferIndex, freeLength);
        return freeLength;
    }

    public bool IsDBNull(int i)
    {
        var value = GetValue(i);
        return value == null || value is DBNull;
    }

    public bool NextResult()
    {
        // Move to the next element
        if (!_enumerable.MoveNext()) return false;
        // Row depth +1
        Interlocked.Increment(ref _depth);
        // Convert the current model object into a row
        _currentDataRow = ModelToDataTable<TModel>.ToRowData.Invoke(_enumerable.Current);
        return true;
    }

    public byte GetByte(int i) => (byte) GetValue(i);
    public string GetName(int i) => ModelToDataTable<TModel>.Columns[i].ColumnName;
    public string GetDataTypeName(int i) => ModelToDataTable<TModel>.Columns[i].DataType.Name;
    public Type GetFieldType(int i) => ModelToDataTable<TModel>.Columns[i].DataType;
    public object GetValue(int i) => _currentDataRow[i];
    public bool GetBoolean(int i) => (bool) GetValue(i);
    public char GetChar(int i) => (char) GetValue(i);
    public Guid GetGuid(int i) => (Guid) GetValue(i);
    public short GetInt16(int i) => (short) GetValue(i);
    public int GetInt32(int i) => (int) GetValue(i);
    public long GetInt64(int i) => (long) GetValue(i);
    public float GetFloat(int i) => (float) GetValue(i);
    public double GetDouble(int i) => (double) GetValue(i);
    public string GetString(int i) => (string) GetValue(i);
    public decimal GetDecimal(int i) => (decimal) GetValue(i);
    public DateTime GetDateTime(int i) => (DateTime) GetValue(i);
    public IDataReader GetData(int i) => throw new NotSupportedException();
    public int FieldCount => ModelToDataTable<TModel>.Columns.Count;
    public object this[int i] => GetValue(i);
    public object this[string name] => GetValue(GetOrdinal(name));
    public void Close() => Dispose();
    public DataTable GetSchemaTable() => ModelToDataTable<TModel>.ToDataTable(_source);
    public bool Read() => NextResult();
    public int Depth => _depth;
    public bool IsClosed => _release;
    public int RecordsAffected => 0;
}
```

**Model-to-row conversion utility class**

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;

/// <summary>
/// Object-to-DataTable conversion class.
/// </summary>
/// <typeparam name="TModel">Model type</typeparam>
public static class ModelToDataTable<TModel>
{
    static ModelToDataTable()
    {
        // If you need to exclude some columns, modify this code
        var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray();
        Columns = new ReadOnlyCollection<DataColumn>(propertyList
            .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray());
        // Generate the object-to-row delegate
        ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList);
    }

    /// <summary>
    /// Builds the convert-to-row delegate.
    /// </summary>
    /// <param name="type">Input type</param>
    /// <param name="propertyList">Properties to convert</param>
    /// <returns>Conversion delegate</returns>
    private static Func<TModel, object[]> BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList)
    {
        var source = Expression.Parameter(type);
        var items = propertyList.Select(property => ConvertBindPropertyToData(source, property));
        var array = Expression.NewArrayInit(typeof(object), items);
        var lambda = Expression.Lambda<Func<TModel, object[]>>(array, source);
        return lambda.Compile();
    }

    /// <summary>
    /// Converts a property access into a row-value expression.
    /// </summary>
    /// <param name="source">Source parameter</param>
    /// <param name="property">Property info</param>
    /// <returns>Expression reading the property value</returns>
    private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property)
    {
        var propertyType = property.PropertyType;
        var expression = (Expression) Expression.Property(source, property);
        if (propertyType.IsEnum)
            expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType());
        return Expression.Convert(expression, typeof(object));
    }

    /// <summary>
    /// Gets the column data type.
    /// </summary>
    /// <param name="type">Property type</param>
    /// <returns>Data type</returns>
    private static Type GetDataType(Type type)
    {
        // Enums are converted to their underlying value type by default
        if (type.IsEnum)
            return type.GetEnumUnderlyingType();
        // Nullable types
        if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
            return GetDataType(type.GetGenericArguments().First());
        return type;
    }

    /// <summary>
    /// Column collection.
    /// </summary>
    public static IReadOnlyList<DataColumn> Columns { get; }

    /// <summary>
    /// Object-to-row conversion delegate.
    /// </summary>
    public static Func<TModel, object[]> ToRowData { get; }
    /// <summary>
    /// Converts a collection to a DataTable.
    /// </summary>
    /// <param name="source">Collection</param>
    /// <param name="tableName">Table name</param>
    /// <returns>The converted DataTable</returns>
    public static DataTable ToDataTable(IEnumerable<TModel> source, string tableName = "TempTable")
    {
        // Create the table object
        var table = new DataTable(tableName);
        // Set the columns
        foreach (var dataColumn in Columns)
        {
            table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType));
        }
        // Loop over each row of data
        foreach (var item in source)
        {
            table.Rows.Add(ToRowData.Invoke(item));
        }
        // Return the table object
        return table;
    }
}
```

# III. Testing the wrapped code

## 1. Test code

**Table creation script**

```sql
CREATE TABLE [dbo].[Person](
    [Id] [BIGINT] NOT NULL,
    [Name] [VARCHAR](64) NOT NULL,
    [Age] [INT] NOT NULL,
    [CreateTime] [DATETIME] NULL,
    [Sex] [INT] NOT NULL,
    PRIMARY KEY CLUSTERED
    (
        [Id] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
```

**Entity class**

* The property names of the entity must correspond to the SQL Server column names and types:

```csharp
public class Person
{
    public long Id { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public DateTime? CreateTime { get; set; }
    public Gender Sex { get; set; }
}

public enum Gender
{
    Man = 0,
    Woman = 1
}
```

**Test method**

```csharp
// Generate 100,000 rows of data
var persons = new Person[100000];
var random = new Random();
for (int i = 0; i < persons.Length; i++)
{
    persons[i] = new Person
    {
        Id = i + 1,
        Name = "Zhang San" + i,
        Age = random.Next(1, 128),
        Sex = (Gender) random.Next(2),
        CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i)
    };
}

// Create the database connection
using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
    conn.Open();
    var sw = Stopwatch.StartNew();
    // Bulk insert the data
    var qty = conn.BulkCopy(persons);
    sw.Stop();
    Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms");
}
```

**Bulk insert result**

```bash
226.4767ms
Please press any key to continue . . .
```

![Insert result screenshot](https://img-blog.csdnimg.cn/20201128002635787.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzI1MTU0Nw==,size_16,color_FFFFFF,t_70)

# IV. Code download

GitHub address: [https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents](https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents)

