详解MySqlBulkLoader的使用
作者:搬砖滴
mysql数据库:最近要写一个服务,跨库数据同步,目前数据量大约一万,以后会越来越多,考虑到扩展性,数据的插入操作就采用了MySqlBulkLoader。本文分两部分来写,第一部分写一下MySqlBulkLoader的使用,第二部分记录使用过程中出现的问题。
一、MySqlBulkLoader的使用
我们先来定义个数据表student,表结构如下:
创建一个core控制台项目,相关代码如下:
入口代码:
using System; using System.Collections.Generic; namespace MySqlBulkLoaderDemo { class Program { static void Main(string[] args) { //装载30个数据 List<Student> stuList = new List<Student>(); for (int i = 0; i < 30; i++) { stuList.Add( new Student { Guid = Guid.NewGuid().ToString(), Name = "QXH", Age = new Random().Next(1, 30) }); } //调用MySqlBulkLoader,往student表中插入stuList int insertCount = MySqlBulkLoaderHelper.BulkInsert<Student>(stuList, "student"); Console.WriteLine($"成功插入{insertCount}条数据"); Console.ReadKey(); } } }
定义一个Student映射类:
using System; using System.Collections.Generic; using System.Text; namespace MySqlBulkLoaderDemo { public class Student { public string Guid { get; set; } public string Name { get; set; } public int Age { get; set; } } }
定义一个MySqlBulkLoaderHelper类,用于存放相关方法:
using MySql.Data.MySqlClient; using System; using System.Collections.Generic; using System.ComponentModel.DataAnnotations.Schema; using System.Data; using System.IO; using System.Linq; using System.Text; namespace MySqlBulkLoaderDemo { public class MySqlBulkLoaderHelper { const string ConnectionString = "server=localhost;port=3306;user=root;password=123456;database=mysql;SslMode = none;AllowLoadLocalInfile=true"; public static int BulkInsert<T>(List<T> entities, string tableName) { DataTable dt = entities.ToDataTable(); using (MySqlConnection conn = new MySqlConnection()) { conn.ConnectionString = ConnectionString; if (conn.State != ConnectionState.Open) { conn.Open(); } if (tableName.IsNullOrEmpty()) { var tableAttribute = typeof(T).GetCustomAttributes(typeof(TableAttribute), true).FirstOrDefault(); if (tableAttribute != null) tableName = ((TableAttribute)tableAttribute).Name; else tableName = typeof(T).Name; } int insertCount = 0; string tmpPath = Path.Combine(Path.GetTempPath(), DateTime.Now.Ticks.ToString() + "_" + Guid.NewGuid().ToString() + ".tmp"); string csv = dt.ToCsvStr(); File.WriteAllText(tmpPath, csv, Encoding.UTF8); using (MySqlTransaction tran = conn.BeginTransaction()) { MySqlBulkLoader bulk = new MySqlBulkLoader(conn) { FieldTerminator = ",", FieldQuotationCharacter = '"', EscapeCharacter = '"', LineTerminator = "\r\n", FileName = tmpPath, Local = true, NumberOfLinesToSkip = 0, TableName = tableName, CharacterSet = "utf8" }; try { bulk.Columns.AddRange(dt.Columns.Cast<DataColumn>().Select(colum => colum.ColumnName).ToList()); insertCount = bulk.Load(); tran.Commit(); } catch (MySqlException ex) { if (tran != null) tran.Rollback(); throw ex; } } File.Delete(tmpPath); return insertCount; } } } }
定义一个帮助类ExtentionHelper,主要是扩展方法:
using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Data; using System.Text; namespace MySqlBulkLoaderDemo { public static class ExtentionHelper { /// <summary> /// 将对象序列化成Json字符串 /// </summary> /// <param name="obj">需要序列化的对象</param> /// <returns></returns> public static string ToJson(this object obj) { return JsonConvert.SerializeObject(obj); } /// <summary> /// 将Json字符串转为DataTable /// </summary> /// <param name="jsonStr">Json字符串</param> /// <returns></returns> public static DataTable ToDataTable(this string jsonStr) { return jsonStr == null ? null : JsonConvert.DeserializeObject<DataTable>(jsonStr); } /// <summary> /// 将IEnumerable'T'转为对应的DataTable /// </summary> /// <typeparam name="T">数据模型</typeparam> /// <param name="iEnumberable">数据源</param> /// <returns>DataTable</returns> public static DataTable ToDataTable<T>(this IEnumerable<T> iEnumberable) { return iEnumberable.ToJson().ToDataTable(); } /// <summary> /// 判断是否为Null或者空 /// </summary> /// <param name="obj">对象</param> /// <returns></returns> public static bool IsNullOrEmpty(this object obj) { if (obj == null) return true; else { string objStr = obj.ToString(); return string.IsNullOrEmpty(objStr); } } /// <summary> ///将DataTable转换为标准的CSV字符串 /// </summary> /// <param name="dt">数据表</param> /// <returns>返回标准的CSV</returns> public static string ToCsvStr(this DataTable dt) { //以半角逗号(即,)作分隔符,列为空也要表达其存在。 //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。 //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。 StringBuilder sb = new StringBuilder(); DataColumn colum; foreach (DataRow row in dt.Rows) { for (int i = 0; i < dt.Columns.Count; i++) { colum = dt.Columns[i]; if (i != 0) sb.Append(","); if (colum.DataType == typeof(string) && row[colum].ToString().Contains(",")) { sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\""); } else sb.Append(row[colum].ToString()); } sb.AppendLine(); } return sb.ToString(); } } }
完整项目:MySqlBulkLoaderDemo
运行结果如下:
二、MySqlBulkLoader使用过程中出现的问题
上边已经完整了介绍了MySqlBulkLoader的使用,但是在使用过程中出现了很多问题,主要集中在两方面,第一个方面是Mysql数据库不支持加载本地文件数据;第二个方面是我的数据库在阿里云服务器上,而代码在本地,换句话说数据库和项目是分别放在不同服务器上的。
1、Mysql数据库不支持加载本地文件数据
(1)MySQLBulkLoader原理?
我们结合SQLBulkCopy来说,用过SqlServer数据库的都熟悉SQLBulkCopy,很方便,可以直接将datatable中的数据批量导入到数据库。与SQLBulkCopy不同,MySQLBulkLoader也称为LOAD DATA INFILE,他要从文件读取数据,所以我们需要将我们的数据集(如上边的List<Student>)保存到文件,然后再从文件里面读取。而对于Mysql来说,为了数据库的安全,本地导入文件的配置没有开启,所以使用MySQLBulkLoader批量导入数据库,就需要mysql数据库支持本地导入文件。否则会出现以下错误:
The used command is not allowed with this MySQL version
(2)解决方案
mysql数据库开启允许本地导入数据的配置,命令如下:
SET GLOBAL local_infile=1;//1表示开启,0表示关闭
查看该配置的状态命令如下:
SHOW VARIABLES LIKE '%local%';
在项目里面的数据库连接字符串做设置
数据库连接字符串要加上”AllowLoadLocalInfile=true“,如下:
const string ConnectionString = "server=localhost;port=3306;user=root;password=123456;database=mysql;SslMode = none;AllowLoadLocalInfile=true";
2、数据库和项目是分别放在不同服务器上
(1)问题描述
数据库和项目是分别放在不同服务器上,会造成以下问题:
System.NotSupportedException HResult=0x80131515 Message=To use MySqlBulkLoader.Local=true, set AllowLoadLocalInfile=true in the connection string. See https://fl.vu/mysql-load-data
(2)原因
因为项目中将数据集生成的文件保存在了项目所在的服务器,另一个服务器上的数据库在插入数据操作时,找不到数据集文件,导致的错误
(3)解决方法
方法很简单,因为数据库并不在项目所在的服务器,所以MySqlBulkLoader中要设置Local = true
读取本地文件,进行导入。具体代码如下:
(4)总结
如果你的项目和数据库在一台服务器上,那么就不会出现该问题。