java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > java异步导出的实现

java异步导出的实现过程

作者:linsm1231

这篇文章主要介绍了java异步导出的实现过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

背景

假设我们有一个在线学习平台,管理员需要定期生成销售报告,包括课程销售情况和用户购买情况等重要数据。然而,由于数据量较大,生成报告可能需要较长时间,并且可能会占用大量系统资源,从而影响用户的使用体验。为了解决这个问题,我们考虑采用异步导出的方案。

异步导出的工作原理是将导出操作放在一个异步任务中执行,而不是立即在用户发起导出请求后执行导出操作。这样一来,用户无需等待导出任务完成,就可以继续进行其他操作,而系统则在后台完成导出任务。

这种方案有以下优点:

异步导出在许多需要处理大量数据或耗时操作的场景中都非常有用,可以有效提升系统的性能和用户体验。

数据库设计

首先我们需要设计一个保存导出任务的表,需要记录流转状态、操作人、任务参数,后续任务的创建、导出完成/失败都需要操作这张表

CREATE TABLE `t_export_task`
(
    `id`               bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `task_id`          varchar(50)   NOT NULL COMMENT '任务id',
    `task_type`        tinyint(4) NOT NULL COMMENT '任务类型',
    `task_param`       varchar(1000) NOT NULL COMMENT '任务参数',
    `status`           tinyint(3) NOT NULL DEFAULT 0 COMMENT '状态 0-处理中 1-成功 -1失败',
    `file_url`         varchar(500)           DEFAULT NULL COMMENT '文件url',
    `remark`         varchar(200)           DEFAULT NULL COMMENT '备注',
    `create_user_id`   int(11) NOT NULL COMMENT '操作人id',
    `create_user_name` varchar(50)   NOT NULL COMMENT '操作人名称',
    `create_time`      datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`      datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB COMMENT='导出任务记录';

代码实现

导出工具类 :

负责提交导出任务、取消任务以及上传导出文件到OSS服务器等功能。

@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskUtil {

    /**
     * 导出任务线程池
     */
    private static volatile ExecutorService executorService;

    /**
     * 导出类型对应的任务
     */
    private final Map<Integer, CompletableFuture<String>> runningTasks = Maps.newConcurrentMap();

    private final ExportTaskHandlerFactory exportTaskHandlerFactory;
    private final FileUploadService fileUploadService;

    /**
     * @description 提交导出任务
     * @author youmu
     * @date 2024/1/26 17:58
     * @param exportTask 导出任务
     */
    public CompletableFuture<String> submit(ExportTask exportTask) {
        // 初始化线程池
        initThreadPool();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            File exportFile = null;
            // 获取handler
            ExportTaskHandler handler = exportTaskHandlerFactory.getHandler(exportTask.getTaskType());
            // 生成文件
            try {
                exportFile = handler.generateExportFile(exportTask.getTaskParam());
                if (exportFile == null) {
                    throw new BizException(CodeEnum.NOT_FOUND, "导出文件为空");
                }
                // 上传文件到OSS服务器,获取文件URL
                return uploadToOSS(exportFile);
            } catch (BizException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (exportFile != null) {
                    FileUtil.del(exportFile);
                }
            }
        },executorService);
        runningTasks.put(exportTask.getTaskType(), future);
        return future;
    }

    private static void initThreadPool() {
        if (executorService == null) {
            synchronized (ExportTaskUtil.class) {
                if (executorService == null) {
                    executorService = ThreadUtil.newFixedExecutor(4, "asyncExport", false);;
                }
            }
        }
    }

    /**
     * @description 取消任务
     * @author youmu
     * @date 2024/1/26 19:04
     * @param exportTask 任务
     */
    public void cancel(ExportTask exportTask) {
        CompletableFuture<String> future = runningTasks.get(exportTask.getTaskType());
        if (future != null && !future.isDone()) {
            future.cancel(true);
        }
    }

    /**
     * @description 上传文件到OSS服务器
     * @author youmu
     * @date 2024/1/29 16:56
     */
    private String uploadToOSS(File exportFile) {
        // 实现文件上传逻辑,返回文件URL
        return fileUploadService.uploadFileBySize(exportFile,"export/" + exportFile.getName());
    }


}

导出任务处理的工厂类以及相关的接口和枚举定义 

导出任务采用来工厂+策略的设计模式,工厂模式将对象的创建逻辑封装到工厂类中,策略模式将不同的行为封装到不同的策略类中,使得代码具有良好的可扩展性、灵活性和可维护性。

/**
 * @description ExportTaskHandler 工厂类
 * @author youmu
 * @date 2024/1/26 18:04
 */
@Slf4j
@Component
public class ExportTaskHandlerFactory {
    private final Map<Integer, ExportTaskHandler> handlerMap = Maps.newHashMap();

    @Autowired
    public ExportTaskHandlerFactory(List<ExportTaskHandler> handlers) {
        for (ExportTaskHandler taskHandler : handlers) {
            handlerMap.put(taskHandler.getExportType().getCode(), taskHandler);
        }
    }

    public ExportTaskHandler getHandler(Integer exportType) {
        return handlerMap.get(exportType);
    }
}
/**
 * @description ExportTaskHandler
 * @author youmu
 * @date 2024/1/26 18:06
 */
public interface ExportTaskHandler {
    File generateExportFile(String param) throws Exception;

    ExportTaskTypeEnum getExportType();

}
/**
 * @description 导出任务类型
 * @author youmu
 * @date 2024/1/29 11:01
 */
@AllArgsConstructor
@Getter
public enum ExportTaskTypeEnum implements IEnum<Integer, String>  {
    CROWD_PACKAGE(1, "人群包"),
    ;

    private final Integer code;
    private final String message;

}
public class OrderExportHandler implements ExportTaskHandler{
    @Override
    public File generateExportFile(String param) throws Exception {
        return null;
    }

    @Override
    public ExportTaskTypeEnum getExportType() {
        return null;
    }
}

业务调用

导出任务的门面类 ExportTaskFacade,它提供了一系列方法来提交、取消、重试导出任务,并提供了查询导出任务的分页接口。

@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskFacade {
    private final ExportTaskService exportTaskService;
    private final UserService userService;
    private final ExportTaskUtil exportTaskUtil;

    public void submitTask(Integer exportType, String param) {
        submitTask(null,exportType,param);
    }

    public void cancelTask(Long taskId) {
        ExportTask exportTask = exportTaskService.getById(taskId);
        AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在"));
        exportTaskUtil.cancel(exportTask);
    }

    public void retryTask(Long taskId) {
        ExportTask exportTask = exportTaskService.getById(taskId);
        AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在"));
        // 取消任务,防止异常情况还在执行
        exportTaskUtil.cancel(exportTask);
        // 提交任务
        submitTask(taskId,exportTask.getTaskType(),exportTask.getTaskParam());
    }

    private void submitTask(Long taskId, Integer exportType, String param) {
        ExportTask exportTask;
        if(taskId == null) {
            // 保存导出任务
            exportTask = new ExportTask();
            Integer userId = AuthInfoHolder.getUserId();
            exportTask.setTaskId(CodeGenUtil.genCode(GenCodeTypeEnum.DL));
            exportTask.setCreateUserId(userId);
            exportTask.setCreateUserName(userService.findById(userId).getUserName());
            exportTask.setTaskType(exportType);
            exportTask.setTaskParam(param);
            exportTaskService.save(exportTask);
        } else {
            // 更新导出任务
            exportTask = exportTaskService.getById(taskId);
            exportTaskService.lambdaUpdate()
                    .eq(ExportTask::getId, exportTask.getId())
                    .set(ExportTask::getStatus, ExportStatusEnum.PROCESSING.getCode())
                    .update();
        }
        doSubmit(exportTask);
    }


    private void doSubmit(ExportTask exportTask) {
        exportTaskUtil.submit(exportTask).thenAccept(url->{
            exportTaskService.lambdaUpdate()
                    .eq(ExportTask::getId, exportTask.getId())
                    .set(ExportTask::getStatus, ExportStatusEnum.SUCCESS.getCode())
                    .update();
        }).exceptionally(ex->{
            log.error("[导出任务]执行失败,{}", exportTask.getTaskId(),ex);
            exportTaskService.lambdaUpdate()
                    .eq(ExportTask::getId, exportTask.getId())
                    .set(ExportTask::getStatus, ExportStatusEnum.FAILURE.getCode())
                    .set(ExportTask::getRemark, ex instanceof BizException ? ex.getMessage() : "未知异常")
                    .update();
            return null;
        });
    }

    public Page<ExportTaskVO> findPage(ExportTaskRequest request) {
        Page<ExportTask> page = exportTaskService.findPage(request);
        List<ExportTaskVO> voList = ConverterUtil.toVO(ExportTaskConverter.class, page.getRecords());
        Page<ExportTaskVO> pageVO = new Page<>();
        pageVO.setTotal(page.getTotal());
        pageVO.setSize(page.getSize());
        pageVO.setCurrent(page.getCurrent());
        pageVO.setPages(page.getPages());
        pageVO.setRecords(voList);
        return pageVO;
    }
}

流程图

总结

过以上实践,我们成功实现了一个轻量级的异步导出方案,具有以下优点:

然而,这种方案也存在一些缺点:

针对这些缺点,可以考虑以下优化方案:

通过以上优化方案,可以提高异步导出方案的可靠性和稳定性,确保任务能够及时执行并完成,同时降低了系统的维护成本和风险。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文