PowerJob的ProcessorLoader工作流程源码解读
作者:codecraft
序
本文主要研究一下PowerJob的ProcessorLoader
ProcessorLoader
tech/powerjob/worker/processor/ProcessorLoader.java
public interface ProcessorLoader { ProcessorBean load(ProcessorDefinition definition); }
ProcessorLoader定义了load方法,用于根据ProcessorDefinition加载ProcessorBean
ProcessorDefinition
tech/powerjob/worker/extension/processor/ProcessorDefinition.java
@Getter @Setter @ToString @Accessors(chain = true) public class ProcessorDefinition implements Serializable { /** * 后台配置的处理器类型 */ private String processorType; /** * 后台配置的处理器信息 */ private String processorInfo; @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } ProcessorDefinition that = (ProcessorDefinition) o; return Objects.equals(processorType, that.processorType) && Objects.equals(processorInfo, that.processorInfo); } @Override public int hashCode() { return Objects.hash(processorType, processorInfo); } }
ProcessorDefinition定义了processorType及processorInfo两个属性
ProcessorBean
tech/powerjob/worker/extension/processor/ProcessorBean.java
@Getter @Setter @Accessors(chain = true) public class ProcessorBean { /** * 真正用来执行逻辑的处理器对象 */ private transient BasicProcessor processor; /** * 加载该处理器对象的 classLoader,可空,空则使用 {@link Object#getClass()#getClassLoader() 代替} */ private transient ClassLoader classLoader; }
ProcessorBean定义了BasicProcessor及ClassLoader两个属性
PowerJobProcessorLoader
tech/powerjob/worker/processor/PowerJobProcessorLoader.java
@Slf4j public class PowerJobProcessorLoader implements ProcessorLoader { private final List<ProcessorFactory> processorFactoryList; private final Map<ProcessorDefinition, ProcessorBean> def2Bean = new ConcurrentHashMap<>(128); public PowerJobProcessorLoader(List<ProcessorFactory> processorFactoryList) { this.processorFactoryList = processorFactoryList; } @Override public ProcessorBean load(ProcessorDefinition definition) { return def2Bean.computeIfAbsent(definition, ignore -> { final String processorType = definition.getProcessorType(); log.info("[ProcessorFactory] start to load Processor: {}", definition); for (ProcessorFactory pf : processorFactoryList) { final String pfName = pf.getClass().getSimpleName(); if (!Optional.ofNullable(pf.supportTypes()).orElse(Collections.emptySet()).contains(processorType)) { log.info("[ProcessorFactory] [{}] can't load type={}, skip!", pfName, processorType); continue; } log.info("[ProcessorFactory] [{}] try to load processor: {}", pfName, definition); try { ProcessorBean processorBean = pf.build(definition); if (processorBean != null) { log.info("[ProcessorFactory] [{}] load processor successfully: {}", pfName, definition); return processorBean; } } catch (Throwable t) { log.error("[ProcessorFactory] [{}] load processor failed: {}", pfName, definition, t); } } throw new PowerJobException("fetch Processor failed, please check your processorType and processorInfo config"); }); } }
PowerJobProcessorLoader实现了ProcessorLoader接口,其构造器要求传入processorFactoryList,它还定义了def2Bean,用于维护ProcessorDefinition与ProcessorBean的关系;其load方法使用ConcurrentHashMap的computeIfAbsent,将加载好的ProcessorBean放入到def2Bean;其加载过程为遍历processorFactoryList,找到支持该processorType的ProcessorFactory,然后执行其build方法进行构造
ProcessorFactory
tech/powerjob/worker/extension/processor/ProcessorFactory.java
public interface ProcessorFactory { /** * 支持的处理器类型,类型不匹配则跳过该 ProcessorFactory 的加载逻辑 * 对应的是控制台的'处理器类型' TAB,不做任何定制的情况下,取值范围为 {@link ProcessorType#name()} * @return 支持的处理器类型 */ Set<String> supportTypes(); /** * 根据处理器定义构建处理器对象 * 注意:Processor 为单例对象,即 PowerJob 对每一个 ProcessorBean 只调用一次 build 方法 * @param processorDefinition 处理器定义 * @return null or ProcessorBean */ ProcessorBean build(ProcessorDefinition processorDefinition); }
ProcessorFactory接口定义了supportTypes、build方法;它有四个实现类,其中BuiltInSpringProcessorFactory及BuildInSpringMethodProcessorFactory继承自AbstractBuildInSpringProcessorFactory,另外两个为BuiltInDefaultProcessorFactory、JarContainerProcessorFactory
BuiltInDefaultProcessorFactory
tech/powerjob/worker/processor/impl/BuiltInDefaultProcessorFactory.java
@Slf4j public class BuiltInDefaultProcessorFactory implements ProcessorFactory { @Override public Set<String> supportTypes() { return Sets.newHashSet(ProcessorType.BUILT_IN.name()); } @Override public ProcessorBean build(ProcessorDefinition processorDefinition) { String className = processorDefinition.getProcessorInfo(); try { Class<?> clz = Class.forName(className); BasicProcessor basicProcessor = (BasicProcessor) clz.getDeclaredConstructor().newInstance(); return new ProcessorBean() .setProcessor(basicProcessor) .setClassLoader(basicProcessor.getClass().getClassLoader()); }catch (Exception e) { log.warn("[ProcessorFactory] load local Processor(className = {}) failed.", className, e); } return null; } }
BuiltInDefaultProcessorFactory是默认的处理器工厂,通过全限定类名加载处理器,但没有IOC功能
JarContainerProcessorFactory
tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java
@Slf4j public class JarContainerProcessorFactory implements ProcessorFactory { private final WorkerRuntime workerRuntime; public JarContainerProcessorFactory(WorkerRuntime workerRuntime) { this.workerRuntime = workerRuntime; } @Override public Set<String> supportTypes() { return Sets.newHashSet(ProcessorType.EXTERNAL.name()); } @Override public ProcessorBean build(ProcessorDefinition processorDefinition) { String processorInfo = processorDefinition.getProcessorInfo(); String[] split = processorInfo.split("#"); String containerName = split[0]; String className = split[1]; log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName); OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime); if (omsContainer != null) { return new ProcessorBean() .setProcessor(omsContainer.getProcessor(className)) .setClassLoader(omsContainer.getContainerClassLoader()); } else { log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo); } return null; } }
JarContainerProcessorFactory主要是通过OmsContainer来加载ProcessorBean
AbstractBuildInSpringProcessorFactory
tech/powerjob/worker/processor/impl/AbstractBuildInSpringProcessorFactory.java
@Slf4j public abstract class AbstractBuildInSpringProcessorFactory implements ProcessorFactory { protected final ApplicationContext applicationContext; protected AbstractBuildInSpringProcessorFactory(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } @Override public Set<String> supportTypes() { return Sets.newHashSet(ProcessorType.BUILT_IN.name()); } protected boolean checkCanLoad() { try { ApplicationContext.class.getClassLoader(); return applicationContext != null; } catch (Throwable ignore) { } return false; } @SuppressWarnings("unchecked") protected static <T> T getBean(String className, ApplicationContext ctx) throws Exception { // 0. 尝试直接用 Bean 名称加载 try { final Object bean = ctx.getBean(className); if (bean != null) { return (T) bean; } } catch (Exception ignore) { } // 1. ClassLoader 存在,则直接使用 clz 加载 ClassLoader classLoader = ctx.getClassLoader(); if (classLoader != null) { return (T) ctx.getBean(classLoader.loadClass(className)); } // 2. ClassLoader 不存在(系统类加载器不可见),尝试用类名称小写加载 String[] split = className.split("\\."); String beanName = split[split.length - 1]; // 小写转大写 char[] cs = beanName.toCharArray(); cs[0] += 32; String beanName0 = String.valueOf(cs); log.warn("[SpringUtils] can't get ClassLoader from context[{}], try to load by beanName:{}", ctx, beanName0); return (T) ctx.getBean(beanName0); } }
AbstractBuildInSpringProcessorFactory是两个spring相关ProcessorFactory的抽象类,它使用spring的ApplicationContext来加载
BuiltInSpringProcessorFactory
tech/powerjob/worker/processor/impl/BuiltInSpringProcessorFactory.java
@Slf4j public class BuiltInSpringProcessorFactory extends AbstractBuildInSpringProcessorFactory { public BuiltInSpringProcessorFactory(ApplicationContext applicationContext) { super(applicationContext); } @Override public ProcessorBean build(ProcessorDefinition processorDefinition) { try { boolean canLoad = checkCanLoad(); if (!canLoad) { log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuiltInSpringProcessorFactory'"); return null; } String processorInfo = processorDefinition.getProcessorInfo(); //用于区分方法级别的参数 if (processorInfo.contains("#")) { return null; } BasicProcessor basicProcessor = getBean(processorInfo, applicationContext); return new ProcessorBean() .setProcessor(basicProcessor) .setClassLoader(basicProcessor.getClass().getClassLoader()); } catch (NoSuchBeanDefinitionException ignore) { log.warn("[ProcessorFactory] can't find the processor in SPRING"); } catch (Throwable t) { log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t); } return null; } }
BuiltInSpringProcessorFactory通过ApplicationContext加载spring相关的Bean,但它不处理processorInfo包含#
的processorDefinition
BuildInSpringMethodProcessorFactory
tech/powerjob/worker/processor/impl/BuildInSpringMethodProcessorFactory.java
@Slf4j public class BuildInSpringMethodProcessorFactory extends AbstractBuildInSpringProcessorFactory { private static final List<String> jobHandlerRepository = new LinkedList<>(); private final static String DELIMITER = "#"; public BuildInSpringMethodProcessorFactory(ApplicationContext applicationContext) { super(applicationContext); } @Override public ProcessorBean build(ProcessorDefinition processorDefinition) { try { boolean canLoad = checkCanLoad(); if (!canLoad) { log.info("[ProcessorFactory] can't find Spring env, this processor can't load by 'BuildInSpringMethodProcessorFactory'"); return null; } String processorInfo = processorDefinition.getProcessorInfo(); if (!processorInfo.contains(DELIMITER)) { log.info("[ProcessorFactory] can't parse processorDefinition, this processor can't load by 'BuildInSpringMethodProcessorFactory'"); return null; } String[] split = processorInfo.split(DELIMITER); String methodName = split[1]; String className = split[0]; Object bean = getBean(className,applicationContext); Method[] methods = bean.getClass().getDeclaredMethods(); for (Method method : methods) { PowerJobHandler powerJob = method.getAnnotation(PowerJobHandler.class); if (powerJob == null) { continue; } String name = powerJob.name(); //匹配到和页面定义相同的methodName if (!name.equals(methodName)) { continue; } if (name.trim().length() == 0) { throw new RuntimeException("method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } if (containsJobHandler(name)) { throw new RuntimeException("jobhandler[" + name + "] naming conflicts."); } method.setAccessible(true); registerJobHandler(methodName); MethodBasicProcessor processor = new MethodBasicProcessor(bean, method); return new ProcessorBean() .setProcessor(processor) .setClassLoader(processor.getClass().getClassLoader()); } } catch (NoSuchBeanDefinitionException ignore) { log.warn("[ProcessorFactory] can't find the processor in SPRING"); } catch (Throwable t) { log.warn("[ProcessorFactory] load by BuiltInSpringProcessorFactory failed. If you are using Spring, make sure this bean was managed by Spring", t); } return null; } public static void registerJobHandler(String name) { jobHandlerRepository.add(name); } private boolean containsJobHandler(String name) { return jobHandlerRepository.contains(name); } }
BuildInSpringMethodProcessorFactory专门用于processorInfo包含#的processorDefinition,它会遍历指定class的methods,找到方法上标注有@PowerJobHandler注解且方法名一致的method,注册到jobHandlerRepository,其创建的是MethodBasicProcessor
小结
PowerJob的ProcessorLoader定义了load方法,用于根据ProcessorDefinition加载ProcessorBean;PowerJobProcessorLoader实现了ProcessorLoader接口,它会遍历processorFactoryList,找到支持该processorType的ProcessorFactory,然后执行其build方法进行构造;ProcessorFactory接口定义了supportTypes、build方法;它有四个实现类,其中BuiltInSpringProcessorFactory及BuildInSpringMethodProcessorFactory继承自AbstractBuildInSpringProcessorFactory,另外两个为BuiltInDefaultProcessorFactory、JarContainerProcessorFactory。
以上就是PowerJob的ProcessorLoader的详细内容,更多关于PowerJob ProcessorLoader的资料请关注脚本之家其它相关文章!
您可能感兴趣的文章:
- PowerJob的TimingStrategyHandler工作流程源码解读
- PowerJob的IdGenerateService工作流程源码解读
- PowerJob LockService方法工作流程源码解读
- PowerJob的Evaluator方法工作流程源码解读
- PowerJob的DatabaseMonitorAspect源码流程
- PowerJob的AbstractScriptProcessor实现类工作流程源码解读
- PowerJob的WorkerHealthReporter工作流程源码解读
- PowerJob的OmsLogHandler工作流程源码解析
- PowerJob的ServerDiscoveryService工作流程源码解读
- PowerJob的DispatchStrategy方法工作流程源码解读