使用MySqlBulkLoader批量插入数据

 

最近在项目中遇到插入数据瓶颈,几万、几十万、几百万的数据保存到MYSQL数据库,使用EF插入数据速度非常慢,数据量非常大时EF插入需要几十分钟,甚至几个小时,这样子的速度肯定不是我们所期望的。

后面经过了解与研究发现MySqlBulkLoader,可以批量将数据插入到数据库并且速度上面远远优于EF。

MySqlBulkLoader主要的实现方式:将需要插入的数据转成DataTable,DataTable转成一个CSV文件,将CSV文件使用批量导入的形式导入到数据库里面去。

 

注意:

1).数据库连接地址需要添加配置AllowLoadLocalInfile=true,允许本地文件导入;

Data Source = 数据库地址; Port = 端口; Initial Catalog = 数据库名; User Id = 用户名; Password = 密码;AllowLoadLocalInfile=true;

2).插入的时候会返回插入行数,但是检查所有的数据都正确,也没有报异常,却返回了插入数量为0,可以检查表是否有唯一索引,插入的数据是否违反了唯一索引

(以下分块展示了代码,如果需要看完整的代码直接看 5.完整的代码) 

 

1.将List转化为DataTable 

   /// <summary>         /// 将List转化为DataTable         /// </summary>         /// <returns></returns>         public DataTable ListToDataTable<T>(List<T> data)         {             #region 创建一个DataTable,以实体名称作为DataTable名称              var tableName = typeof(T).Name;             tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/             DataTable dt = new DataTable             {                 TableName = tableName             };              #endregion              #region 拿取列名,以实体的属性名作为列名                     var properties = typeof(T).GetProperties();             foreach (var item in properties)             {                 var curFileName = item.Name;                 curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/                 dt.Columns.Add(curFileName);             }              #endregion              #region 列赋值             foreach (var item in data)             {                 DataRow dr = dt.NewRow();                 var columns = dt.Columns;                  var curPropertyList = item.GetType().GetProperties();                 foreach (var p in curPropertyList)                 {                     var name = p.Name;                     name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/                     var curValue = p.GetValue(item);                      int i = columns.IndexOf(name);                     dr[i] = curValue;                 }                  dt.Rows.Add(dr);             }              #endregion                return dt;         }

  

2.将DataTable转换为标准的CSV文件 

  /// <summary>     /// csv扩展     /// </summary>     public static class CSVEx     {         /// <summary>         ///将DataTable转换为标准的CSV文件         /// </summary>         /// <param name="table">数据表</param>         /// <param name="tmpPath">文件地址</param>         /// <returns>返回标准的CSV</returns>         public static void ToCsv(this DataTable table, string tmpPath)         {             //以半角逗号(即,)作分隔符,列为空也要表达其存在。             //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。             //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。             StringBuilder sb = new StringBuilder();             DataColumn colum;             foreach (DataRow row in table.Rows)             {                 for (int i = 0; i < table.Columns.Count; i++)                 {                     Type _datatype = typeof(DateTime);                     colum = table.Columns[i];                     if (i != 0) sb.Append("t");                     //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))                     //{                     //    sb.Append(""" + row[colum].ToString().Replace(""", """") + """);                     //}                     if (colum.DataType == _datatype)                     {                         sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));                     }                     else sb.Append(row[colum].ToString());                 }                 sb.Append("rn");             }             StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);             sw.Write(sb.ToString());             sw.Close();         }      }

 

3.CSV文件导入数据到数据库

    /// <summary>     /// 批量导入mysql帮助类     /// </summary>     public static class MySqlHelper     {         /// <summary>         /// MySqlBulkLoader批量导入         /// </summary>         /// <param name="_mySqlConnection">数据库连接地址</param>         /// <param name="table"></param>         /// <param name="csvName"></param>         /// <returns></returns>         public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)         {             var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();             MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)             {                 FieldTerminator = "t",                 FieldQuotationCharacter = '"',                 EscapeCharacter = '"',                 LineTerminator = "rn",                 FileName = csvName,                 NumberOfLinesToSkip = 0,                 TableName = table.TableName,              };              bulk.Columns.AddRange(columns);             return bulk.Load();         }     }

  

4.使用MySqlBulkLoader批量插入数据

        /// <summary>         /// 使用MySqlBulkLoader批量插入数据         /// </summary>         /// <typeparam name="T"></typeparam>         /// <param name="data"></param>         /// <returns></returns>         /// <exception cref="Exception"></exception>         public int BulkLoaderData<T>(List<T> data)         {             if (data.Count <= 0) return 0;              var connectString = "数据库连接地址";             using (MySqlConnection connection = new MySqlConnection(connectString))             {                 MySqlTransaction sqlTransaction = null;                 try                 {                     if (connection.State == ConnectionState.Closed)                     {                         connection.Open();                     }                     sqlTransaction = connection.BeginTransaction();                       var dt = ListToDataTable<T>(data); //将List转成dataTable                     string tmpPath = Path.GetTempFileName();                     dt.ToCsv(tmpPath); //将DataTable转成CSV文件                     var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据                     sqlTransaction.Commit();                      try                     {                         if (File.Exists(tmpPath)) File.Delete(tmpPath);                     }                     catch (Exception)                     {                         //删除文件失败                      }                     return insertCount; //返回执行成功的条数                 }                 catch (Exception e)                 {                     if (sqlTransaction != null)                     {                         sqlTransaction.Rollback();                     }                     //执行异常                      throw e;                 }             }          }

 

 5.完整的代码:

namespace WebApplication1.BrantchInsert {      /// <summary>     /// 批量插入     /// </summary>     public class BulkLoader     {           /// <summary>         /// 测试批量插入入口         /// </summary>         /// <returns></returns>         public int BrantchDataTest()         {              #region 模拟数据             var data = new List<CrmCouponTestDto>() {                  new CrmCouponTestDto {                      Id=1,                      CouponCode="test001",                      CouponId = 1,                      MemberId=100,                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),                      UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),                      UsageShopId=0,                      UsageBillNo="",                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),                      Status=0                  },                  new CrmCouponTestDto {                      Id=2,                      CouponCode="test002",                      CouponId = 1,                        MemberId=101,                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),                      UsageTime=Convert.ToDateTime("2022-06-27 14:30:00"),                      UsageShopId=2,                      UsageBillNo="CS202206271430001",                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),                      Status=1                  },                   new CrmCouponTestDto {                      Id=3,                      CouponCode="test003",                      CouponId = 1,                      MemberId=102,                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),                      UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),                      UsageShopId=0,                      UsageBillNo="",                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),                      Status=0                  },                     new CrmCouponTestDto {                      Id=4,                      CouponCode="test004",                      CouponId = 1,                      MemberId=103,                      IssueTime=Convert.ToDateTime("2022-06-27 14:00:00"),                      UsageTime=Convert.ToDateTime("3000-12-31 00:00:00"),                      UsageShopId=0,                      UsageBillNo="",                      EffectiveStart=Convert.ToDateTime("2022-06-27 14:00:00"),                      EffectiveEnd=Convert.ToDateTime("2023-06-27 14:00:00"),                      Status=0                  }              };             #endregion             var result = BulkLoaderData<CrmCouponTestDto>(data);             return result;          }           /// <summary>         /// 使用MySqlBulkLoader批量插入数据         /// </summary>         /// <typeparam name="T"></typeparam>         /// <param name="data"></param>         /// <returns></returns>         /// <exception cref="Exception"></exception>         public int BulkLoaderData<T>(List<T> data)         {             if (data.Count <= 0) return 0;              var connectString = "数据库连接地址";             using (MySqlConnection connection = new MySqlConnection(connectString))             {                 MySqlTransaction sqlTransaction = null;                 try                 {                     if (connection.State == ConnectionState.Closed)                     {                         connection.Open();                     }                     sqlTransaction = connection.BeginTransaction();                       var dt = ListToDataTable<T>(data); //将List转成dataTable                     string tmpPath = Path.GetTempFileName();                     dt.ToCsv(tmpPath); //将DataTable转成CSV文件                     var insertCount = MySqlHelper.BulkLoad(connection, dt, tmpPath); //使用MySqlBulkLoader插入数据                     sqlTransaction.Commit();                      try                     {                         if (File.Exists(tmpPath)) File.Delete(tmpPath);                     }                     catch (Exception)                     {                         //删除文件失败                      }                     return insertCount; //返回执行成功的条数                 }                 catch (Exception e)                 {                     if (sqlTransaction != null)                     {                         sqlTransaction.Rollback();                     }                     //执行异常                      throw e;                 }             }          }           /// <summary>         /// 将List转化为DataTable核心方法         /// </summary>         /// <returns></returns>         public DataTable ListToDataTable<T>(List<T> data)         {             #region 创建一个DataTable,以实体名称作为DataTable名称              var tableName = typeof(T).Name;             tableName = tableName.ToSnakeCase(); /*实体名称与表名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/             DataTable dt = new DataTable             {                 TableName = tableName             };              #endregion              #region 拿取列名,以实体的属性名作为列名                     var properties = typeof(T).GetProperties();             foreach (var item in properties)             {                 var curFileName = item.Name;                 curFileName = curFileName.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/                 dt.Columns.Add(curFileName);             }              #endregion              #region 列赋值             foreach (var item in data)             {                 DataRow dr = dt.NewRow();                 var columns = dt.Columns;                  var curPropertyList = item.GetType().GetProperties();                 foreach (var p in curPropertyList)                 {                     var name = p.Name;                     name = name.ToSnakeCase();/*列名与字段名进行转化,主要根据各项目的规定进行转化,不一定就是我这些写的这种转换方式*/                     var curValue = p.GetValue(item);                      int i = columns.IndexOf(name);                     dr[i] = curValue;                 }                  dt.Rows.Add(dr);             }              #endregion                return dt;         }       }       /// <summary>     /// 批量导入mysql帮助类     /// </summary>     public static class MySqlHelper     {         /// <summary>         /// MySqlBulkLoader批量导入         /// </summary>         /// <param name="_mySqlConnection">数据库连接地址</param>         /// <param name="table"></param>         /// <param name="csvName"></param>         /// <returns></returns>         public static int BulkLoad(MySqlConnection _mySqlConnection, DataTable table, string csvName)         {             var columns = table.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList();             MySqlBulkLoader bulk = new MySqlBulkLoader(_mySqlConnection)             {                 FieldTerminator = "t",                 FieldQuotationCharacter = '"',                 EscapeCharacter = '"',                 LineTerminator = "rn",                 FileName = csvName,                 NumberOfLinesToSkip = 0,                 TableName = table.TableName,              };              bulk.Columns.AddRange(columns);             return bulk.Load();         }     }       /// <summary>     /// csv扩展     /// </summary>     public static class CSVEx     {         /// <summary>         ///将DataTable转换为标准的CSV文件         /// </summary>         /// <param name="table">数据表</param>         /// <param name="tmpPath">文件地址</param>         /// <returns>返回标准的CSV</returns>         public static void ToCsv(this DataTable table, string tmpPath)         {             //以半角逗号(即,)作分隔符,列为空也要表达其存在。             //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。             //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。             StringBuilder sb = new StringBuilder();             DataColumn colum;             foreach (DataRow row in table.Rows)             {                 for (int i = 0; i < table.Columns.Count; i++)                 {                     Type _datatype = typeof(DateTime);                     colum = table.Columns[i];                     if (i != 0) sb.Append("t");                     //if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))                     //{                     //    sb.Append(""" + row[colum].ToString().Replace(""", """") + """);                     //}                     if (colum.DataType == _datatype)                     {                         sb.Append(((DateTime)row[colum]).ToString("yyyy/MM/dd HH:mm:ss"));                     }                     else sb.Append(row[colum].ToString());                 }                 sb.Append("rn");             }             StreamWriter sw = new StreamWriter(tmpPath, false, UTF8Encoding.UTF8);             sw.Write(sb.ToString());             sw.Close();         }      }      /// <summary>     /// 字符串转化     /// </summary>     public static class StringExtensions     {         /// <summary>         /// 转换为 main_keys_id 这种形式的字符串方式         /// </summary>         public static string ToSnakeCase(this string input)         {             if (string.IsNullOrEmpty(input)) { return input; }              var startUnderscores = Regex.Match(input, @"^_+");             return startUnderscores + Regex.Replace(input, @"([a-z0-9])([A-Z])", "$1_$2").ToLower();         }     }       /// <summary>     /// 实体     /// </summary>     public class CrmCouponTestDto     {         /// <summary>         /// ID         /// </summary>         public long Id { get; set; }          /// <summary>         /// 卡券号         /// </summary>              public string CouponCode { get; set; }          /// <summary>         /// 卡券ID         /// </summary>         public int CouponId { get; set; }          /// <summary>         /// 会员ID         /// </summary>         public int MemberId { get; set; }          /// <summary>         /// 发放时间         /// </summary>            public DateTime IssueTime { get; set; }          /// <summary>         /// 使用时间         /// </summary>               public DateTime UsageTime { get; set; }          /// <summary>         /// 使用店铺ID         /// </summary>                public int UsageShopId { get; set; }          /// <summary>         /// 使用单号         /// </summary>               public string UsageBillNo { get; set; }          /// <summary>         /// 有效开始时间         /// </summary>               public DateTime EffectiveStart { get; set; }          /// <summary>         /// 有效结束时间         /// </summary>               public DateTime EffectiveEnd { get; set; }          /// <summary>         /// 状态         /// CouponStatus 卡券状态:         /// -1:未领用         /// 0:未使用         /// 1:已使用         /// 2:已过期         ///3:已作废         ///4:转赠中         /// </summary>          public Int16 Status { get; set; }     } }

 

发表评论

相关文章