PowerJob的DesignateServer工作流程源码解读
作者:codecraft
这篇文章主要介绍了PowerJob的DesignateServer工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
序
本文主要研究一下PowerJob的DesignateServer
DesignateServer
tech/powerjob/server/remote/server/redirector/DesignateServer.java
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface DesignateServer { /** * 转发请求需要 AppInfo 下的 currentServer 信息,因此必须要有 appId 作为入参,该字段指定了 appId 字段的参数名称,默认为 appId * @return appId 参数名称 */ String appIdParameterName() default "appId"; }
DesignateServer注解定义了appIdParameterName属性,默认是appId
DesignateServerAspect
tech/powerjob/server/remote/server/redirector/DesignateServerAspect.java
@Slf4j @Aspect @Component @Order(0) @RequiredArgsConstructor public class DesignateServerAspect { private final TransportService transportService; private final AppInfoRepository appInfoRepository; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Around(value = "@annotation(designateServer))") public Object execute(ProceedingJoinPoint point, DesignateServer designateServer) throws Throwable { // 参数 Object[] args = point.getArgs(); // 方法名 String methodName = point.getSignature().getName(); // 类名 String className = point.getSignature().getDeclaringTypeName(); Signature signature = point.getSignature(); // 方法签名 MethodSignature methodSignature = (MethodSignature) signature; String[] parameterNames = methodSignature.getParameterNames(); String[] parameterTypes = Arrays.stream(methodSignature.getParameterTypes()).map(Class::getName).toArray(String[]::new); Long appId = null; for (int i = 0; i < parameterNames.length; i++) { if (StringUtils.equals(parameterNames[i], designateServer.appIdParameterName())) { appId = Long.parseLong(String.valueOf(args[i])); break; } } if (appId == null) { throw new PowerJobException("can't find appId in params for:" + signature); } // 获取执行机器 AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new PowerJobException("can't find app info")); String targetServer = appInfo.getCurrentServer(); // 目标IP为空,本地执行 if (StringUtils.isEmpty(targetServer)) { return point.proceed(); } // 目标IP与本地符合则本地执行 if (Objects.equals(targetServer, transportService.defaultProtocol().getAddress())) { return point.proceed(); } log.info("[DesignateServerAspect] the method[{}] should execute in server[{}], so this request will be redirect to remote server!", signature.toShortString(), targetServer); // 转发请求,远程执行后返回结果 RemoteProcessReq remoteProcessReq = new RemoteProcessReq() .setClassName(className) .setMethodName(methodName) .setParameterTypes(parameterTypes) .setArgs(args); final URL friendUrl = ServerURLFactory.process2Friend(targetServer); CompletionStage<AskResponse> askCS = transportService.ask(Protocol.HTTP.name(), friendUrl, remoteProcessReq, AskResponse.class); AskResponse askResponse = askCS.toCompletableFuture().get(RemoteConstant.DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS); if (!askResponse.isSuccess()) { throw new PowerJobException("remote process failed: " + askResponse.getMessage()); } // 考虑范型情况 Method method = methodSignature.getMethod(); JavaType returnType = getMethodReturnJavaType(method); return OBJECT_MAPPER.readValue(askResponse.getData(), returnType); } //...... }
DesignateServerAspect拦截了@DesignateServer注解,它先解析方法参数名,取出参数名与@DesignateServer的appIdParameterName一致的参数值,再通过appInfoRepository.findById找到AppInfoDO,获取appInfo.getCurrentServer();若currentServer就是本机则执行point.proceed(),否则构建RemoteProcessReq,通过transportService.ask转发请求
示例
tech/powerjob/server/core/instance/InstanceLogService.java
/** * 获取日志的下载链接 * @param appId AOP 专用 * @param instanceId 任务实例 ID * @return 下载链接 */ @DesignateServer public String fetchDownloadUrl(Long appId, Long instanceId) { String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId; log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url); return url; }
fetchDownloadUrl指定了@DesignateServer注解,会根据appId的值限定在指定server执行
小结
PowerJob的DesignateServer注解定义了appIdParameterName属性,默认是appId;DesignateServerAspect拦截了@DesignateServer注解,它判断currentServer就是本机则执行point.proceed(),否则构建RemoteProcessReq,通过transportService.ask转发请求到指定server执行。
以上就是PowerJob的DesignateServer工作流程源码解读的详细内容,更多关于PowerJob DesignateServer的资料请关注脚本之家其它相关文章!