java异步导出的实现过程
作者:linsm1231
这篇文章主要介绍了java异步导出的实现过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
背景
假设我们有一个在线学习平台,管理员需要定期生成销售报告,包括课程销售情况和用户购买情况等重要数据。然而,由于数据量较大,生成报告可能需要较长时间,并且可能会占用大量系统资源,从而影响用户的使用体验。为了解决这个问题,我们考虑采用异步导出的方案。
异步导出的工作原理是将导出操作放在一个异步任务中执行,而不是立即在用户发起导出请求后执行导出操作。这样一来,用户无需等待导出任务完成,就可以继续进行其他操作,而系统则在后台完成导出任务。
这种方案有以下优点:
- 提高系统响应速度: 用户发起导出请求后,系统可以立即响应而不必等待导出任务完成,从而提高了系统的响应速度。
- 改善用户体验: 用户无需等待导出任务完成,可以继续使用系统进行其他操作,这有助于提升用户体验。
- 降低系统负载: 将耗时的导出操作放在异步任务中执行,可以避免阻塞系统资源,从而降低系统的负载,确保其他用户的操作不受影响。
异步导出在许多需要处理大量数据或耗时操作的场景中都非常有用,可以有效提升系统的性能和用户体验。
数据库设计
首先我们需要设计一个保存导出任务的表,需要记录流转状态、操作人、任务参数,后续任务的创建、导出完成/失败都需要操作这张表
CREATE TABLE `t_export_task`
(
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`task_id` varchar(50) NOT NULL COMMENT '任务id',
`task_type` tinyint(4) NOT NULL COMMENT '任务类型',
`task_param` varchar(1000) NOT NULL COMMENT '任务参数',
`status` tinyint(3) NOT NULL DEFAULT 0 COMMENT '状态 0-处理中 1-成功 -1失败',
`file_url` varchar(500) DEFAULT NULL COMMENT '文件url',
`remark` varchar(200) DEFAULT NULL COMMENT '备注',
`create_user_id` int(11) NOT NULL COMMENT '操作人id',
`create_user_name` varchar(50) NOT NULL COMMENT '操作人名称',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB COMMENT='导出任务记录';代码实现
导出工具类 :
负责提交导出任务、取消任务以及上传导出文件到OSS服务器等功能。
- 导出任务线程池: 通过
ExecutorService线程池来执行导出任务,并确保线程池的单例化,防止重复创建,提高性能。 - 导出类型对应的任务: 使用
Map<Integer, CompletableFuture<String>>来保存正在运行的导出任务,以便后续取消任务或跟踪任务状态。 - 提交导出任务: 提交导出任务时,先初始化线程池,然后使用
CompletableFuture.supplyAsync()方法执行异步任务,并在异步任务中生成导出文件,然后上传到OSS服务器,最后返回导出文件的URL。 - 取消任务: 取消任务时,从保存的任务映射中获取对应的
CompletableFuture实例,并调用cancel()方法取消任务。 - 上传文件到OSS服务器:
uploadToOSS()方法负责实际的文件上传逻辑,将导出的文件上传到OSS服务器,并返回文件的URL
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskUtil {
/**
* 导出任务线程池
*/
private static volatile ExecutorService executorService;
/**
* 导出类型对应的任务
*/
private final Map<Integer, CompletableFuture<String>> runningTasks = Maps.newConcurrentMap();
private final ExportTaskHandlerFactory exportTaskHandlerFactory;
private final FileUploadService fileUploadService;
/**
* @description 提交导出任务
* @author youmu
* @date 2024/1/26 17:58
* @param exportTask 导出任务
*/
public CompletableFuture<String> submit(ExportTask exportTask) {
// 初始化线程池
initThreadPool();
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
File exportFile = null;
// 获取handler
ExportTaskHandler handler = exportTaskHandlerFactory.getHandler(exportTask.getTaskType());
// 生成文件
try {
exportFile = handler.generateExportFile(exportTask.getTaskParam());
if (exportFile == null) {
throw new BizException(CodeEnum.NOT_FOUND, "导出文件为空");
}
// 上传文件到OSS服务器,获取文件URL
return uploadToOSS(exportFile);
} catch (BizException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (exportFile != null) {
FileUtil.del(exportFile);
}
}
},executorService);
runningTasks.put(exportTask.getTaskType(), future);
return future;
}
private static void initThreadPool() {
if (executorService == null) {
synchronized (ExportTaskUtil.class) {
if (executorService == null) {
executorService = ThreadUtil.newFixedExecutor(4, "asyncExport", false);;
}
}
}
}
/**
* @description 取消任务
* @author youmu
* @date 2024/1/26 19:04
* @param exportTask 任务
*/
public void cancel(ExportTask exportTask) {
CompletableFuture<String> future = runningTasks.get(exportTask.getTaskType());
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
/**
* @description 上传文件到OSS服务器
* @author youmu
* @date 2024/1/29 16:56
*/
private String uploadToOSS(File exportFile) {
// 实现文件上传逻辑,返回文件URL
return fileUploadService.uploadFileBySize(exportFile,"export/" + exportFile.getName());
}
}
导出任务处理的工厂类以及相关的接口和枚举定义
导出任务采用来工厂+策略的设计模式,工厂模式将对象的创建逻辑封装到工厂类中,策略模式将不同的行为封装到不同的策略类中,使得代码具有良好的可扩展性、灵活性和可维护性。
- ExportTaskHandlerFactory: 这是一个工厂类,用于根据导出任务类型获取对应的任务处理器。在初始化时,它会将所有实现了
ExportTaskHandler接口的处理器注入进来,并根据任务类型建立起映射关系。 - ExportTaskHandler 接口: 这是一个导出任务处理器的接口,定义了生成导出文件和获取导出任务类型的方法,具体的导出任务处理器需要实现该接口。
- ExportTaskTypeEnum 枚举: 这是一个枚举类型,定义了导出任务的类型,包括了任务类型的代码和描述信息。
- OrderExportHandler 类: 这是一个具体的导出任务处理器的实现类,用于处理订单导出任务。它实现了
ExportTaskHandler接口,根据具体业务逻辑生成导出文件,并提供了获取任务类型的方法。
/**
* @description ExportTaskHandler 工厂类
* @author youmu
* @date 2024/1/26 18:04
*/
@Slf4j
@Component
public class ExportTaskHandlerFactory {
private final Map<Integer, ExportTaskHandler> handlerMap = Maps.newHashMap();
@Autowired
public ExportTaskHandlerFactory(List<ExportTaskHandler> handlers) {
for (ExportTaskHandler taskHandler : handlers) {
handlerMap.put(taskHandler.getExportType().getCode(), taskHandler);
}
}
public ExportTaskHandler getHandler(Integer exportType) {
return handlerMap.get(exportType);
}
}
/**
* @description ExportTaskHandler
* @author youmu
* @date 2024/1/26 18:06
*/
public interface ExportTaskHandler {
File generateExportFile(String param) throws Exception;
ExportTaskTypeEnum getExportType();
}
/**
* @description 导出任务类型
* @author youmu
* @date 2024/1/29 11:01
*/
@AllArgsConstructor
@Getter
public enum ExportTaskTypeEnum implements IEnum<Integer, String> {
CROWD_PACKAGE(1, "人群包"),
;
private final Integer code;
private final String message;
}
public class OrderExportHandler implements ExportTaskHandler{
@Override
public File generateExportFile(String param) throws Exception {
return null;
}
@Override
public ExportTaskTypeEnum getExportType() {
return null;
}
}业务调用
导出任务的门面类 ExportTaskFacade,它提供了一系列方法来提交、取消、重试导出任务,并提供了查询导出任务的分页接口。
- 提交任务(submitTask): 提交导出任务时,根据是否传入
taskId参数来判断是新建任务还是更新任务。如果是新建任务,则创建一个新的ExportTask实例并保存到数据库中,然后调用doSubmit方法提交任务;如果是更新任务,则更新任务的状态为正在处理,并调用doSubmit方法提交任务。 - 任务提交处理(doSubmit): 使用
exportTaskUtil.submit(exportTask)提交异步导出任务,并定义了任务完成后的处理逻辑。如果任务执行成功,则更新任务状态为成功,并设置文件的URL;如果任务执行失败,则记录失败日志,并更新任务状态为失败,同时记录异常信息。 - 取消任务(cancelTask): 根据传入的
taskId获取对应的导出任务,然后调用exportTaskUtil.cancel(exportTask)取消任务。 - 重试任务(retryTask): 根据传入的
taskId获取对应的导出任务,先取消任务以防止异常情况,然后重新提交任务。
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskFacade {
private final ExportTaskService exportTaskService;
private final UserService userService;
private final ExportTaskUtil exportTaskUtil;
public void submitTask(Integer exportType, String param) {
submitTask(null,exportType,param);
}
public void cancelTask(Long taskId) {
ExportTask exportTask = exportTaskService.getById(taskId);
AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在"));
exportTaskUtil.cancel(exportTask);
}
public void retryTask(Long taskId) {
ExportTask exportTask = exportTaskService.getById(taskId);
AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在"));
// 取消任务,防止异常情况还在执行
exportTaskUtil.cancel(exportTask);
// 提交任务
submitTask(taskId,exportTask.getTaskType(),exportTask.getTaskParam());
}
private void submitTask(Long taskId, Integer exportType, String param) {
ExportTask exportTask;
if(taskId == null) {
// 保存导出任务
exportTask = new ExportTask();
Integer userId = AuthInfoHolder.getUserId();
exportTask.setTaskId(CodeGenUtil.genCode(GenCodeTypeEnum.DL));
exportTask.setCreateUserId(userId);
exportTask.setCreateUserName(userService.findById(userId).getUserName());
exportTask.setTaskType(exportType);
exportTask.setTaskParam(param);
exportTaskService.save(exportTask);
} else {
// 更新导出任务
exportTask = exportTaskService.getById(taskId);
exportTaskService.lambdaUpdate()
.eq(ExportTask::getId, exportTask.getId())
.set(ExportTask::getStatus, ExportStatusEnum.PROCESSING.getCode())
.update();
}
doSubmit(exportTask);
}
private void doSubmit(ExportTask exportTask) {
exportTaskUtil.submit(exportTask).thenAccept(url->{
exportTaskService.lambdaUpdate()
.eq(ExportTask::getId, exportTask.getId())
.set(ExportTask::getStatus, ExportStatusEnum.SUCCESS.getCode())
.update();
}).exceptionally(ex->{
log.error("[导出任务]执行失败,{}", exportTask.getTaskId(),ex);
exportTaskService.lambdaUpdate()
.eq(ExportTask::getId, exportTask.getId())
.set(ExportTask::getStatus, ExportStatusEnum.FAILURE.getCode())
.set(ExportTask::getRemark, ex instanceof BizException ? ex.getMessage() : "未知异常")
.update();
return null;
});
}
public Page<ExportTaskVO> findPage(ExportTaskRequest request) {
Page<ExportTask> page = exportTaskService.findPage(request);
List<ExportTaskVO> voList = ConverterUtil.toVO(ExportTaskConverter.class, page.getRecords());
Page<ExportTaskVO> pageVO = new Page<>();
pageVO.setTotal(page.getTotal());
pageVO.setSize(page.getSize());
pageVO.setCurrent(page.getCurrent());
pageVO.setPages(page.getPages());
pageVO.setRecords(voList);
return pageVO;
}
}流程图

总结
过以上实践,我们成功实现了一个轻量级的异步导出方案,具有以下优点:
- 使用线程池管理异步任务,确保了任务的并发执行和资源的合理利用。
- 采用 CompletableFuture 实现异步导出和回调更新,简化了异步任务的编写和管理。
- 使用工厂模式和策略模式实现导出任务处理器,使得系统具有良好的可扩展性和灵活性。
然而,这种方案也存在一些缺点:
- 资源管理不足: 如果异步导出任务的并发量过大,而线程池的资源配置不足,则可能导致任务排队等待执行,影响任务的实时性和响应速度。
- 任务执行效率低下: 如果导出任务的处理时间过长,且线程池的工作线程数量有限,则可能导致任务执行效率低下,无法及时完成任务,影响系统的整体性能。
- 可靠性不高,无法保证任务一定会被执行或执行成功,特别是在系统故障或异常情况下。
针对这些缺点,可以考虑以下优化方案:
- 合理调整线程池配置: 根据系统的实际负载情况和性能需求,合理配置线程池的大小和工作线程数量,确保资源的有效利用和任务的及时执行。
- 优化任务处理逻辑: 对任务的处理逻辑进行优化,尽量减少任务的执行时间和资源消耗,提高任务的执行效率和响应速度。
- 引入异步消息处理机制: 使用消息队列或事件驱动模型来实现任务的异步处理,进一步解耦任务提交和任务执行过程,提高系统的可扩展性和灵活性。
- 引入定时任务调度器: 使用定时任务调度器(如 xxl-job)来定期扫描和重试执行异常任务。当任务执行时间超过一定阈值(如2小时)或者任务执行异常时,自动触发重试机制,保证任务的及时执行。
- 增加任务监控和告警机制: 实时监控任务的执行情况,当发现任务执行异常或超时时,及时发送告警通知,以便运维人员及时处理和修复。
通过以上优化方案,可以提高异步导出方案的可靠性和稳定性,确保任务能够及时执行并完成,同时降低了系统的维护成本和风险。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
