Mybatis-Plus insertBatch执行缓慢的原因查询
作者:江畔独步
背景
最近在SpringCloud项目中, 使用Mybatis-Plus执行一个88万条左右的数据插入MySQL数据库的操作时,发现执行时长竟然长达2个小时,按理讲,MP框架执行如下批处理操作时:
- XXService.insertBatch()
- XXService.updateBatchById()
- xxService.deleteBatchIds()
- xxService.selectBatchIds
在jdbc的底层将使用 stmt.addBatch() 和 stmt.executeBatch()方法,根据以往使用原生jdbc 批处理方式 执行sql的经验来看,执行性能非常高,处理时长非常短.
即使使用MP封装对jdbc底层的操作,但这个场景下执行如此缓慢,此处必定有幺蛾子.
排查历程
一. 设置开启控制台完整sql打印模式
方式①:
application-[dev|prod|mybatisplus].yml 中 添加 log-impl参数项(如下示例最后一项):
configuration: #配置返回数据库(column下划线命名&&返回java实体是驼峰命名),自动匹配无需as(没开启这个,SQL需要写as: select user_id as userId) map-underscore-to-camel-case: true cache-enabled: false jdbc-type-for-null: null log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
方式②:
设置logback打印详细日志:
logging: config: classpath:logback-spring.xml level: root: debug #测试sql语句打印时,请置为 debug
方式③:
mybatis: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
原理同①
二. 启动工程,查看sql日志
启动工程前,将第二个参数batchSize 调小为 3:
status = insertBatch(list,3);
启动项目,可以看到每每4行会有一个循环:
分别是:
第一条sql语句为insert into插入语句模板:
第二/三/四条语句为数据条目,可以明显看到与上述batchSize配置一样,为3条。
==> Preparing: INSERT INTO test_table ( `day`, day_time, plat_type, uv,created,updated ) VALUES ( ?, ?, ?, ?,?,? ) ==> Parameters: 2020-05-31 00:00:00.0(Timestamp), 2020-05-31 14:00:00.0(Timestamp), Android(String), 11(Integer), 2020-06-05 12:23:24.437(Timestamp), 2020-06-05 12:23:24.437(Timestamp) ==> Parameters: 2020-05-31 00:00:00.0(Timestamp), 2020-05-31 14:00:00.0(Timestamp), IOS(String), 22(Integer), 2020-06-05 12:23:24.437(Timestamp), 2020-06-05 12:23:24.437(Timestamp) ==> Parameters: 2020-05-31 00:00:00.0(Timestamp), 2020-05-31 14:00:00.0(Timestamp), MP(String), 33(Integer), 2020-06-05 12:23:24.437(Timestamp), 2020-06-05 12:23:24.437(Timestamp) ==> Preparing: INSERT INTO test_table ( `day`, day_time, plat_type, uv,created,updated ) VALUES ( ?, ?, ?, ?,?,? ) ==> Parameters: 2020-05-31 00:00:00.0(Timestamp), 2020-05-31 14:00:00.0(Timestamp), H5(String), 44(Integer), 2020-06-05 12:23:24.437(Timestamp), 2020-06-05 12:23:24.437(Timestamp) ==> Parameters: 2020-05-31 00:00:00.0(Timestamp), 2020-05-31 14:00:00.0(Timestamp), PC(String), 55(Integer), 2020-06-05 12:23:24.437(Timestamp), 2020-06-05 12:23:24.437(Timestamp) ==> Parameters: 2020-05-31 00:00:00.0(Timestamp), 2020-05-31 14:00:00.0(Timestamp), QuickApp(String), 66(Integer), 2020-06-05 12:23:24.437(Timestamp), 2020-06-05 12:23:24.437(Timestamp) ....
Console这种查看方式,显示的sql不太直观,无法判断后台发送了几条,是执行的单条insert还是insertBatch,打开服务端mysql log日志(位置: /tmp/mysql.sql)查看:
命令: tail -100f /tmp/mysql.log,然后执行业务逻辑批插入.
如下图所示,MP里的insertBatch 语句 确实是按单次插入给执行了:
111 Query SELECT @@session.tx_read_only 111 Query SELECT @@session.tx_isolation 111 Query SELECT @@session.tx_read_only 111 Query INSERT INTO test_table ( `day`, day_time, plat_type, uv,created,updated ) VALUES ( '2020-05-31 00:00:00.0', '2020-05-31 00:00:00.0', 'Android', 11,'2020-06-05 12:23:24.437','2020-06-05 12:23:24.437' ) 111 Query INSERT INTO test_table ( `day`, day_time, plat_type, uv,created,updated ) VALUES ( '2020-05-31 00:00:00.0', '2020-05-31 00:00:00.0', 'IOS', 22,'2020-06-05 12:23:24.437','2020-06-05 12:23:24.437' ) 111 Query INSERT INTO test_table ( `day`, day_time, plat_type, uv,created,updated ) VALUES ( '2020-05-31 00:00:00.0', '2020-05-31 00:00:00.0', 'MP', 33,'2020-06-05 12:23:24.437','2020-06-05 12:23:24.437' )
三. 跟踪MY insertBatch源码
个人理解以注释形式填入以下源码中.
/** * 批量插入 * * @param entityList * @param batchSize * @return */ @Transactional(rollbackFor = Exception.class) @Override public boolean insertBatch(List<T> entityList, int batchSize) { if (CollectionUtils.isEmpty(entityList)) { throw new IllegalArgumentException("Error: entityList must not be empty"); } try (SqlSession batchSqlSession = sqlSessionBatch()) { //数据总条数 int size = entityList.size(); //根据枚举中定义的模板, 形成上述日志中sql插入语句,如下: //Preparing: INSERT INTO test_table ( `day`, day_time, plat_type, uv,created,updated ) VALUES ( ?, ?, ?, ?,?,? ) String sqlStatement = sqlStatement(SqlMethod.INSERT_ONE); //核心批量处理代码, 使用循环的方式, 以类似原生addBatch 和 executeBatch 方法的方式实现批量处理 for (int i = 0; i < size; i++) { //类比原生jdbc的 addBatch 方法,在此之前,先根据本条数据, 构建insert into插入语句. batchSqlSession.insert(sqlStatement, entityList.get(i)); //如果数据条数为2条以上, 且能被batchSize整除, 则向DBMS批量提交执行一次 if (i >= 1 && i % batchSize == 0) { batchSqlSession.flushStatements(); } } //剩余不能整除的少量数据(小于batchSize), 再向DBMS批量提交执行一次 batchSqlSession.flushStatements(); } catch (Throwable e) { throw new MybatisPlusException("Error: Cannot execute insertBatch Method. Cause", e); } return true; }
SqlMethod.INSERT_ONE枚举值定义的类源码:
/** * <p> * MybatisPlus 支持 SQL 方法 * </p> * * @author hubin * @Date 2016-01-23 */ public enum SqlMethod { /** * 插入 */ INSERT_ONE("insert", "插入一条数据(选择字段插入)", "<script>INSERT INTO %s %s VALUES %s</script>"), INSERT_ONE_ALL_COLUMN("insertAllColumn", "插入一条数据(全部字段插入)", "<script>INSERT INTO %s %s VALUES %s</script>"), /** * 删除 */ DELETE_BY_ID("deleteById", "根据ID 删除一条数据", "<script>DELETE FROM %s WHERE %s=#{%s}</script>"), DELETE_BY_MAP("deleteByMap", "根据columnMap 条件删除记录", "<script>DELETE FROM %s %s</script>"), DELETE("delete", "根据 entity 条件删除记录", "<script>DELETE FROM %s %s</script>"), DELETE_BATCH_BY_IDS("deleteBatchIds", "根据ID集合,批量删除数据", "<script>DELETE FROM %s WHERE %s IN (%s)</script>"), /** * 逻辑删除 */ LOGIC_DELETE_BY_ID("deleteById", "根据ID 逻辑删除一条数据", "<script>UPDATE %s %s WHERE %s=#{%s}</script>"), LOGIC_DELETE_BY_MAP("deleteByMap", "根据columnMap 条件逻辑删除记录", "<script>UPDATE %s %s %s</script>"), LOGIC_DELETE("delete", "根据 entity 条件逻辑删除记录", "<script>UPDATE %s %s %s</script>"), LOGIC_DELETE_BATCH_BY_IDS("deleteBatchIds", "根据ID集合,批量逻辑删除数据", "<script>UPDATE %s %s WHERE %s IN (%s)</script>"), /** * 修改 */ UPDATE_BY_ID("updateById", "根据ID 选择修改数据", "<script>UPDATE %s %s WHERE %s=#{%s} %s</script>"), UPDATE_ALL_COLUMN_BY_ID("updateAllColumnById", "根据ID 修改全部数据", "<script>UPDATE %s %s WHERE %s=#{%s} %s</script>"), UPDATE("update", "根据 whereEntity 条件,更新记录", "<script>UPDATE %s %s %s</script>"), UPDATE_FOR_SET("updateForSet", "根据 whereEntity 条件,自定义Set值更新记录", "<script>UPDATE %s %s %s</script>"), /** * 逻辑删除 -> 修改 */ LOGIC_UPDATE_BY_ID("updateById", "根据ID 修改数据", "<script>UPDATE %s %s WHERE %s=#{%s} %s</script>"), LOGIC_UPDATE_ALL_COLUMN_BY_ID("updateAllColumnById", "根据ID 选择修改数据", "<script>UPDATE %s %s WHERE %s=#{%s} %s</script>"), /** * 查询 */ SELECT_BY_ID("selectById", "根据ID 查询一条数据", "SELECT %s FROM %s WHERE %s=#{%s}"), SELECT_BY_MAP("selectByMap", "根据columnMap 查询一条数据", "<script>SELECT %s FROM %s %s</script>"), SELECT_BATCH_BY_IDS("selectBatchIds", "根据ID集合,批量查询数据", "<script>SELECT %s FROM %s WHERE %s IN (%s)</script>"), SELECT_ONE("selectOne", "查询满足条件一条数据", "<script>SELECT %s FROM %s %s</script>"), SELECT_COUNT("selectCount", "查询满足条件总记录数", "<script>SELECT COUNT(1) FROM %s %s</script>"), SELECT_LIST("selectList", "查询满足条件所有数据", "<script>SELECT %s FROM %s %s</script>"), SELECT_PAGE("selectPage", "查询满足条件所有数据(并翻页)", "<script>SELECT %s FROM %s %s</script>"), SELECT_MAPS("selectMaps", "查询满足条件所有数据", "<script>SELECT %s FROM %s %s</script>"), SELECT_MAPS_PAGE("selectMapsPage", "查询满足条件所有数据(并翻页)", "<script>SELECT %s FROM %s %s</script>"), SELECT_OBJS("selectObjs", "查询满足条件所有数据", "<script>SELECT %s FROM %s %s</script>"), /** * 逻辑删除 -> 查询 */ LOGIC_SELECT_BY_ID("selectById", "根据ID 查询一条数据", "SELECT %s FROM %s WHERE %s=#{%s} %s"), LOGIC_SELECT_BATCH_BY_IDS("selectBatchIds", "根据ID集合,批量查询数据", "<script>SELECT %s FROM %s WHERE %s IN (%s) %s</script>"); private final String method; private final String desc; private final String sql; SqlMethod(final String method, final String desc, final String sql) { this.method = method; this.desc = desc; this.sql = sql; } public String getMethod() { return this.method; } public String getDesc() { return this.desc; } public String getSql() { return this.sql; } }
经过MP复杂的封装 和 调用,最终定位到 insertBatch 的底层执行逻辑方法为doFlushStatements(boolean isRollback) ,位于
org.apache.ibatis.executor.BatchExecutor.java中,如下:
@Override public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException { try { //这里的 BatchResult 并非为批操作执行后的结果集, 而是 mybatis封装的MappedStatement(类比jdbc中的PrepareStatement), 加上 sql 语句, 再加上输入参数parameterObject的包装类. List<BatchResult> results = new ArrayList<BatchResult>(); //如果sql被回滚,则返回空集合. if (isRollback) { return Collections.emptyList(); } //遍历 Statement for (int i = 0, n = statementList.size(); i < n; i++) { Statement stmt = statementList.get(i); applyTransactionTimeout(stmt); //将Statement/sql/与请求参数包装成BatchResult对象 BatchResult batchResult = batchResultList.get(i); try { //Statement执行的executeBatch条数赋值给updateCounts字段. batchResult.setUpdateCounts(stmt.executeBatch()); MappedStatement ms = batchResult.getMappedStatement(); List<Object> parameterObjects = batchResult.getParameterObjects(); //获取KeyGenerator KeyGenerator keyGenerator = ms.getKeyGenerator(); //如果是jdbc类型的KeyGenerator, 走下面的processBatch方法, 进而走jdbc底层的executeBatch方法. if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) { Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator; jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects); } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141 for (Object parameter : parameterObjects) { keyGenerator.processAfter(this, ms, stmt, parameter); } } // Close statement to close cursor #1109 closeStatement(stmt); } catch (BatchUpdateException e) { //异常返回的消息包装 StringBuilder message = new StringBuilder(); message.append(batchResult.getMappedStatement().getId()) .append(" (batch index #") .append(i + 1) .append(")") .append(" failed."); if (i > 0) { message.append(" ") .append(i) .append(" prior sub executor(s) completed successfully, but will be rolled back."); } throw new BatchExecutorException(message.toString(), e, results, batchResult); } results.add(batchResult); } return results; } finally { //数据库连接相关的资源释放 for (Statement stmt : statementList) { closeStatement(stmt); } currentSql = null; statementList.clear(); batchResultList.clear(); } }
通过上述代码,看到了其底层调用了jdbc的 stmt.executeBatch() 方法.
四. 跟踪JDBC executeBatch() 源码
打开本工程使用的mysql驱动源码(mysql-connector-java-8.0.16-sources.jar) ,在StatementImpl.java 类中,可以看到批处理执行语句executeBatch()方法如下:
@Override public int[] executeBatch() throws SQLException { return Util.truncateAndConvertToInt(executeBatchInternal()); }
其又调用了executeBatchInternal()方法:
该方法源码如下:
protected long[] executeBatchInternal() throws SQLException { JdbcConnection locallyScopedConn = checkClosed(); synchronized (locallyScopedConn.getConnectionMutex()) { if (locallyScopedConn.isReadOnly()) { throw SQLError.createSQLException(Messages.getString("Statement.34") + Messages.getString("Statement.35"), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor()); } implicitlyCloseAllOpenResults(); List<Object> batchedArgs = this.query.getBatchedArgs(); if (batchedArgs == null || batchedArgs.size() == 0) { return new long[0]; } // we timeout the entire batch, not individual statements int individualStatementTimeout = getTimeoutInMillis(); setTimeoutInMillis(0); CancelQueryTask timeoutTask = null; try { resetCancelledState(); statementBegins(); try { this.retrieveGeneratedKeys = true; // The JDBC spec doesn't forbid this, but doesn't provide for it either...we do.. long[] updateCounts = null; if (batchedArgs != null) { int nbrCommands = batchedArgs.size(); this.batchedGeneratedKeys = new ArrayList<>(batchedArgs.size()); boolean multiQueriesEnabled = locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.allowMultiQueries).getValue(); if (multiQueriesEnabled || (locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue() && nbrCommands > 4)) { return executeBatchUsingMultiQueries(multiQueriesEnabled, nbrCommands, individualStatementTimeout); } timeoutTask = startQueryTimer(this, individualStatementTimeout); updateCounts = new long[nbrCommands]; for (int i = 0; i < nbrCommands; i++) { updateCounts[i] = -3; } SQLException sqlEx = null; int commandIndex = 0; for (commandIndex = 0; commandIndex < nbrCommands; commandIndex++) { try { String sql = (String) batchedArgs.get(commandIndex); updateCounts[commandIndex] = executeUpdateInternal(sql, true, true); if (timeoutTask != null) { // we need to check the cancel state on each iteration to generate timeout exception if needed checkCancelTimeout(); } // limit one generated key per OnDuplicateKey statement getBatchedGeneratedKeys(this.results.getFirstCharOfQuery() == 'I' && containsOnDuplicateKeyInString(sql) ? 1 : 0); } catch (SQLException ex) { updateCounts[commandIndex] = EXECUTE_FAILED; if (this.continueBatchOnError && !(ex instanceof MySQLTimeoutException) && !(ex instanceof MySQLStatementCancelledException) && !hasDeadlockOrTimeoutRolledBackTx(ex)) { sqlEx = ex; } else { long[] newUpdateCounts = new long[commandIndex]; if (hasDeadlockOrTimeoutRolledBackTx(ex)) { for (int i = 0; i < newUpdateCounts.length; i++) { newUpdateCounts[i] = java.sql.Statement.EXECUTE_FAILED; } } else { System.arraycopy(updateCounts, 0, newUpdateCounts, 0, commandIndex); } sqlEx = ex; break; //throw SQLError.createBatchUpdateException(ex, newUpdateCounts, getExceptionInterceptor()); } } } if (sqlEx != null) { throw SQLError.createBatchUpdateException(sqlEx, updateCounts, getExceptionInterceptor()); } } if (timeoutTask != null) { stopQueryTimer(timeoutTask, true, true); timeoutTask = null; } return (updateCounts != null) ? updateCounts : new long[0]; } finally { this.query.getStatementExecuting().set(false); } } finally { stopQueryTimer(timeoutTask, false, false); resetCancelledState(); setTimeoutInMillis(individualStatementTimeout); clearBatch(); } } }
其中,关键的代码为:
if (multiQueriesEnabled || (locallyScopedConn.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue() && nbrCommands > 4)) { return executeBatchUsingMultiQueries(multiQueriesEnabled, nbrCommands, individualStatementTimeout); }
能进入if语句,并执行批处理方法 executeBatchUsingMultiQueryies 的条件我:
①. multiQueriesEnables = true
PropertyKey.java中定义了 multiQueriesEnables 的枚举值,如下:
allowMultiQueries("allowMultiQueries", true);
②. 数据库url连接参数封装的connection对象的" rewriteBatchedStatements "属性设置为true,并且 数据总条数 > 4条.
根据以上语句,可以得出:
通过在jdbc的连接url处,设置:
A). &rewriteBatchedStatements=true
B). &allowMultiQueries=true
结合起来可以实现真正的批量insertBatch功能(另单独设置A也可以达成目标),批量update,批量delete同理.
完整的springcloud+druid数据库连接池实现多数据源配置:
spring: profiles: name: test_multi_datasource aop: proxy-target-class: true auto: true datasource: druid: #DBMS1 db1: url: jdbc:mysql://IP:PORT/database_1?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true username: xxxx password: xxxx driver-class-name: com.mysql.cj.jdbc.Driver initialSize: 5 minIdle: 5 maxActive: 20 #DBMS1 db2: url: jdbc:mysql://IP:PORT/database_2?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true username: yyyy password: yyyy driver-class-name: com.mysql.cj.jdbc.Driver initialSize: 5 minIdle: 5 maxActive: 20
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。