java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Mybatis流式查询ResultHanlde

Mybatis流式查询之ResultHanlde问题

作者:认真的老去

这篇文章主要介绍了Mybatis流式查询之ResultHanlde问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

前言

正常访问数据库的查询操作,都是根据查询sql一次性返回查询结果。

但如果遇到目标数据量过大、且需要全量查询、不能分页、或者内存不想被返回的结果占用过多等需求时(例如导出excel),就可能需要流式查询。

1.准备工作

1.1.Mybaits的jar包引入

注:idea必须配置build,否则扫描不到src下的xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>relife</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>relife-object</artifactId>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.20</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.6</version>
        </dependency>
    </dependencies>
    <build>
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>

1.2.实体类User

public class User {
    private String name;
    private int age;
    private String id;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
}

1.3.mybatis-config.xml和userMapper.xml

mybatis-config.xml(resource根目录)

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <environments default="development">
        <environment id="development">
            <transactionManager type="JDBC"/>
            <dataSource type="POOLED">
                <property name="driver" value="com.mysql.jdbc.Driver"/>
                <property name="url"
                          value="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&amp;characterEncoding=utf8&amp;serverTimezone=Asia/Shanghai&amp;useSSL=false"/>
                <property name="username" value="root"/>
                <property name="password" value="root"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="com/relife/mybatis/userMapper.xml"/>
    </mappers>
</configuration>

userMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.relife.object.User">
    <select id="selectUser" resultType="com.relife.object.User">
		select id,name,age from user
	</select>
</mapper>

2.流式处理

2.1流式逐条处理handle

public class UserResultHandler implements ResultHandler<User> {
    @Override
    public void handleResult(ResultContext<? extends User> resultContext) {
        // 这里获取流式查询每次返回的单条结果
        User user = resultContext.getResultObject();
        handle(user);
    }
   // 串行逐条执行handle
    private void handle(User user) {
        System.out.println(user.getId());
    }
}

适用于导出excel等,,上述写法缺点也很明显,单线程,串行,所有效率慢,但是类似导出excel,符合要求,也不太方便使用多线程。

2.2.流式批量多线程处理handle

为了解决效率问题,有时候会对结果有比如发送请求,更行其他内容等需求时

public class UserResultHandler<T> implements ResultHandler<T> {
    public final Logger logger = Logger.getLogger(this.getClass());
    /**
     * 线程池线程数
     */
    private int threadPollNum = 100;
    public UserResultHandler() {
    }
    public UserResultHandler(int threadPollNum) {
        this.threadPollNum = threadPollNum;
    }
    // 线程池
    public ExecutorService executorService = Executors.newFixedThreadPool(threadPollNum);
    // 线程执行结果
    public List<Future> futureList = new ArrayList<>();
    @Override
    public void handleResult(ResultContext<? extends T> resultContext) {
        // 这里获取流式查询每次返回的单条结果
        T user = resultContext.getResultObject();
        while (futureList.size() > 200) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            futureList = futureList.stream().filter(future -> !future.isDone()).collect(Collectors.toList());
            logger.info("条数:" + resultContext.getResultCount() + "->未完成结果" + futureList.size());
        }
        UserThread ut = new UserThread(user);
        Future<?> future = executorService.submit(ut);
        futureList.add(future);
    }
    /**
     * 保证所有线程执行完成,并关闭线程池
     */
    public void end() {
        while (futureList.size() != 0) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            futureList = futureList.stream().filter(future -> !future.isDone()).collect(Collectors.toList());
        }
        executorService.shutdown();
    }
    public static class UserThread<T> implements Runnable {
        private T obejct;
        public UserThread(T obejct) {
            this.obejct = obejct;
        }
        @Override
        public void run() {
            System.out.println(((User)obejct).getId());
        }
        public T getObejct() {
            return obejct;
        }
        public void setObejct(T obejct) {
            this.obejct = obejct;
        }
    }
}

上述写法,可用于需要线程返回值的,或者明确需要线程执行完成,且可以保证不占用过多内存。

其中Thread.sleep()方法,可以自定义时间。

3.测试

3.1.测试案例

public static void main(String[] args) throws IOException {
        SqlSessionFactoryBuilder builder = new SqlSessionFactoryBuilder();
        SqlSessionFactory sqlSessionFactory = builder.build(Resources.getResourceAsStream("mybatis-config.xml"));
        System.out.println("sqlSessionFactory:" + sqlSessionFactory);
        SqlSession sqlSession = sqlSessionFactory.openSession();
        // 正常查询
        List<User> userList5 = sqlSession.selectList("selectUser");
        // 流式查询
        UserResultHandler userResultHandler = new UserResultHandler(100);
        sqlSession.select("selectUser", userResultHandler);
        userResultHandler.end();
        sqlSession.close();
    }

3.1.正常查询debug调用

调用图

在这里插入图片描述

3.2.流式查询debug调用

调用图

在这里插入图片描述

4.问题与部分源码解析

4.1加入resulthandle为什么会流式查询?

在DefaultResultSetHandler类中handleResultSet(ResultSetWrapper rsw, ResultMap resultMap, List multipleResults, ResultMapping parentMapping) throws SQLException 方法中出现了调用区别一个303行,一个206行

private void handleResultSet(ResultSetWrapper rsw, ResultMap resultMap, List<Object> multipleResults, ResultMapping parentMapping) throws SQLException {
    try {
      if (parentMapping != null) {
        handleRowValues(rsw, resultMap, null, RowBounds.DEFAULT, parentMapping);
      } else {
      	// 有的话就使用传入的ResultHandle,没有就用默认的DefaultResultHandler
        if (resultHandler == null) {
          DefaultResultHandler defaultResultHandler = new DefaultResultHandler(objectFactory);
          handleRowValues(rsw, resultMap, defaultResultHandler, rowBounds, null);
          // 指定handle时,把结果集放到multipleResults中
          multipleResults.add(defaultResultHandler.getResultList());
        } else {
          // 非指定handle时,multipleResults中不会有结果
          handleRowValues(rsw, resultMap, resultHandler, rowBounds, null);
        }
      }
    } finally {
      // issue #228 (close resultsets)
      closeResultSet(rsw.getResultSet());
    }
  }

这个if判断直接导致了最后实际调用callResultHandler时的区别。

  private void callResultHandler(ResultHandler<?> resultHandler, DefaultResultContext<Object> resultContext, Object rowValue) {
    resultContext.nextResultObject(rowValue);
    // 加入handle的会调用自己的代码实现,没有就用默认的DefaultResultHandler里的方法
    ((ResultHandler<Object>) resultHandler).handleResult(resultContext);
  }

4.2.流式查询为什么没有返回值?

这个可以从两个方向探讨

1.DefaultSqlSession

 // 正常查询
  @Override
  public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
    try {
      MappedStatement ms = configuration.getMappedStatement(statement);
      return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }
 // 流式查询
 @Override
  public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
    try {
      MappedStatement ms = configuration.getMappedStatement(statement);
      // 此方法有返回结果,但是为空,不需要返回(见下边方法)
      executor.query(ms, wrapCollection(parameter), rowBounds, handler);
    } catch (Exception e) {
      throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
    } finally {
      ErrorContext.instance().reset();
    }
  }

2.DefaultResultSetHandler

@Override
  public List<Object> handleResultSets(Statement stmt) throws SQLException {
    ErrorContext.instance().activity("handling results").object(mappedStatement.getId());
	// 初始化返回值
    final List<Object> multipleResults = new ArrayList<Object>();
    int resultSetCount = 0;
    ResultSetWrapper rsw = getFirstResultSet(stmt);
    List<ResultMap> resultMaps = mappedStatement.getResultMaps();
    int resultMapCount = resultMaps.size();
    validateResultMapsCount(rsw, resultMapCount);
    while (rsw != null && resultMapCount > resultSetCount) {
      ResultMap resultMap = resultMaps.get(resultSetCount);
      // 返回值会在调用方法时,被赋值,从上述4.1可看出,只有不传值时才会把结果放入multipleResults
      handleResultSet(rsw, resultMap, multipleResults, null);
      rsw = getNextResultSet(stmt);
      cleanUpAfterHandlingResultSet();
      resultSetCount++;
    }
    String[] resultSets = mappedStatement.getResultSets();
    if (resultSets != null) {
      while (rsw != null && resultSetCount < resultSets.length) {
        ResultMapping parentMapping = nextResultMaps.get(resultSets[resultSetCount]);
        if (parentMapping != null) {
          String nestedResultMapId = parentMapping.getNestedResultMapId();
          ResultMap resultMap = configuration.getResultMap(nestedResultMapId);
          handleResultSet(rsw, resultMap, null, parentMapping);
        }
        rsw = getNextResultSet(stmt);
        cleanUpAfterHandlingResultSet();
        resultSetCount++;
      }
    }
    return collapseSingleResultList(multipleResults);
  }
  // 只有一条结果,就直接取出,多个或空,不处理
  private List<Object> collapseSingleResultList(List<Object> multipleResults) {
    return multipleResults.size() == 1 ? (List<Object>) multipleResults.get(0) : multipleResults;
  }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文