java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringBoot Minio分片上传、断点续传

SpringBoot基于Minio实现分片上传、断点续传的实现

作者:喵只想打代码

本文主要介绍了SpringBoot基于Minio实现分片上传、断点续传的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一、准备工作

安装 Minio 服务后,在 SpringBoot 项目中使用以下代码来获取 MinioClient(用于操作 Minio 的服务端):

MinioClient client = MinioClient.builder()
                .endpoint("http://192.168.xx.133:9000")  // 服务端IP+端口
                .credentials(minioProperties.getAccessKey(), // 服务端用户名
                             minioProperties.getSecretKey()) // 服务端密码
                .build();

二、实现分片上传+断点续传

2.1 思路

分片上传和断点续传的实现过程中,需要在Minio内部记录已上传的分片文件。

这些分片文件将以文件md5作为父目录,分片文件的名字按照01,02,...的顺序进行命名。同时,还必须知道当前文件的分片总数,这样就能够根据总数来判断文件是否上传完毕了。

比如,一个文件被分成了10片,所以总数是10。当前端发起上传请求时,把一个个文件分片依次上传,Minio 服务器中存储的临时文件依次是01、02、03 等等。

假设前端把05分片上传完毕了之后断开了连接,由于 Minio 服务器仍然存储着01~05的分片文件,因此前端再次上传文件时,只需从06序号开始上传分片,而不用从头开始传输。这就是所谓的断点续传。

2.2 代码

① 分片上传API

为了实现以上思路,考虑实现一个方法,用于上传文件的某一个分片。

/**
     * 将文件进行分片上传
     * <p>有一个未处理的bug(虽然概率很低很低):</p>
     * 当两个线程同时上传md5相同的文件时,由于两者会定位到同一个桶的同一个临时目录,两个线程会相互产生影响!
     * 
     * @param file 分片文件
     * @param currIndex 当前文件的分片索引
     * @param totalPieces 切片总数(对于同一个文件,请确保切片总数始终不变)
     * @param md5 整体文件MD5
     * @return 剩余未上传的文件索引集合
     */
    public FragResult uploadFileFragment(MultipartFile file,
                                  Integer currIndex, Integer totalPieces, String md5) throws Exception {
        checkNull(currIndex, totalPieces, md5);
        // 临时文件存放桶
        if ( !this.bucketExists(DEFAULT_TEMP_BUCKET_NAME) ) {
            this.createBucket(DEFAULT_TEMP_BUCKET_NAME);
        }
        // 得到已上传的文件索引
        Iterable<Result<Item>> results = this.getFilesByPrefix(DEFAULT_TEMP_BUCKET_NAME, md5.concat("/"), false);
        Set<Integer> savedIndex = Sets.newHashSet();
        boolean fileExists = false;
        for (Result<Item> item : results) {
            Integer idx = Integer.valueOf( getContentAfterSlash(item.get().objectName()) );
            if (currIndex.equals( idx )) {
                fileExists = true;
            }
            savedIndex.add( idx );
        }
        // 得到未上传的文件索引
        Set<Integer> remainIndex = Sets.newTreeSet();
        for (int i = 0; i < totalPieces; i++) {
            if ( !savedIndex.contains(i) ) {
                remainIndex.add(i);
            }
        }
        if (fileExists) {
            return new FragResult(false, remainIndex, "index [" + currIndex + "] exists");
        }
        this.uploadFileStream(DEFAULT_TEMP_BUCKET_NAME, this.getFileTempPath(md5, currIndex, totalPieces), file.getInputStream());
        // 还剩一个索引未上传,当前上传索引刚好是未上传索引,上传完当前索引后就完全结束了。
        if ( remainIndex.size() == 1 && remainIndex.contains(currIndex) ) {
            return new FragResult(true, null, "completed");
        }
        return new FragResult(false, remainIndex, "index [" + currIndex + "] has been uploaded");
    }

值得注意的是,我在项目中实践该方法时,上述参数都是由前端传来的,因此文件分片过程发生在前端,分片的大小也由前端定义。

② 合并文件API

当所有分片文件上传完毕,需要手动调用 Minio 原生 API 来合并临时文件(当然,在上面的那个方法中,当最后一个分片上传完毕后直接执行合并操作也是可以的)

临时文件合并完毕后,将会自动删除所有临时文件。

/**
     * 合并分片文件,并放到指定目录
     * 前提是之前已把所有分片上传完毕。
     * 
     * @param bucketName 目标文件桶名
     * @param targetName 目标文件名(含完整路径)
     * @param totalPieces 切片总数(对于同一个文件,请确保切片总数始终不变)
     * @param md5 文件md5
     * @return minio原生对象,记录了文件上传信息
     */
    public boolean composeFileFragment(String bucketName, String targetName, 
                                                   Integer totalPieces, String md5) throws Exception {
        checkNull(bucketName, targetName, totalPieces, md5);
        // 检查文件索引是否都上传完毕
        Iterable<Result<Item>> results = this.getFilesByPrefix(DEFAULT_TEMP_BUCKET_NAME, md5.concat("/"), false);
        Set<String> savedIndex = Sets.newTreeSet();
        for (Result<Item> item : results) {
            savedIndex.add( item.get().objectName() );
        }
        if (savedIndex.size() == totalPieces) {
            // 文件路径 转 文件合并对象
            List<ComposeSource> sourceObjectList = savedIndex.stream()
                    .map(filePath -> ComposeSource.builder()
                            .bucket(DEFAULT_TEMP_BUCKET_NAME)
                            .object( filePath )
                            .build())
                    .collect(Collectors.toList());
            ObjectWriteResponse objectWriteResponse = client.composeObject(
                    ComposeObjectArgs.builder()
                            .bucket(bucketName)
                            .object(targetName)
                            .sources(sourceObjectList)
                            .build());
            // 上传成功,则删除所有的临时分片文件
            List<String> filePaths = Stream.iterate(0, i -> ++i)
                    .limit(totalPieces)
                    .map(i -> this.getFileTempPath(md5, i, totalPieces) )
                    .collect(Collectors.toList());
            Iterable<Result<DeleteError>> deleteResults = this.removeFiles(DEFAULT_TEMP_BUCKET_NAME, filePaths);
            // 遍历错误集合(无元素则成功)
            for (Result<DeleteError> result : deleteResults) {
                DeleteError error = result.get();
                System.err.printf("[Bigfile] 分片'%s'删除失败! 错误信息: %s", error.objectName(), error.message());
            }
            return true;
        }
        throw new GlobalException("The fragment index is not complete. Please check parameters [totalPieces] or [md5]");
    }

以上方法的源码我放到了https://github.com/sky-boom/minio-spring-boot-starter,对原生的 Minio API 进行了封装,抽取成了minio-spring-boot-starter组件,感兴趣的朋友欢迎前去查看。

2.3 后端调用API示例

这里以单线程的分片上传为例(即前端每次只上传一个分片文件,调用分片上传接口后,接口返回下一个分片文件的序号)

① Controller 层

    /**
     * 分片上传
     * @param user 用户对象
     * @param fileAddDto file: 分片文件, 
     *                   currIndex: 当前分片索引, 
     *                   totalPieces: 分片总数,
     *                   md5: 文件md5
     * @return 前端需上传的下一个分片序号(-1表示上传完成)
     */
    @PostMapping("/file/big/upload")
    public ResultData<String> uploadBigFile(User user, BigFileAddDto fileAddDto) {
        // 1.文件为空,返回失败 (一般不是用户的问题)
        if (fileAddDto.getFile() == null) {
            throw new GlobalException();
        }
        // 2.名字为空,或包含特殊字符,则提示错误
        String fileName = fileAddDto.getFile().getOriginalFilename();
        if (StringUtils.isEmpty(fileName) || fileName.matches(FileSysConstant.NAME_EXCEPT_SYMBOL)) {
            throw new GlobalException(ResultCode.INCORRECT_FILE_NAME);
        }
        // 3. 执行分片上传
        String result = fileSystemService.uploadBigFile(user, fileAddDto);
        return GlobalResult.success(result);
    }

② Service 层

    @Override
    public String uploadBigFile(User user, BigFileAddDto fileAddDto) {
        try {
            MultipartFile file = fileAddDto.getFile();
            Integer currIndex = fileAddDto.getCurrIndex();
            Integer totalPieces = fileAddDto.getTotalPieces();
            String md5 = fileAddDto.getMd5();
            log.info("[Bigfile] 上传文件md5: {} ,分片索引: {}", md5, currIndex);
            FragResult fragResult = minioUtils.uploadFileFragment(file, currIndex, totalPieces, md5);
            // 分片全部上传完毕
            if ( fragResult.isAllCompleted() ) {
                FileInfo fileInfo = getFileInfo(fileAddDto, user.getId());
                DBUtils.checkOperation( fileSystemMapper.insertFile(fileInfo) );
                String realPath = generateRealPath(generateVirtPath(fileAddDto.getParentPath(), file.getOriginalFilename()));
                // 发起文件合并请求, 无异常则成功
                minioUtils.composeFileFragment(getBucketByUsername(user.getUsername()), realPath, totalPieces, md5);
                return "-1";
            } else {
                Iterator<Integer> iterator = fragResult.getRemainIndex().iterator();
                if (iterator.hasNext()) {
                    String nextIndex = iterator.next().toString();
                    log.info("[BigFile] 下一个需上传的文件索引是:{}", nextIndex);
                    return nextIndex;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.error("[Bigfile] 上传文件时出现异常");
        throw new GlobalException(ResultCode.FILE_UPLOAD_ERROR);
    }

2.4 前端

前端主要负责:

前端代码此处不展示,有缘后续再花时间补充吧………………

到此这篇关于SpringBoot基于Minio实现分片上传、断点续传的实现的文章就介绍到这了,更多相关SpringBoot Minio分片上传、断点续传内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文