Source Code Walkthrough: The Workflow of PowerJob's WorkerHealthReporter
Author: codecraft
Preface
This article examines PowerJob's WorkerHealthReporter.
WorkerHealthReporter
tech/powerjob/worker/background/WorkerHealthReporter.java
@Slf4j
@RequiredArgsConstructor
public class WorkerHealthReporter implements Runnable {

    private final WorkerRuntime workerRuntime;

    @Override
    public void run() {

        // No server available, so nothing can be reported
        String currentServer = workerRuntime.getServerDiscoveryService().getCurrentServerAddress();
        if (StringUtils.isEmpty(currentServer)) {
            log.warn("[WorkerHealthReporter] no available server,fail to report health info!");
            return;
        }

        SystemMetrics systemMetrics;

        if (workerRuntime.getWorkerConfig().getSystemMetricsCollector() == null) {
            systemMetrics = SystemInfoUtils.getSystemMetrics();
        } else {
            systemMetrics = workerRuntime.getWorkerConfig().getSystemMetricsCollector().collect();
        }

        WorkerHeartbeat heartbeat = new WorkerHeartbeat();

        heartbeat.setSystemMetrics(systemMetrics);
        heartbeat.setWorkerAddress(workerRuntime.getWorkerAddress());
        heartbeat.setAppName(workerRuntime.getWorkerConfig().getAppName());
        heartbeat.setAppId(workerRuntime.getAppId());
        heartbeat.setHeartbeatTime(System.currentTimeMillis());
        heartbeat.setVersion(PowerJobWorkerVersion.getVersion());
        heartbeat.setProtocol(workerRuntime.getWorkerConfig().getProtocol().name());
        heartbeat.setClient("KingPenguin");
        heartbeat.setTag(workerRuntime.getWorkerConfig().getTag());

        // Report the number of task trackers
        heartbeat.setLightTaskTrackerNum(LightTaskTrackerManager.currentTaskTrackerSize());
        heartbeat.setHeavyTaskTrackerNum(HeavyTaskTrackerManager.currentTaskTrackerSize());
        // Whether the worker is overloaded
        if (workerRuntime.getWorkerConfig().getMaxLightweightTaskNum() <= LightTaskTrackerManager.currentTaskTrackerSize()
                || workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum() <= HeavyTaskTrackerManager.currentTaskTrackerSize()) {
            heartbeat.setOverload(true);
        }
        // Containers currently loaded on this worker
        heartbeat.setContainerInfos(OmsContainerFactory.getDeployedContainerInfos());
        // Send the request
        if (StringUtils.isEmpty(currentServer)) {
            return;
        }
        // log
        log.info("[WorkerHealthReporter] report health status,appId:{},appName:{},isOverload:{},maxLightweightTaskNum:{},currentLightweightTaskNum:{},maxHeavyweightTaskNum:{},currentHeavyweightTaskNum:{}",
                heartbeat.getAppId(),
                heartbeat.getAppName(),
                heartbeat.isOverload(),
                workerRuntime.getWorkerConfig().getMaxLightweightTaskNum(),
                heartbeat.getLightTaskTrackerNum(),
                workerRuntime.getWorkerConfig().getMaxHeavyweightTaskNum(),
                heartbeat.getHeavyTaskTrackerNum()
        );
        TransportUtils.reportWorkerHeartbeat(heartbeat, currentServer, workerRuntime.getTransporter());
    }
}
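WorkerHealthReporter implements the Runnable interface. Its run method first obtains currentServer and returns early if none is available, then collects systemMetrics (from SystemInfoUtils unless a custom SystemMetricsCollector is configured), builds a WorkerHeartbeat, and finally reports it through TransportUtils.reportWorkerHeartbeat.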
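The overload rule inside run can be isolated in a few lines. The following is a minimal sketch, not PowerJob code: the class OverloadCheck and its parameter names are hypothetical, and only the `max <= current` comparison is taken from the listing above.

```java
// Hypothetical helper for illustration; only the comparison mirrors the listing above.
final class OverloadCheck {

    private OverloadCheck() {
    }

    // A worker counts as overloaded once either tracker count reaches its configured maximum.
    static boolean isOverloaded(int maxLight, int currentLight, int maxHeavy, int currentHeavy) {
        return maxLight <= currentLight || maxHeavy <= currentHeavy;
    }
}
```

Because the comparison is `<=`, a worker that sits exactly at one of its limits is already reported as overloaded, for example `OverloadCheck.isOverloaded(1024, 1024, 64, 3)`.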
reportWorkerHeartbeat
tech/powerjob/worker/common/utils/TransportUtils.java
public static void reportWorkerHeartbeat(WorkerHeartbeat req, String address, Transporter transporter) {
    final URL url = easyBuildUrl(ServerType.SERVER, S4W_PATH, S4W_HANDLER_WORKER_HEARTBEAT, address);
    transporter.tell(url, req);
}

public static URL easyBuildUrl(ServerType serverType, String rootPath, String handlerPath, String address) {
    HandlerLocation handlerLocation = new HandlerLocation()
            .setRootPath(rootPath)
            .setMethodPath(handlerPath);
    return new URL()
            .setServerType(serverType)
            .setAddress(Address.fromIpv4(address))
            .setLocation(handlerLocation);
}
reportWorkerHeartbeat sends the request through transporter.tell; the rootPath is server and the handlerPath is workerHeartbeat.
processWorkerHeartbeat
tech/powerjob/server/core/handler/AbWorkerRequestHandler.java
@Handler(path = S4W_HANDLER_WORKER_HEARTBEAT, processType = ProcessType.NO_BLOCKING)
public void processWorkerHeartbeat(WorkerHeartbeat heartbeat) {
    long startMs = System.currentTimeMillis();
    WorkerHeartbeatEvent event = new WorkerHeartbeatEvent()
            .setAppName(heartbeat.getAppName())
            .setAppId(heartbeat.getAppId())
            .setVersion(heartbeat.getVersion())
            .setProtocol(heartbeat.getProtocol())
            .setTag(heartbeat.getTag())
            .setWorkerAddress(heartbeat.getWorkerAddress())
            .setDelayMs(startMs - heartbeat.getHeartbeatTime())
            .setScore(heartbeat.getSystemMetrics().getScore());
    processWorkerHeartbeat0(heartbeat, event);
    monitorService.monitor(event);
}
The processWorkerHeartbeat method converts the heartbeat into a WorkerHeartbeatEvent, then calls processWorkerHeartbeat0 and monitorService.monitor(event).
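To make the conversion step concrete, here is a small sketch of an event with chained setters and the delay calculation. It is an illustration under assumptions: HeartbeatEventSketch and its factory method from are hypothetical names, and only the `startMs - heartbeatTime` subtraction comes from the handler above.

```java
// Hypothetical stand-in for an event with chained setters; not the real WorkerHeartbeatEvent.
final class HeartbeatEventSketch {

    private String workerAddress;
    private long delayMs;

    // Each setter returns this so that calls can be chained, as in the handler above.
    HeartbeatEventSketch setWorkerAddress(String workerAddress) {
        this.workerAddress = workerAddress;
        return this;
    }

    HeartbeatEventSketch setDelayMs(long delayMs) {
        this.delayMs = delayMs;
        return this;
    }

    String getWorkerAddress() {
        return workerAddress;
    }

    long getDelayMs() {
        return delayMs;
    }

    // Delay = time the server starts handling the heartbeat minus the worker's own send timestamp.
    static HeartbeatEventSketch from(String workerAddress, long heartbeatTimeMs, long receivedAtMs) {
        return new HeartbeatEventSketch()
                .setWorkerAddress(workerAddress)
                .setDelayMs(receivedAtMs - heartbeatTimeMs);
    }
}
```

The two timestamps are read from different machines' clocks, so the computed delay includes any clock offset between worker and server and can even be negative.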
processWorkerHeartbeat0
tech/powerjob/server/core/handler/WorkerRequestHandlerImpl.java
protected void processWorkerHeartbeat0(WorkerHeartbeat heartbeat, WorkerHeartbeatEvent event) {
    WorkerClusterManagerService.updateStatus(heartbeat);
}
processWorkerHeartbeat0 updates the worker's status through WorkerClusterManagerService.updateStatus(heartbeat).
WorkerClusterManagerService.updateStatus
tech/powerjob/server/remote/worker/WorkerClusterManagerService.java
public static void updateStatus(WorkerHeartbeat heartbeat) {
    Long appId = heartbeat.getAppId();
    String appName = heartbeat.getAppName();
    ClusterStatusHolder clusterStatusHolder = APP_ID_2_CLUSTER_STATUS.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
    clusterStatusHolder.updateStatus(heartbeat);
}
updateStatus first obtains the ClusterStatusHolder for the appId, creating one if it does not exist yet, and then updates its status with the heartbeat.
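The get-or-create pattern above relies on Map.computeIfAbsent. A minimal sketch follows, assuming the registry is a ConcurrentHashMap (the listing does not show the declared type of APP_ID_2_CLUSTER_STATUS); ClusterRegistrySketch, Holder, and onHeartbeat are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry: one holder per appId, created lazily on the first heartbeat.
final class ClusterRegistrySketch {

    static final class Holder {
        final String appName;
        final AtomicInteger heartbeats = new AtomicInteger();

        Holder(String appName) {
            this.appName = appName;
        }
    }

    // Assumption: a concurrent map, since heartbeats may arrive on several threads.
    private final Map<Long, Holder> holders = new ConcurrentHashMap<>();

    Holder onHeartbeat(long appId, String appName) {
        // The mapping function runs only when no holder exists for this appId yet.
        Holder holder = holders.computeIfAbsent(appId, ignore -> new Holder(appName));
        holder.heartbeats.incrementAndGet();
        return holder;
    }

    int size() {
        return holders.size();
    }
}
```

A consequence of this pattern is that the appName passed on later heartbeats is ignored: the holder keeps the name it was created with.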
ClusterStatusHolder.updateStatus
tech/powerjob/server/remote/worker/ClusterStatusHolder.java
public void updateStatus(WorkerHeartbeat heartbeat) {

    String workerAddress = heartbeat.getWorkerAddress();
    long heartbeatTime = heartbeat.getHeartbeatTime();

    WorkerInfo workerInfo = address2WorkerInfo.computeIfAbsent(workerAddress, ignore -> {
        WorkerInfo wf = new WorkerInfo();
        wf.refresh(heartbeat);
        return wf;
    });
    long oldTime = workerInfo.getLastActiveTime();
    if (heartbeatTime < oldTime) {
        log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
        return;
    }

    workerInfo.refresh(heartbeat);

    List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
    if (!CollectionUtils.isEmpty(containerInfos)) {
        containerInfos.forEach(containerInfo -> {
            Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
            infos.put(workerAddress, containerInfo);
        });
    }
}
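ClusterStatusHolder's updateStatus method first obtains the workerInfo for the worker address, creating and refreshing one on first contact. If heartbeatTime is earlier than lastActiveTime, the heartbeat is treated as expired: a warning is logged and the method returns. Otherwise it calls workerInfo.refresh(heartbeat) and finally records the containers from heartbeat.getContainerInfos() in containerId2Infos.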
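The expired-heartbeat guard can be sketched on its own. This is a simplified illustration, not the real class: HeartbeatTableSketch stores only a timestamp per worker address, and its method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical table that keeps only the newest heartbeat time per worker address.
final class HeartbeatTableSketch {

    private final Map<String, Long> lastActive = new ConcurrentHashMap<>();

    // Returns true if the heartbeat was applied, false if it was older than the stored one.
    boolean accept(String workerAddress, long heartbeatTimeMs) {
        // First contact stores the incoming time, so oldTime equals heartbeatTimeMs in that case.
        long oldTime = lastActive.computeIfAbsent(workerAddress, ignore -> heartbeatTimeMs);
        if (heartbeatTimeMs < oldTime) {
            return false; // out-of-order heartbeat, drop it
        }
        lastActive.put(workerAddress, heartbeatTimeMs);
        return true;
    }

    // Returns -1 when the worker has never reported.
    long lastActiveTime(String workerAddress) {
        return lastActive.getOrDefault(workerAddress, -1L);
    }
}
```

With this guard, heartbeats that arrive out of order cannot move a worker's last active time backwards, while a heartbeat carrying the same timestamp is still applied because the comparison is strict.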
refresh
tech/powerjob/server/common/module/WorkerInfo.java
public void refresh(WorkerHeartbeat workerHeartbeat) {
    address = workerHeartbeat.getWorkerAddress();
    lastActiveTime = workerHeartbeat.getHeartbeatTime();
    protocol = workerHeartbeat.getProtocol();
    client = workerHeartbeat.getClient();
    tag = workerHeartbeat.getTag();
    systemMetrics = workerHeartbeat.getSystemMetrics();
    containerInfos = workerHeartbeat.getContainerInfos();

    lightTaskTrackerNum = workerHeartbeat.getLightTaskTrackerNum();
    heavyTaskTrackerNum = workerHeartbeat.getHeavyTaskTrackerNum();

    if (workerHeartbeat.isOverload()) {
        overloading = true;
        lastOverloadTime = workerHeartbeat.getHeartbeatTime();
        log.warn("[WorkerInfo] worker {} is overload!", getAddress());
    } else {
        overloading = false;
    }
}
WorkerInfo's refresh method copies the fields of workerHeartbeat into the WorkerInfo, updating lastActiveTime, overloading, and related information.
DisconnectedWorkerFilter
tech/powerjob/server/extension/defaultimpl/workerfilter/DisconnectedWorkerFilter.java
@Slf4j
@Component
public class DisconnectedWorkerFilter implements WorkerFilter {

    @Override
    public boolean filter(WorkerInfo workerInfo, JobInfoDO jobInfo) {
        boolean timeout = workerInfo.timeout();
        if (timeout) {
            log.info("[Job-{}] filter worker[{}] due to timeout(lastActiveTime={})", jobInfo.getId(), workerInfo.getAddress(), workerInfo.getLastActiveTime());
        }
        return timeout;
    }
}
DisconnectedWorkerFilter implements the WorkerFilter interface. Its filter method returns workerInfo.timeout(), logging the worker when it has timed out.
timeout
tech/powerjob/server/common/module/WorkerInfo.java
private static final long WORKER_TIMEOUT_MS = 60000;

public boolean timeout() {
    long timeout = System.currentTimeMillis() - lastActiveTime;
    return timeout > WORKER_TIMEOUT_MS;
}
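The timeout method computes the difference between the current time and lastActiveTime and compares it with the default WORKER_TIMEOUT_MS (60s); the worker is considered timed out when the difference exceeds that value.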
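The same check is easier to reason about when the current time is passed in instead of read from the system clock. A minimal sketch, assuming a hypothetical class WorkerLivenessSketch; only the 60000 ms threshold and the strict `>` comparison come from the listing above.

```java
// Hypothetical, clock-injectable variant of the timeout check for illustration.
final class WorkerLivenessSketch {

    static final long WORKER_TIMEOUT_MS = 60_000L;

    private WorkerLivenessSketch() {
    }

    // A worker has timed out when more than 60 s have passed since its last heartbeat.
    static boolean timedOut(long nowMs, long lastActiveTimeMs) {
        return nowMs - lastActiveTimeMs > WORKER_TIMEOUT_MS;
    }
}
```

Since the comparison is strict, a worker whose last heartbeat is exactly 60000 ms old is still treated as alive; it is filtered out only from 60001 ms onwards.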
getSuitableWorkers
tech/powerjob/server/remote/worker/WorkerClusterQueryService.java
public List<WorkerInfo> getSuitableWorkers(JobInfoDO jobInfo) {

    List<WorkerInfo> workers = Lists.newLinkedList(getWorkerInfosByAppId(jobInfo.getAppId()).values());

    workers.removeIf(workerInfo -> filterWorker(workerInfo, jobInfo));

    DispatchStrategy dispatchStrategy = DispatchStrategy.of(jobInfo.getDispatchStrategy());
    switch (dispatchStrategy) {
        case RANDOM:
            Collections.shuffle(workers);
            break;
        case HEALTH_FIRST:
            workers.sort((o1, o2) -> o2.getSystemMetrics().calculateScore() - o1.getSystemMetrics().calculateScore());
            break;
        default:
            // do nothing
    }

    // Cap the cluster size (0 means no limit)
    if (!workers.isEmpty() && jobInfo.getMaxWorkerCount() > 0 && workers.size() > jobInfo.getMaxWorkerCount()) {
        workers = workers.subList(0, jobInfo.getMaxWorkerCount());
    }
    return workers;
}

private boolean filterWorker(WorkerInfo workerInfo, JobInfoDO jobInfo) {
    for (WorkerFilter filter : workerFilters) {
        if (filter.filter(workerInfo, jobInfo)) {
            return true;
        }
    }
    return false;
}
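The getSuitableWorkers method removes every worker for which filterWorker(workerInfo, jobInfo) returns true, that is, every worker rejected by at least one WorkerFilter. It then orders the remaining workers according to the dispatch strategy and caps the list at maxWorkerCount when that value is greater than 0.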
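The filter-then-sort-then-cap sequence can be sketched with plain collections. The code below is an illustration only: WorkerSelectionSketch and its Worker class are hypothetical, filters are modeled as java.util.function.Predicate rather than WorkerFilter, only the health-first ordering is shown, and Integer.compare is used in place of the subtraction comparator from the listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified worker selection; a filter returning true means "exclude".
final class WorkerSelectionSketch {

    static final class Worker {
        final String address;
        final int score;
        final boolean timedOut;

        Worker(String address, int score, boolean timedOut) {
            this.address = address;
            this.score = score;
            this.timedOut = timedOut;
        }
    }

    private WorkerSelectionSketch() {
    }

    static List<Worker> select(List<Worker> all, List<Predicate<Worker>> filters, int maxWorkerCount) {
        List<Worker> workers = new ArrayList<>(all);
        // Drop a worker as soon as any filter rejects it.
        workers.removeIf(worker -> filters.stream().anyMatch(filter -> filter.test(worker)));
        // Health first: highest score at the front.
        workers.sort((a, b) -> Integer.compare(b.score, a.score));
        // Cap the result size; 0 or less means no limit.
        if (maxWorkerCount > 0 && workers.size() > maxWorkerCount) {
            return new ArrayList<>(workers.subList(0, maxWorkerCount));
        }
        return workers;
    }
}
```

Passing a single predicate such as `w -> w.timedOut` reproduces the effect of DisconnectedWorkerFilter in this model: timed-out workers never reach the sorting or capping steps.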
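Summary
PowerJob's WorkerHealthReporter implements the Runnable interface. Its run method first obtains currentServer, then collects systemMetrics, builds a WorkerHeartbeat, and finally reports it through TransportUtils.reportWorkerHeartbeat.
reportWorkerHeartbeat sends the request through transporter.tell; the rootPath is server and the handlerPath is workerHeartbeat.
The server updates the worker's status through WorkerClusterManagerService.updateStatus(heartbeat), which mainly comes down to WorkerInfo's refresh method: it updates lastActiveTime, overloading, and related information from the workerHeartbeat.
DisconnectedWorkerFilter implements the WorkerFilter interface. Its filter method returns workerInfo.timeout(), so workers whose heartbeat has timed out are excluded from dispatch.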