java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Mybatis流式查询将结果分批写入文件

Mybatis流式查询并实现将结果分批写入文件

作者:isTrueLoveColour

这篇文章主要介绍了Mybatis流式查询并实现将结果分批写入文件方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

Mybatis流式查询并将结果分批写入文件

    /**
     * 流式查询,全量导出
     *
     * @param req  查询条件
     * @param size 单个文件数据最大条数
     * @return
     */
    @ApiOperation(value = "流式查询,全量导出")
    @GetMapping("/streamAll")
    public BaseResultModel streamAll(ReqBillRecordBackQuery req, Integer size) {
        try {
            billRecordBackService.streamAll(req, size);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return BaseResultModel.success();
    }

以xml的方式

 @Override
    @Transactional
    public void streamAll(ReqBillRecordBackQuery req, Integer size) throws Exception {
        exportXml(req,size);
    }
    private void exportXml(ReqBillRecordBackQuery req, Integer size) throws Exception{
        //文件内容行数
        Integer in = 0;
        //文件名称
        Integer fileName=0;
        String name = "exportTest";
        String suf =".txt";
        String path = "H:\\新建文件夹\\新建文件夹\\export\\";
        File ff = new File(path);
        //递归删除目录中的所有文件和子目录,而不删除目录本身。
        FileUtils.cleanDirectory(ff);
        File fe = new File(path+name+fileName+suf);
        //删除此抽象路径名表示的文件或目录
        //mkdirs()可以建立多级文件夹, mkdir()只会建立一级的文件夹
//        fe.mkdirs();
        //获取文件输出列
        BufferedWriter bufferedWriter=new BufferedWriter(new FileWriter(fe));
        StringBuilder sb = new StringBuilder();
        Cursor<BillRecordBack> billRecordBacks = mapper.streamAll(req);
        for (BillRecordBack bil : billRecordBacks) {
            sb.append(bil).append("\n");
            in++;
            if (in>=size){
                in=0;
                fileName++;
                fe = new File(path+name+fileName+suf);
                bufferedWriter = new BufferedWriter(new FileWriter(fe));
            }
            bufferedWriter.write(sb.toString());
            //将StringBuilder数据重置
            sb.setLength(0);
        }
        //最后需要自己关闭流
        billRecordBacks.close();
        bufferedWriter.close();
    }
    <select id="streamAll" resultType="com.psh.hik.entity.BillRecordBack" fetchSize="5000">
        select t_id,r_id,r_time,r_number,descd,deleted,ctime,crname,mtime,chname from  bill_record_back
        <where>
            <if test="null != param.rTime and ''!= param.rTime">
              ctime = #{param.rTime}
            </if>
            <if test="null != param.rNumber and ''!= param.rNumber">
                ctime = #{param.rNumber}
            </if>
        </where>
    </select>

以mybatis-plus的方式

    private void exportNote(ReqBillRecordBackQuery req, Integer size) throws Exception{
        //lambda表达式访问外部变量有一个非常重要的限制:变量不可变(只是引用不可变,而不是真正的不可变),AtomicInteger是一个提供原子操作的Integer类,通过线程安全的方式操作加减。
        //文件内容行数
        AtomicInteger in = new AtomicInteger(1);
        //文件名称
        AtomicInteger fileName= new AtomicInteger(0);
        String name = "exportTest";
        String suf =".txt";
        String path = "H:\\新建文件夹\\新建文件夹\\export\\";
        File ff = new File(path);
        //递归删除目录中的所有文件和子目录,而不删除目录本身。
        FileUtils.cleanDirectory(ff);
        AtomicReference<File> fe = new AtomicReference<>(new File(path + name + fileName + suf));
        AtomicReference<BufferedWriter> bufferedWriter= new AtomicReference<>(new BufferedWriter(new FileWriter(fe.get())));
        StringBuilder sb = new StringBuilder();
        mapper.exportNote(req,resultContext -> {
            try {
                if (fileName.get()>=20){
                    return;
                }
                BillRecordBack resultObject = resultContext.getResultObject();
                sb.append(resultObject).append("\n");
                //a.incrementAndGet(); 先+1,再返回,a.getAndIncrement()先返回,再 +1
                in.getAndIncrement();
                System.out.println(in);
                if (in.get() >=size){
                    in.set(0);
                    fileName.getAndIncrement();
                    fe.set(new File(path + name + fileName + suf));
                    bufferedWriter.set(new BufferedWriter(new FileWriter(fe.get())));
                }
                bufferedWriter.get().write(sb.toString());
                //将StringBuilder数据重置
                sb.setLength(0);
            }catch (Exception e){
                throw new RuntimeException(e);
            }
        });
        bufferedWriter.get().close();
    }
    @Select("select t_id,r_id,r_time,r_number,descd,deleted,ctime,crname,mtime,chname from  bill_record_back")
    //这个注解是设定每次流式查询的iterator大小的,这里是1000条 ,ResultSetType.FORWARD_ONLY 只允许游标向下移动
    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 5000)
    @ResultType(BillRecordBack.class)
    void exportNote(ReqBillRecordBackQuery req, ResultHandler<BillRecordBack> handler);

Mybatis使用流式查询避免数据量过大导致OOM

本文已springboot项目为例,要实现流式查询需要完成以下几步

POM文件中的配置

springboot中整合mybatis

<dependency>
  <groupId>org.mybatis.spring.boot</groupId>
  <artifactId>mybatis-spring-boot-starter</artifactId>
  <version>1.1.1</version>
</dependency>

mapper.xml文件配置

select语句需要增加fetchSize属性,底层是调用jdbc的setFetchSize方法,查询时从结果集里面每次取设置的行数,循环去取,直到取完。

默认size是0,也就是默认会一次性把结果集的数据全部取出来,当结果集数据量很大时就容易造成内存溢出。

<select id="selectGxids" resultType="java.lang.String" fetchSize="1000">
   SELECT gxid from t_gxid
 </select>

自定义ResultHandler来分批处理结果集

package flowselect;
import org.apache.ibatis.session.ResultContext;
import org.apache.ibatis.session.ResultHandler;
import java.util.Set;
public class GxidResultHandler implements ResultHandler<String> {
  // 这是每批处理的大小
  private final static int BATCH_SIZE = 1000;
  private int size;
  // 存储每批数据的临时容器
  private Set<String> gxids;
  public void handleResult(ResultContext<? extends String> resultContext) {
    // 这里获取流式查询每次返回的单条结果
    String gxid = resultContext.getResultObject();
    // 你可以看自己的项目需要分批进行处理或者单个处理,这里以分批处理为例
    gxids.add(gxid);
    size++;
    if (size == BATCH_SIZE) {
      handle();
    }
  }
  private void handle() {
    try {
      // 在这里可以对你获取到的批量结果数据进行需要的业务处理
    } finally {
      // 处理完每批数据后后将临时清空
      size = 0;
      gxids.clear();
    }
  }
  // 这个方法给外面调用,用来完成最后一批数据处理
  public void end(){
    handle();// 处理最后一批不到BATCH_SIZE的数据
  }
}

serviceImpl类中的使用

package flowselect;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
@Service
public class ServiceImpl implements Service {
  @Autowired
  SqlSessionTemplate sqlSessionTemplate;
  public void method(){
    GxidResultHandler gxidResultHandler = new GxidResultHandler();
    sqlSessionTemplate.select("flowselect.Mapper.selectGxids", gxidResultHandler);
    gxidResultHandler.end();
  }
}

总结

非流式查询:内存会随着查询记录的增长而近乎直线增长。

流式查询:内存会保持稳定,不会随着记录的增长而增长。其内存大小取决于批处理大小BATCH_SIZE的设置,该尺寸越大,内存会越大。所以BATCH_SIZE应该根据业务情况设置合适的大小。

另外要切记每次处理完一批结果要记得释放存储每批数据的临时容器,即上文中的gxids.clear();

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文