• 注册
    • 查看作者
    • SQL Server 批量插入数据的完美解决方案

       

      这篇文章主要介绍了SQL Server 批量插入数据的完美解决方案,需要的朋友可以参考下

      目录

      一、Sql Server插入方案介绍

      二、SqlBulkCopy封装代码

      1.方法介绍

      2.实现原理

      3.完整代码

      三、测试封装代码

      1.测试代码

      四、代码下载

      一、Sql Server插入方案介绍

      关于 SqlServer 批量插入的方式,有三种比较常用的插入方式,Insert、BatchInsert、SqlBulkCopy,下面我们对比以下三种方案的速度

      1.普通的Insert插入方法

      public static void Insert(IEnumerable<Person> persons)
      {
        using (var con = new SqlConnection(“Server=.;Database=DemoDataBase;User ID=sa;Password=8888;”))
        {
          con.Open();
          foreach (var person in persons)
          {
            using (var com = new SqlCommand(
              “INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)”,
              con))
            {
              com.Parameters.AddRange(new[]
              {
                new SqlParameter(“@Id”, SqlDbType.BigInt) {Value = person.Id},
                new SqlParameter(“@Name”, SqlDbType.VarChar, 64) {Value = person.Name},
                new SqlParameter(“@Age”, SqlDbType.Int) {Value = person.Age},
                new SqlParameter(“@CreateTime”, SqlDbType.DateTime)
                  {Value = person.CreateTime ?? (object) DBNull.Value},
                new SqlParameter(“@Sex”, SqlDbType.Int) {Value = (int)person.Sex},
              });
              com.ExecuteNonQuery();
            }
          }
        }
      }

      2.拼接BatchInsert插入语句

      public static void BatchInsert(Person[] persons)
      {
        using (var con = new SqlConnection(“Server=.;Database=DemoDataBase;User ID=sa;Password=8888;”))
        {
          con.Open();
          var pageCount = (persons.Length – 1) / 1000 + 1;
          for (int i = 0; i < pageCount; i++)
          {
            var personList = persons.Skip(i * 1000).Take(1000).ToArray();
            var values = personList.Select(p =>
              $”({p.Id},'{p.Name}’,{p.Age},{(p.CreateTime.HasValue ? $”‘{p.CreateTime:yyyy-MM-dd HH:mm:ss}'” : “NULL”)},{(int) p.Sex})”);
            var insertSql =
              $”INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES{string.Join(“,”, values)}”;
            using (var com = new SqlCommand(insertSql, con))
            {
              com.ExecuteNonQuery();
            }
          }
        }
      }

      3.SqlBulkCopy插入方案

      public static void BulkCopy(IEnumerable<Person> persons)
      {
        using (var con = new SqlConnection(“Server=.;Database=DemoDataBase;User ID=sa;Password=8888;”))
        {
          con.Open();
          var table = new DataTable();
          table.Columns.AddRange(new []
          {
            new DataColumn(“Id”, typeof(long)),
            new DataColumn(“Name”, typeof(string)),
            new DataColumn(“Age”, typeof(int)),
            new DataColumn(“CreateTime”, typeof(DateTime)),
            new DataColumn(“Sex”, typeof(int)),
          });
          foreach (var p in persons)
          {
            table.Rows.Add(new object[] {p.Id, p.Name, p.Age, p.CreateTime, (int) p.Sex});
          }

          using (var copy = new SqlBulkCopy(con))
          {
            copy.DestinationTableName = “Person”;
            copy.WriteToServer(table);
          }
        }
      }

      3.三种方案速度对比

      SQL Server 批量插入数据的完美解决方案

      两者插入效率对比,Insert明显比SqlBulkCopy要慢太多,大概20~40倍性能差距,下面我们将SqlBulkCopy封装一下,让批量插入更加方便

      二、SqlBulkCopy封装代码

      1.方法介绍

      批量插入扩展方法签名

      这个方法主要解决了两个问题:

      免去了手动构建DataTable或者IDataReader接口实现类,手动构建的转换比较难以维护,如果修改字段就得把这些地方都进行修改,特别是还需要将枚举类型特殊处理,转换成他的基础类型(默认int)

      不用亲自创建SqlBulkCopy对象,和配置数据库列的映射,和一些属性的配置

      此方案也是在我公司中使用,以满足公司的批量插入数据的需求,例如第三方的对账数据此方法使用的是Expression动态生成数据转换函数,其效率和手写的原生代码差不多,和原生手写代码相比,多余的转换损失很小【最大的性能损失都是在值类型拆装箱上】

      此方案和其他网上的方案有些不同的是:不是将List先转换成DataTable,然后写入SqlBulkCopy的,而是使用一个实现IDataReader的读取器包装List,每往SqlBulkCopy插入一行数据才会转换一行数据

      IDataReader方案和DataTable方案相比优点

      效率高:DataTable方案需要先完全转换后,才能交由SqlBulkCopy写入数据库,而IDataReader方案可以边转换边交给SqlBulkCopy写入数据库(例如:10万数据插入速度可提升30%)

      占用内存少:DataTable方案需要先完全转换后,才能交由SqlBulkCopy写入数据库,需要占用大量内存,而IDataReader方案可以边转换边交给SqlBulkCopy写入数据库,无须占用过多内存

      强大:因为是边写入边转换,而且EnumerableReader传入的是一个迭代器,可以实现持续插入数据的效果

      2.实现原理

      ① 实体Model与表映射

      数据库表代码

      CREATE TABLE [dbo].[Person](
       [Id] [BIGINT] NOT NULL,
       [Name] [VARCHAR](64) NOT NULL,
       [Age] [INT] NOT NULL,
       [CreateTime] [DATETIME] NULL,
       [Sex] [INT] NOT NULL,
      PRIMARY KEY CLUSTERED
      (
       [Id] ASC
      )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
      ) ON [PRIMARY]

      实体类代码

      public class Person
      {
        public long Id { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime? CreateTime { get; set; }
        public Gender Sex { get; set; }
      }

      public enum Gender
      {
        Man = 0,
        Woman = 1
      }

      创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】

      创建映射使用的SqlBulkCopy类型的ColumnMappings属性来完成,数据列与数据库中列的映射

      //创建批量插入对象
      using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
      {
        foreach (var column in ModelToDataTable<TModel>.Columns)
        {
          //创建字段映射
          copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
        }
      }

      ② 实体转换成数据行

      将数据转换成数据行采用的是:反射+Expression来完成

      其中反射是用于获取编写Expression所需程序类,属性等信息

      其中Expression是用于生成高效转换函数其中ModelToDataTable类型利用了静态泛型类特性,实现泛型参数的缓存效果

      在ModelToDataTable的静态构造函数中,生成转换函数,获取需要转换的属性信息,并存入静态只读字段中,完成缓存

      ③ 使用IDataReader插入数据的重载

      EnumerableReader是实现了IDataReader接口的读取类,用于将模型对象,在迭代器中读取出来,并转换成数据行,可供SqlBulkCopy读取

      SqlBulkCopy只会调用三个方法:GetOrdinal、Read、GetValue其中GetOrdinal只会在首行读取每个列所代表序号【需要填写:SqlBulkCopy类型的ColumnMappings属性】

      其中Read方法是迭代到下一行,并调用ModelToDataTable.ToRowData.Invoke()来将模型对象转换成数据行object[]其中GetValue方法是获取当前行指定下标位置的值

      3.完整代码

      扩展方法类

       public static class SqlConnectionExtension
        {
          /// <summary>
          /// 批量复制
          /// </summary>
          /// <typeparam name=”TModel”>插入的模型对象</typeparam>
          /// <param name=”source”>需要批量插入的数据源</param>
          /// <param name=”connection”>数据库连接对象</param>
          /// <param name=”tableName”>插入表名称【为NULL默认为实体名称】</param>
          /// <param name=”bulkCopyTimeout”>插入超时时间</param>
          /// <param name=”batchSize”>写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】</param>
          /// <param name=”options”>批量复制参数</param>
          /// <param name=”externalTransaction”>执行的事务对象</param>
          /// <returns>插入数量</returns>
          public static int BulkCopy<TModel>(this SqlConnection connection,
            IEnumerable<TModel> source,
            string tableName = null,
            int bulkCopyTimeout = 30,
            int batchSize = 0,
            SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
            SqlTransaction externalTransaction = null)
          {
            //创建读取器
            using (var reader = new EnumerableReader<TModel>(source))
            {
              //创建批量插入对象
              using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
              {
                //插入的表
                copy.DestinationTableName = tableName ?? typeof(TModel).Name;
                //写入数据库一批数量
                copy.BatchSize = batchSize;
                //超时时间
                copy.BulkCopyTimeout = bulkCopyTimeout;
                //创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】
                foreach (var column in ModelToDataTable<TModel>.Columns)
                {
                  //创建字段映射
                  copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }
                //将数据批量写入数据库
                copy.WriteToServer(reader);
                //返回插入数据数量
                return reader.Depth;
              }
            }
          }

          /// <summary>
          /// 批量复制-异步
          /// </summary>
          /// <typeparam name=”TModel”>插入的模型对象</typeparam>
          /// <param name=”source”>需要批量插入的数据源</param>
          /// <param name=”connection”>数据库连接对象</param>
          /// <param name=”tableName”>插入表名称【为NULL默认为实体名称】</param>
          /// <param name=”bulkCopyTimeout”>插入超时时间</param>
          /// <param name=”batchSize”>写入数据库一批数量【如果为0代表全部一次性插入】最合适数量【这取决于您的环境,尤其是行数和网络延迟。就个人而言,我将从BatchSize属性设置为1000行开始,然后看看其性能如何。如果可行,那么我将使行数加倍(例如增加到2000、4000等),直到性能下降或超时。否则,如果超时发生在1000,那么我将行数减少一半(例如500),直到它起作用为止。】</param>
          /// <param name=”options”>批量复制参数</param>
          /// <param name=”externalTransaction”>执行的事务对象</param>
          /// <returns>插入数量</returns>
          public static async Task<int> BulkCopyAsync<TModel>(this SqlConnection connection,
            IEnumerable<TModel> source,
            string tableName = null,
            int bulkCopyTimeout = 30,
            int batchSize = 0,
            SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
            SqlTransaction externalTransaction = null)
          {
            //创建读取器
            using (var reader = new EnumerableReader<TModel>(source))
            {
              //创建批量插入对象
              using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
              {
                //插入的表
                copy.DestinationTableName = tableName ?? typeof(TModel).Name;
                //写入数据库一批数量
                copy.BatchSize = batchSize;
                //超时时间
                copy.BulkCopyTimeout = bulkCopyTimeout;
                //创建字段映射【如果没有此字段映射会导致数据填错位置,如果类型不对还会导致报错】【因为:没有此字段映射默认是按照列序号对应插入的】
                foreach (var column in ModelToDataTable<TModel>.Columns)
                {
                  //创建字段映射
                  copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }
                //将数据批量写入数据库
                await copy.WriteToServerAsync(reader);
                //返回插入数据数量
                return reader.Depth;
              }
            }
          }
        }

      封装的迭代器数据读取器

       /// <summary>
        /// 迭代器数据读取器
        /// </summary>
        /// <typeparam name=”TModel”>模型类型</typeparam>
        public class EnumerableReader<TModel> : IDataReader
        {
          /// <summary>
          /// 实例化迭代器读取对象
          /// </summary>
          /// <param name=”source”>模型源</param>
          public EnumerableReader(IEnumerable<TModel> source)
          {
            _source = source ?? throw new ArgumentNullException(nameof(source));
            _enumerable = source.GetEnumerator();
          }

          private readonly IEnumerable<TModel> _source;
          private readonly IEnumerator<TModel> _enumerable;
          private object[] _currentDataRow = Array.Empty<object>();
          private int _depth;
          private bool _release;

          public void Dispose()
          {
            _release = true;
            _enumerable.Dispose();
          }

          public int GetValues(object[] values)
          {
            if (values == null) throw new ArgumentNullException(nameof(values));
            var length = Math.Min(_currentDataRow.Length, values.Length);
            Array.Copy(_currentDataRow, values, length);
            return length;
          }

          public int GetOrdinal(string name)
          {
            for (int i = 0; i < ModelToDataTable<TModel>.Columns.Count; i++)
            {
              if (ModelToDataTable<TModel>.Columns[i].ColumnName == name) return i;
            }

            return -1;
          }

          public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length)
          {
            if (dataIndex < 0) throw new Exception($”起始下标不能小于0!”);
            if (bufferIndex < 0) throw new Exception(“目标缓冲区起始下标不能小于0!”);
            if (length < 0) throw new Exception(“读取长度不能小于0!”);
            var numArray = (byte[])GetValue(ordinal);
            if (buffer == null) return numArray.Length;
            if (buffer.Length <= bufferIndex) throw new Exception(“目标缓冲区起始下标不能大于目标缓冲区范围!”);
            var freeLength = Math.Min(numArray.Length – bufferIndex, length);
            if (freeLength <= 0) return 0;
            Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);
            return freeLength;
          }

          public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length)
          {
            if (dataIndex < 0) throw new Exception($”起始下标不能小于0!”);
            if (bufferIndex < 0) throw new Exception(“目标缓冲区起始下标不能小于0!”);
            if (length < 0) throw new Exception(“读取长度不能小于0!”);
            var numArray = (char[])GetValue(ordinal);
            if (buffer == null) return numArray.Length;
            if (buffer.Length <= bufferIndex) throw new Exception(“目标缓冲区起始下标不能大于目标缓冲区范围!”);
            var freeLength = Math.Min(numArray.Length – bufferIndex, length);
            if (freeLength <= 0) return 0;
            Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);
            return freeLength;
          }

          public bool IsDBNull(int i)
          {
            var value = GetValue(i);
            return value == null || value is DBNull;
          }
          public bool NextResult()
          {
            //移动到下一个元素
            if (!_enumerable.MoveNext()) return false;
            //行层+1
            Interlocked.Increment(ref _depth);
            //得到数据行
            _currentDataRow = ModelToDataTable<TModel>.ToRowData.Invoke(_enumerable.Current);
            return true;
          }

          public byte GetByte(int i) => (byte)GetValue(i);
          public string GetName(int i) => ModelToDataTable<TModel>.Columns[i].ColumnName;
          public string GetDataTypeName(int i) => ModelToDataTable<TModel>.Columns[i].DataType.Name;
          public Type GetFieldType(int i) => ModelToDataTable<TModel>.Columns[i].DataType;
          public object GetValue(int i) => _currentDataRow[i];
          public bool GetBoolean(int i) => (bool)GetValue(i);
          public char GetChar(int i) => (char)GetValue(i);
          public Guid GetGuid(int i) => (Guid)GetValue(i);
          public short GetInt16(int i) => (short)GetValue(i);
          public int GetInt32(int i) => (int)GetValue(i);
          public long GetInt64(int i) => (long)GetValue(i);
          public float GetFloat(int i) => (float)GetValue(i);
          public double GetDouble(int i) => (double)GetValue(i);
          public string GetString(int i) => (string)GetValue(i);
          public decimal GetDecimal(int i) => (decimal)GetValue(i);
          public DateTime GetDateTime(int i) => (DateTime)GetValue(i);
          public IDataReader GetData(int i) => throw new NotSupportedException();
          public int FieldCount => ModelToDataTable<TModel>.Columns.Count;
          public object this[int i] => GetValue(i);
          public object this[string name] => GetValue(GetOrdinal(name));
          public void Close() => Dispose();
          public DataTable GetSchemaTable() => ModelToDataTable<TModel>.ToDataTable(_source);
          public bool Read() => NextResult();
          public int Depth => _depth;
          public bool IsClosed => _release;
          public int RecordsAffected => 0;
        }

      模型对象转数据行工具类

      /// <summary>
        /// 对象转换成DataTable转换类
        /// </summary>
        /// <typeparam name=”TModel”>泛型类型</typeparam>
        public static class ModelToDataTable<TModel>
        {
          static ModelToDataTable()
          {
            //如果需要剔除某些列可以修改这段代码
            var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray();
            Columns = new ReadOnlyCollection<DataColumn>(propertyList
              .Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray());
            //生成对象转数据行委托
            ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList);
          }

          /// <summary>
          /// 构建转换成数据行委托
          /// </summary>
          /// <param name=”type”>传入类型</param>
          /// <param name=”propertyList”>转换的属性</param>
          /// <returns>转换数据行委托</returns>
          private static Func<TModel, object[]> BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList)
          {
            var source = Expression.Parameter(type);
            var items = propertyList.Select(property => ConvertBindPropertyToData(source, property));
            var array = Expression.NewArrayInit(typeof(object), items);
            var lambda = Expression.Lambda<Func<TModel, object[]>>(array, source);
            return lambda.Compile();
          }

          /// <summary>
          /// 将属性转换成数据
          /// </summary>
          /// <param name=”source”>源变量</param>
          /// <param name=”property”>属性信息</param>
          /// <returns>获取属性数据表达式</returns>
          private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property)
          {
            var propertyType = property.PropertyType;
            var expression = (Expression)Expression.Property(source, property);
            if (propertyType.IsEnum)
              expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType());
            return Expression.Convert(expression, typeof(object));
          }

          /// <summary>
          /// 获取数据类型
          /// </summary>
          /// <param name=”type”>属性类型</param>
          /// <returns>数据类型</returns>
          private static Type GetDataType(Type type)
          {
            //枚举默认转换成对应的值类型
            if (type.IsEnum)
              return type.GetEnumUnderlyingType();
            //可空类型
            if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
              return GetDataType(type.GetGenericArguments().First());
            return type;
          }

          /// <summary>
          /// 列集合
          /// </summary>
          public static IReadOnlyList<DataColumn> Columns { get; }

          /// <summary>
          /// 对象转数据行委托
          /// </summary>
          public static Func<TModel, object[]> ToRowData { get; }

          /// <summary>
          /// 集合转换成DataTable
          /// </summary>
          /// <param name=”source”>集合</param>
          /// <param name=”tableName”>表名称</param>
          /// <returns>转换完成的DataTable</returns>
          public static DataTable ToDataTable(IEnumerable<TModel> source, string tableName = “TempTable”)
          {
            //创建表对象
            var table = new DataTable(tableName);
            //设置列
            foreach (var dataColumn in Columns)
            {
              table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType));
            }

            //循环转换每一行数据
            foreach (var item in source)
            {
              table.Rows.Add(ToRowData.Invoke(item));
            }

            //返回表对象
            return table;
          }
        }

      三、测试封装代码

      1.测试代码

      创表代码

      CREATE TABLE [dbo].[Person](
       [Id] [BIGINT] NOT NULL,
       [Name] [VARCHAR](64) NOT NULL,
       [Age] [INT] NOT NULL,
       [CreateTime] [DATETIME] NULL,
       [Sex] [INT] NOT NULL,
      PRIMARY KEY CLUSTERED
      (
       [Id] ASC
      )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
      ) ON [PRIMARY]

      实体类代码

      定义的实体的属性名称需要和SqlServer列名称类型对应

      public class Person
      {
        public long Id { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
        public DateTime? CreateTime { get; set; }
        public Gender Sex { get; set; }
      }

      public enum Gender
      {
        Man = 0,
        Woman = 1
      }

      测试方法

      //生成10万条数据
      var persons = new Person[100000];
      var random = new Random();
      for (int i = 0; i < persons.Length; i++)
      {
        persons[i] = new Person
        {
          Id = i + 1,
          Name = “张三” + i,
          Age = random.Next(1, 128),
          Sex = (Gender)random.Next(2),
          CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i)
        };
      }

      //创建数据库连接
      using (var conn = new SqlConnection(“Server=.;Database=DemoDataBase;User ID=sa;Password=8888;”))
      {
        conn.Open();
        var sw = Stopwatch.StartNew();
        //批量插入数据
        var qty = conn.BulkCopy(persons);
        sw.Stop();
        Console.WriteLine(sw.Elapsed.TotalMilliseconds + “ms”);
      }

      执行批量插入结果

      226.4767ms

      请按任意键继续. . .

      四、代码下载

      GitHub代码地址:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents

      来源:脚本之家

      链接:https://www.jb51.net/article/201634.htm

      ,。,!

    • 0
    • 0
    • 0
    • 5
    • 请登录之后再进行评论

      登录
    • 单栏布局 侧栏位置: