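PowerJob MapProcessor Workflow: A Source Code Walkthrough
Author: codecraft
Preface
This article walks through the source code behind PowerJob's MapProcessor, from the map call on the worker to the moment a subtask is handed to a processor thread.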
MapProcessor
tech/powerjob/worker/core/processor/sdk/MapProcessor.java
public interface MapProcessor extends BasicProcessor {

    Logger log = LoggerFactory.getLogger(MapProcessor.class);

    int RECOMMEND_BATCH_SIZE = 200;

    /**
     * Dispatch subtasks.
     * @param taskList subtasks; on re-execution they are available via TaskContext#getSubTask
     * @param taskName subtask name, i.e. the value returned by TaskContext#getTaskName in the subtask processor
     * @throws PowerJobCheckedException thrown when map fails
     */
    default void map(List<?> taskList, String taskName) throws PowerJobCheckedException {

        if (CollectionUtils.isEmpty(taskList)) {
            return;
        }

        TaskDO task = ThreadLocalStore.getTask();
        WorkerRuntime workerRuntime = ThreadLocalStore.getRuntimeMeta();

        if (taskList.size() > RECOMMEND_BATCH_SIZE) {
            log.warn("[Map-{}] map task size is too large, network maybe overload... please try to split the tasks.", task.getInstanceId());
        }

        // Guard against a map task named like the root task or the last task
        // (which would spawn subtasks endlessly or fail outright)
        if (TaskConstant.ROOT_TASK_NAME.equals(taskName) || TaskConstant.LAST_TASK_NAME.equals(taskName)) {
            log.warn("[Map-{}] illegal map task name : {}! please do not use 'OMS_ROOT_TASK' or 'OMS_LAST_TASK' as map task name. as a precaution, it will be renamed 'X-{}' automatically.", task.getInstanceId(), taskName, taskName);
            taskName = "X-" + taskName;
        }

        // 1. Build the request
        ProcessorMapTaskRequest req = new ProcessorMapTaskRequest(task, taskList, taskName);

        // 2. Send the request reliably (tasks must not be lost, so use ask; throw on failure)
        boolean requestSucceed = TransportUtils.reliableMapTask(req, task.getAddress(), workerRuntime);
        if (requestSucceed) {
            log.info("[Map-{}] map task[name={},num={}] successfully!", task.getInstanceId(), taskName, taskList.size());
        } else {
            throw new PowerJobCheckedException("map failed for task: " + taskName);
        }
    }

    /**
     * Whether the current task is the root task.
     * @return true -> root task / false -> not the root task
     */
    default boolean isRootTask() {
        TaskDO task = ThreadLocalStore.getTask();
        return TaskConstant.ROOT_TASK_NAME.equals(task.getTaskName());
    }
}
The MapProcessor interface extends BasicProcessor. Its default map method dispatches subtasks: it builds a ProcessorMapTaskRequest and sends it through TransportUtils.reliableMapTask, throwing PowerJobCheckedException if the request does not succeed. Before sending, it warns when the list exceeds RECOMMEND_BATCH_SIZE (200) and renames a task whose name collides with the reserved root or last task name. The interface also provides isRootTask, which tells whether the current task is the root task.
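The two precautions in map (the batch size warning and the reserved-name rename) can be reproduced in plain Java. The following is only a minimal sketch: `MapBatching`, `partition` and `safeTaskName` are hypothetical names that do not exist in PowerJob, and the constants simply copy the values visible in the listing and its log message.

```java
import java.util.ArrayList;
import java.util.List;

class MapBatching {
    // Copies MapProcessor.RECOMMEND_BATCH_SIZE from the listing above.
    static final int RECOMMEND_BATCH_SIZE = 200;

    // Hypothetical helper: splits a list into consecutive batches of at most batchSize items.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    // Hypothetical helper mirroring the rename rule: reserved names get an "X-" prefix.
    static String safeTaskName(String taskName) {
        if ("OMS_ROOT_TASK".equals(taskName) || "OMS_LAST_TASK".equals(taskName)) {
            return "X-" + taskName;
        }
        return taskName;
    }

    public static void main(String[] args) {
        List<Integer> subTasks = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            subTasks.add(i);
        }
        for (List<Integer> batch : partition(subTasks, RECOMMEND_BATCH_SIZE)) {
            // In a real MapProcessor, map(batch, taskName) would be called here.
            System.out.println("batch size = " + batch.size());
        }
    }
}
```

Splitting on the caller side keeps each ProcessorMapTaskRequest small, which is what the warning in map asks for.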
ProcessorMapTaskRequest
tech/powerjob/worker/pojo/request/ProcessorMapTaskRequest.java
@Getter
@NoArgsConstructor
public class ProcessorMapTaskRequest implements PowerSerializable {

    private Long instanceId;
    private Long subInstanceId;

    private String taskName;
    private List<SubTask> subTasks;

    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SubTask {
        private String taskId;
        private byte[] taskContent;
    }

    public ProcessorMapTaskRequest(TaskDO parentTask, List<?> subTaskList, String taskName) {

        this.instanceId = parentTask.getInstanceId();
        this.subInstanceId = parentTask.getSubInstanceId();
        this.taskName = taskName;
        this.subTasks = Lists.newLinkedList();

        subTaskList.forEach(subTask -> {
            // A single task may call map several times, so IDs must stay unique at thread level
            String subTaskId = parentTask.getTaskId() + "." + ThreadLocalStore.getTaskIDAddr().getAndIncrement();
            // Write the class name to make deserialization easier
            subTasks.add(new SubTask(subTaskId, SerializerUtils.serialize(subTask)));
        });
    }
}
The ProcessorMapTaskRequest constructor converts subTaskList into a list of SubTask objects. Each SubTask gets an ID of the form parentTaskId.counter, and the user-defined subTask object is serialized with SerializerUtils.serialize (Kryo).
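The ID scheme in the constructor (parent task ID, a dot, then a per-thread counter) can be illustrated in isolation. This is a sketch under assumptions: `SubTaskIds` is a hypothetical class, and the `ThreadLocal<AtomicLong>` merely stands in for ThreadLocalStore.getTaskIDAddr(), whose initialization and cleanup are not shown in the listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class SubTaskIds {
    // Assumption: one counter per worker thread, standing in for ThreadLocalStore.getTaskIDAddr().
    private static final ThreadLocal<AtomicLong> COUNTER = ThreadLocal.withInitial(AtomicLong::new);

    // Generates `count` IDs of the form "<parentTaskId>.<n>", continuing the current thread's counter.
    static List<String> nextIds(String parentTaskId, int count) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ids.add(parentTaskId + "." + COUNTER.get().getAndIncrement());
        }
        return ids;
    }

    // Drops the current thread's counter so the next call starts from 0 again.
    static void reset() {
        COUNTER.remove();
    }

    public static void main(String[] args) {
        reset();
        System.out.println(nextIds("0", 2)); // first map call inside one task
        System.out.println(nextIds("0", 2)); // second map call keeps counting, so IDs do not collide
    }
}
```

Because the counter keeps increasing across calls on the same thread, two map calls from the same parent task never produce the same subtask ID.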
reliableMapTask
tech/powerjob/worker/common/utils/TransportUtils.java
public static boolean reliableMapTask(ProcessorMapTaskRequest req, String address, WorkerRuntime workerRuntime) throws PowerJobCheckedException {
    try {
        return reliableAsk(ServerType.WORKER, WTT_PATH, WTT_HANDLER_MAP_TASK, address, req, workerRuntime.getTransporter()).isSuccess();
    } catch (Throwable throwable) {
        throw new PowerJobCheckedException(throwable);
    }
}

private static AskResponse reliableAsk(ServerType t, String rootPath, String handlerPath, String address, PowerSerializable req, Transporter transporter) throws Exception {
    final URL url = easyBuildUrl(t, rootPath, handlerPath, address);
    final CompletionStage<AskResponse> completionStage = transporter.ask(url, req, AskResponse.class);
    return completionStage
            .toCompletableFuture()
            .get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
reliableMapTask uses reliableAsk to send the request to the taskTracker/mapTask handler and blocks on the reply, with a default timeout of 5s. Any failure, including a timeout, is wrapped in a PowerJobCheckedException.
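The "ask, wait with a timeout, wrap every failure" pattern can be tried out with the JDK alone. The sketch below is not PowerJob code: `ReliableAskSketch`, `reliableSend` and `AskFailedException` are hypothetical names, the reply is reduced to a Boolean, and the 5000 ms constant only mirrors the default mentioned above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

class ReliableAskSketch {
    // Mirrors the 5s default described in the text; the constant is local to this sketch.
    static final long DEFAULT_TIMEOUT_MS = 5000L;

    // Hypothetical checked exception standing in for PowerJobCheckedException.
    static class AskFailedException extends Exception {
        AskFailedException(Throwable cause) {
            super(cause);
        }
    }

    // Waits for the reply at most timeoutMs and converts every failure into one checked exception.
    static boolean reliableSend(CompletionStage<Boolean> reply, long timeoutMs) throws AskFailedException {
        try {
            Boolean success = reply.toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS);
            return Boolean.TRUE.equals(success);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            throw new AskFailedException(e);
        } catch (Exception e) {
            // Covers TimeoutException (no reply in time) and ExecutionException (remote failure)
            throw new AskFailedException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reliableSend(CompletableFuture.completedFuture(true), DEFAULT_TIMEOUT_MS));
        try {
            reliableSend(new CompletableFuture<>(), 50L); // this future is never completed
        } catch (AskFailedException e) {
            System.out.println("failed: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

From the caller's point of view a lost reply and an explicit failure look the same, which matches how map treats both as "map failed".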
onReceiveProcessorMapTaskRequest
tech/powerjob/worker/actors/TaskTrackerActor.java
/**
 * Handler for subtask map requests
 */
@Handler(path = WTT_HANDLER_MAP_TASK)
public AskResponse onReceiveProcessorMapTaskRequest(ProcessorMapTaskRequest req) {

    HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.getTaskTracker(req.getInstanceId());
    if (taskTracker == null) {
        log.warn("[TaskTrackerActor] receive ProcessorMapTaskRequest({}) but system can't find TaskTracker.", req);
        return null;
    }

    boolean success = false;
    List<TaskDO> subTaskList = Lists.newLinkedList();

    try {
        req.getSubTasks().forEach(originSubTask -> {
            TaskDO subTask = new TaskDO();

            subTask.setTaskName(req.getTaskName());
            subTask.setSubInstanceId(req.getSubInstanceId());

            subTask.setTaskId(originSubTask.getTaskId());
            subTask.setTaskContent(originSubTask.getTaskContent());

            subTaskList.add(subTask);
        });

        success = taskTracker.submitTask(subTaskList);
    } catch (Exception e) {
        log.warn("[TaskTrackerActor] process map task(instanceId={}) failed.", req.getInstanceId(), e);
    }

    AskResponse response = new AskResponse();
    response.setSuccess(success);
    return response;
}
TaskTrackerActor handles ProcessorMapTaskRequest in onReceiveProcessorMapTaskRequest. It converts the incoming subTasks into TaskDO objects and submits them through taskTracker.submitTask; the result of that call becomes the success flag of the AskResponse.
submitTask
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
/**
 * Submit tasks (the Map of MapReduce, the broadcast of Broadcast). The caller guarantees the batch size;
 * inserting too much data at once may fail.
 *
 * @param newTaskList list of new subtasks
 */
public boolean submitTask(List<TaskDO> newTaskList) {
    if (finished.get()) {
        return true;
    }
    if (CollectionUtils.isEmpty(newTaskList)) {
        return true;
    }
    // Basic preprocessing (one extra loop is slightly wasteful, but in distributed execution
    // this cost is never the dominant one and can be ignored)
    newTaskList.forEach(task -> {
        task.setInstanceId(instanceId);
        task.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
        task.setFailedCnt(0);
        task.setLastModifiedTime(System.currentTimeMillis());
        task.setCreatedTime(System.currentTimeMillis());
        task.setLastReportTime(-1L);
    });

    log.debug("[TaskTracker-{}] receive new tasks: {}", instanceId, newTaskList);
    return taskPersistenceService.batchSave(newTaskList);
}
submitTask first fills in bookkeeping fields, in particular setting status to TaskStatus.WAITING_DISPATCH, and then persists the tasks with taskPersistenceService.batchSave.
batchSave
tech/powerjob/worker/persistence/TaskPersistenceService.java
public boolean batchSave(List<TaskDO> tasks) {
    if (CollectionUtils.isEmpty(tasks)) {
        return true;
    }
    try {
        return execute(() -> taskDAO.batchSave(tasks));
    } catch (Exception e) {
        log.error("[TaskPersistenceService] batchSave tasks({}) failed.", tasks, e);
    }
    return false;
}

private static <T> T execute(SupplierPlus<T> executor) throws Exception {
    return CommonUtils.executeWithRetry(executor, RETRY_TIMES, RETRY_INTERVAL_MS);
}
batchSave saves the tasks through taskDAO.batchSave. The call is wrapped in executeWithRetry, so an exception triggers a retry: up to 3 times with a 100ms interval. If every attempt fails, the error is logged and false is returned.
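The body of CommonUtils.executeWithRetry is not shown in this article, so the following is only a sketch of what such a helper typically looks like. `RetrySketch` is a hypothetical class, `Callable` replaces PowerJob's SupplierPlus, and the sketch assumes that "3 times" means three attempts in total.

```java
import java.util.concurrent.Callable;

class RetrySketch {
    // Runs the action up to `times` attempts (assumption: total attempts, not extra retries),
    // sleeping intervalMs between failed attempts and rethrowing the last failure.
    static <T> T executeWithRetry(Callable<T> action, int times, long intervalMs) throws Exception {
        if (times <= 0) {
            throw new IllegalArgumentException("times must be positive");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= times; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < times) {
                    Thread.sleep(intervalMs); // back off before the next attempt
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure " + calls[0]);
            }
            return "saved";
        }, 3, 100L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

A short fixed interval suits a local embedded store, where failures are expected to be brief; a remote dependency would more likely call for exponential backoff.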
TaskDAOImpl.batchSave
tech/powerjob/worker/persistence/TaskDAOImpl.java
public boolean batchSave(Collection<TaskDO> tasks) throws SQLException {
    String insertSql = "insert into task_info(task_id, instance_id, sub_instance_id, task_name, task_content, address, status, result, failed_cnt, created_time, last_modified_time, last_report_time) values (?,?,?,?,?,?,?,?,?,?,?,?)";
    boolean originAutoCommitFlag;
    try (Connection conn = connectionFactory.getConnection()) {
        originAutoCommitFlag = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(insertSql)) {

            for (TaskDO task : tasks) {
                fillInsertPreparedStatement(task, ps);
                ps.addBatch();
            }

            ps.executeBatch();
            return true;
        } catch (Throwable e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(originAutoCommitFlag);
        }
    }
}
TaskDAOImpl.batchSave performs the bulk insert directly with JDBC: it disables auto-commit, adds one batch entry per task, calls executeBatch, rolls back on any failure, and finally restores the original auto-commit flag.
Dispatcher
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
/**
 * Periodically scans tasks in the database (at most 100 per query, to limit memory usage)
 * and dispatches those that need to run
 */
protected class Dispatcher implements Runnable {

    // Database query limit: maximum number of tasks fetched per query
    private static final int DB_QUERY_LIMIT = 100;

    @Override
    public void run() {

        if (finished.get()) {
            return;
        }

        Stopwatch stopwatch = Stopwatch.createStarted();

        // 1. Get the ProcessorTrackers that can accept tasks
        List<String> availablePtIps = ptStatusHolder.getAvailableProcessorTrackers();

        // 2. No ProcessorTracker available, skip this round
        if (availablePtIps.isEmpty()) {
            log.debug("[TaskTracker-{}] no available ProcessorTracker now.", instanceId);
            return;
        }

        // 3. Avoid large queries, dispatch in batches
        long currentDispatchNum = 0;
        long maxDispatchNum = availablePtIps.size() * instanceInfo.getThreadConcurrency() * 2L;
        AtomicInteger index = new AtomicInteger(0);

        // 4. Query the database in a loop for tasks that need dispatching
        while (maxDispatchNum > currentDispatchNum) {

            int dbQueryLimit = Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
            List<TaskDO> needDispatchTasks = taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.WAITING_DISPATCH, dbQueryLimit);
            currentDispatchNum += needDispatchTasks.size();

            needDispatchTasks.forEach(task -> {
                // Pick the ProcessorTracker address; if the task carries its own address, use it
                String ptAddress = task.getAddress();
                if (StringUtils.isEmpty(ptAddress) || RemoteConstant.EMPTY_ADDRESS.equals(ptAddress)) {
                    ptAddress = availablePtIps.get(index.getAndIncrement() % availablePtIps.size());
                }
                dispatchTask(task, ptAddress);
            });

            // Too few results or a failed query: stop the loop
            if (needDispatchTasks.size() < dbQueryLimit) {
                break;
            }
        }

        log.debug("[TaskTracker-{}] dispatched {} tasks,using time {}.", instanceId, currentDispatchNum, stopwatch.stop());
    }
}
HeavyTaskTracker schedules the Dispatcher every 5s. Its run method first asks ptStatusHolder.getAvailableProcessorTrackers() for the ProcessorTrackers that can accept work. It then repeatedly pulls a batch of tasks in status TaskStatus.WAITING_DISPATCH from the database (at most 100 per query, and at most trackers × threadConcurrency × 2 per round) and hands each one to dispatchTask. Tasks without a preset address are spread over the available ProcessorTrackers round-robin.
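The arithmetic and the round-robin choice in Dispatcher.run can be isolated from the database and the network. The sketch below is a simplification under assumptions: `DispatchSketch` and its methods are hypothetical, tasks are reduced to their preset address (null or empty meaning "none"), and the RemoteConstant.EMPTY_ADDRESS check is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DispatchSketch {
    // Copies Dispatcher.DB_QUERY_LIMIT from the listing above.
    static final int DB_QUERY_LIMIT = 100;

    // Upper bound on tasks dispatched per round: trackers * threadConcurrency * 2.
    static long maxDispatchNum(int availableTrackers, int threadConcurrency) {
        return availableTrackers * threadConcurrency * 2L;
    }

    // Size of a single database query, capped at DB_QUERY_LIMIT.
    static int dbQueryLimit(long maxDispatchNum) {
        return Math.min(DB_QUERY_LIMIT, (int) maxDispatchNum);
    }

    // Chooses an address per task: a preset address wins, otherwise trackers are used round-robin.
    static List<String> assign(List<String> presetAddresses, List<String> trackers) {
        List<String> chosen = new ArrayList<>();
        int index = 0;
        for (String preset : presetAddresses) {
            if (preset == null || preset.isEmpty()) {
                chosen.add(trackers.get(index++ % trackers.size()));
            } else {
                chosen.add(preset);
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<String> trackers = Arrays.asList("pt-a", "pt-b");
        List<String> presets = Arrays.asList(null, "", "pt-fixed", null);
        System.out.println(maxDispatchNum(trackers.size(), 5));
        System.out.println(assign(presets, trackers));
    }
}
```

Note that the round-robin index only advances for tasks without a preset address, so pinned tasks do not skew the distribution of the others.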
dispatchTask
tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java
protected void dispatchTask(TaskDO task, String processorTrackerAddress) {

    // 1. Persist: update the database (a failed update may cause duplicate execution; not handled for now)
    TaskDO updateEntity = new TaskDO();
    updateEntity.setStatus(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK.getValue());
    // Record the ProcessorTracker that handles this task
    updateEntity.setAddress(processorTrackerAddress);
    boolean success = taskPersistenceService.updateTask(instanceId, task.getTaskId(), updateEntity);
    if (!success) {
        log.warn("[TaskTracker-{}] dispatch task(taskId={},taskName={}) failed due to update task status failed.", instanceId, task.getTaskId(), task.getTaskName());
        return;
    }

    // 2. Update the ProcessorTrackerStatus
    ptStatusHolder.getProcessorTrackerStatus(processorTrackerAddress).setDispatched(true);
    // 3. Initialize the cache
    taskId2BriefInfo.put(task.getTaskId(), new TaskBriefInfo(task.getTaskId(), TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, -1L));

    // 4. Dispatch the task
    TaskTrackerStartTaskReq startTaskReq = new TaskTrackerStartTaskReq(instanceInfo, task, workerRuntime.getWorkerAddress());
    TransportUtils.ttStartPtTask(startTaskReq, processorTrackerAddress, workerRuntime.getTransporter());

    log.debug("[TaskTracker-{}] dispatch task(taskId={},taskName={}) successfully.", instanceId, task.getTaskId(), task.getTaskName());
}
dispatchTask first persists the new status DISPATCH_SUCCESS_WORKER_UNCHECK together with the address of the ProcessorTracker that will handle the task, and gives up if that update fails. It then builds a TaskTrackerStartTaskReq and sends the start-task request through TransportUtils.ttStartPtTask.
onReceiveTaskTrackerStartTaskReq
tech/powerjob/worker/actors/ProcessorTrackerActor.java
@Handler(path = RemoteConstant.WPT_HANDLER_START_TASK, processType = ProcessType.NO_BLOCKING)
public void onReceiveTaskTrackerStartTaskReq(TaskTrackerStartTaskReq req) {

    Long instanceId = req.getInstanceInfo().getInstanceId();

    // Creating the ProcessorTracker always succeeds
    ProcessorTracker processorTracker = ProcessorTrackerManager.getProcessorTracker(
            instanceId,
            req.getTaskTrackerAddress(),
            () -> new ProcessorTracker(req, workerRuntime));

    TaskDO task = new TaskDO();

    task.setTaskId(req.getTaskId());
    task.setTaskName(req.getTaskName());
    task.setTaskContent(req.getTaskContent());
    task.setFailedCnt(req.getTaskCurrentRetryNums());
    task.setSubInstanceId(req.getSubInstanceId());

    processorTracker.submitTask(task);
}
ProcessorTrackerActor's onReceiveTaskTrackerStartTaskReq gets or creates the ProcessorTracker for the instance, rebuilds a TaskDO from the request, and submits it via submitTask.
ProcessorTracker.submitTask
tech/powerjob/worker/core/tracker/processor/ProcessorTracker.java
public void submitTask(TaskDO newTask) {

    // Once the ProcessorTracker is in a broken state, every task submitted here fails immediately to prevent a deadlock.
    // Deadlock analysis: TT creates PT, PT creation fails and it cannot report heartbeats; TT receives no heartbeat
    // for a long time and considers PT down (it really is), so it cannot pick an available PT to dispatch again.
    if (lethal) {
        ProcessorReportTaskStatusReq report = new ProcessorReportTaskStatusReq()
                .setInstanceId(instanceId)
                .setSubInstanceId(newTask.getSubInstanceId())
                .setTaskId(newTask.getTaskId())
                .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                .setResult(lethalReason)
                .setReportTime(System.currentTimeMillis());

        TransportUtils.ptReportTask(report, taskTrackerAddress, workerRuntime);
        return;
    }

    boolean success = false;
    // 1. Set fields and submit for execution
    newTask.setInstanceId(instanceInfo.getInstanceId());
    newTask.setAddress(taskTrackerAddress);

    HeavyProcessorRunnable heavyProcessorRunnable = new HeavyProcessorRunnable(instanceInfo, taskTrackerAddress, newTask, processorBean, omsLogger, statusReportRetryQueue, workerRuntime);
    try {
        threadPool.submit(heavyProcessorRunnable);
        success = true;
    } catch (RejectedExecutionException ignore) {
        log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", instanceId, newTask.getTaskId(), newTask.getTaskName());
    } catch (Exception e) {
        log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
    }

    // 2. Acknowledge receipt
    if (success) {
        ProcessorReportTaskStatusReq reportReq = new ProcessorReportTaskStatusReq();
        reportReq.setInstanceId(instanceId);
        reportReq.setSubInstanceId(newTask.getSubInstanceId());
        reportReq.setTaskId(newTask.getTaskId());
        reportReq.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
        reportReq.setReportTime(System.currentTimeMillis());

        TransportUtils.ptReportTask(reportReq, taskTrackerAddress, workerRuntime);

        log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", instanceId, newTask.getTaskId(), newTask.getTaskName(), threadPool.getQueue().size());
    }
}
ProcessorTracker.submitTask wraps the task in a HeavyProcessorRunnable and submits it to threadPool. On success it replies with a ProcessorReportTaskStatusReq whose status is TaskStatus.WORKER_RECEIVED. If the ProcessorTracker is already in the lethal state, it reports WORKER_PROCESS_FAILED right away instead; if the thread pool rejects the task, only a warning is logged and no acknowledgement is sent.
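The "submit, and acknowledge only if the pool accepted the task" behaviour depends on how a bounded ThreadPoolExecutor rejects work. The sketch below demonstrates that with the JDK only; `SubmitSketch` and the returned strings are hypothetical, and the pool sizes are chosen just to force a rejection, not taken from PowerJob.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SubmitSketch {
    // Returns "WORKER_RECEIVED" when the pool accepts the task, "REJECTED" when it is saturated.
    static String submit(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.submit(task);
            return "WORKER_RECEIVED";
        } catch (RejectedExecutionException e) {
            // Mirrors the listing: no acknowledgement is sent for a rejected task.
            return "REJECTED";
        }
    }

    public static void main(String[] args) {
        // One worker thread and one queue slot; the default handler (AbortPolicy) throws on overflow.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(submit(pool, blocking)); // occupies the single worker thread
        System.out.println(submit(pool, blocking)); // waits in the queue
        System.out.println(submit(pool, blocking)); // no thread and no queue slot left
        release.countDown();
        pool.shutdown();
    }
}
```

With a bounded queue, rejection acts as back-pressure: the sender learns that this worker is full because no WORKER_RECEIVED report arrives.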
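Summary
- The MapProcessor interface extends BasicProcessor. Its default map method dispatches subtasks by building a ProcessorMapTaskRequest and sending it through TransportUtils.reliableMapTask. On the receiving side, the TaskTracker first persists the subtasks with an initial status of TaskStatus.WAITING_DISPATCH;
- HeavyTaskTracker schedules the Dispatcher every 5s. Its run method gets the ProcessorTrackers that can accept work from ptStatusHolder.getAvailableProcessorTrackers(), then repeatedly pulls batches of tasks in status TaskStatus.WAITING_DISPATCH from the database and calls dispatchTask on each, choosing ProcessorTrackers round-robin;
- dispatchTask updates the status to DISPATCH_SUCCESS_WORKER_UNCHECK and records the ProcessorTracker that handles the task, then builds a TaskTrackerStartTaskReq and sends the start-task request through TransportUtils.ttStartPtTask;
- ProcessorTracker.submitTask creates a HeavyProcessorRunnable, submits it to threadPool, and replies with a ProcessorReportTaskStatusReq whose status is TaskStatus.WORKER_RECEIVED;
- Finally, HeavyProcessorRunnable calls the corresponding processor.process method to execute the actual task.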