PowerJob的AbstractScriptProcessor实现类工作流程源码解读
作者:codecraft
序
本文主要研究一下PowerJob的AbstractScriptProcessor
AbstractScriptProcessor
tech/powerjob/official/processors/impl/script/AbstractScriptProcessor.java
@Slf4j public abstract class AbstractScriptProcessor extends CommonBasicProcessor { private static final ForkJoinPool POOL = new ForkJoinPool(4 * Runtime.getRuntime().availableProcessors()); private static final Set<String> DOWNLOAD_PROTOCOL = Sets.newHashSet("http", "https", "ftp"); protected static final String SH_SHELL = "/bin/sh"; protected static final String CMD_SHELL = "cmd.exe"; private static final String WORKER_DIR = PowerFileUtils.workspace() + "/official_script_processor/"; @Override protected ProcessResult process0(TaskContext context) throws Exception { OmsLogger omsLogger = context.getOmsLogger(); String scriptParams = CommonUtils.parseParams(context); omsLogger.info("[SYSTEM] ScriptProcessor start to process, params: {}", scriptParams); if (scriptParams == null) { String message = "[SYSTEM] ScriptParams is null, please check jobParam configuration."; omsLogger.warn(message); return new ProcessResult(false, message); } String scriptPath = prepareScriptFile(context.getInstanceId(), scriptParams); omsLogger.info("[SYSTEM] Generate executable file successfully, path: {}", scriptPath); if (SystemUtils.IS_OS_WINDOWS) { if (StringUtils.equals(getRunCommand(), SH_SHELL)) { String message = String.format("[SYSTEM] Current OS is %s where shell scripts cannot run.", SystemUtils.OS_NAME); omsLogger.warn(message); return new ProcessResult(false, message); } } // 授权 if ( !SystemUtils.IS_OS_WINDOWS) { ProcessBuilder chmodPb = new ProcessBuilder("/bin/chmod", "755", scriptPath); // 等待返回,这里不可能导致死锁(shell产生大量数据可能导致死锁) chmodPb.start().waitFor(); omsLogger.info("[SYSTEM] chmod 755 authorization complete, ready to start execution~"); } // 2. 执行目标脚本 ProcessBuilder pb = StringUtils.equals(getRunCommand(), CMD_SHELL) ? new ProcessBuilder(getRunCommand(), "/c", scriptPath) : new ProcessBuilder(getRunCommand(), scriptPath); Process process = pb.start(); StringBuilder inputBuilder = new StringBuilder(); StringBuilder errorBuilder = new StringBuilder(); boolean success = true; String result; final Charset charset = getCharset(); try (InputStream is = process.getInputStream(); InputStream es = process.getErrorStream()) { POOL.execute(() -> copyStream(is, inputBuilder, omsLogger, charset)); POOL.execute(() -> copyStream(es, errorBuilder, omsLogger, charset)); success = process.waitFor() == 0; } catch (InterruptedException ie) { omsLogger.info("[SYSTEM] ScriptProcessor has been interrupted"); } finally { result = String.format("[INPUT]: %s;[ERROR]: %s", inputBuilder.toString(), errorBuilder.toString()); } return new ProcessResult(success, result); } /** * 生成脚本名称 * @param instanceId id of instance * @return 文件名称 */ protected abstract String getScriptName(Long instanceId); /** * 获取运行命令(eg,shell返回 /bin/sh) * @return 执行脚本的命令 */ protected abstract String getRunCommand(); //...... }
AbstractScriptProcessor继承了CommonBasicProcessor,它定义了一个parallelism为4*Runtime.getRuntime().availableProcessors()的ForkJoinPool;其process0方法先读取scriptParams,然后执行prepareScriptFile获取scriptPath,接着使用chmod变更script权限为755,然后通过getRunCommand获取命令,接着往pool提交copyStream,等待process返回
prepareScriptFile
private String prepareScriptFile(Long instanceId, String processorInfo) throws IOException { String scriptPath = WORKER_DIR + getScriptName(instanceId); File script = new File(scriptPath); if (script.exists()) { return scriptPath; } File dir = new File(script.getParent()); boolean success = dir.mkdirs(); success = script.createNewFile(); if (!success) { throw new RuntimeException("create script file failed"); } // 如果是下载链接,则从网络获取 for (String protocol : DOWNLOAD_PROTOCOL) { if (processorInfo.startsWith(protocol)) { FileUtils.copyURLToFile(new URL(processorInfo), script, 5000, 300000); return scriptPath; } } final Charset charset = getCharset(); if(charset != null) { try (Writer fstream = new OutputStreamWriter(Files.newOutputStream(script.toPath()), charset); BufferedWriter out = new BufferedWriter(fstream)) { out.write(processorInfo); out.flush(); } } else { try (FileWriter fw = new FileWriter(script); BufferedWriter bw = new BufferedWriter(fw)) { bw.write(processorInfo); bw.flush(); } } return scriptPath; }
prepareScriptFile先通过getScriptName获取scriptPath,如果是http、https、ftp链接则通过FileUtils.copyURLToFile下载,否则把scriptParams写入到scriptPath
copyStream
private static void copyStream(InputStream is, StringBuilder sb, OmsLogger omsLogger, Charset charset) { String line; try (BufferedReader br = new BufferedReader(new InputStreamReader(is, charset))) { while ((line = br.readLine()) != null) { sb.append(line); // 同步到在线日志 omsLogger.info(line); } } catch (Exception e) { log.warn("[ScriptProcessor] copyStream failed.", e); omsLogger.warn("[SYSTEM] copyStream failed.", e); sb.append("Exception: ").append(e); } }
copyStream会读取InputStream到StringBuilder,同时打印到omsLogger
ShellProcessor
tech/powerjob/official/processors/impl/script/ShellProcessor.java
public class ShellProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return String.format("shell_%d.sh", instanceId); } @Override protected String getRunCommand() { return SH_SHELL; } }
ShellProcessor的getScriptName是基于shell_%d.sh和instanceId生成的;其getRunCommand为/bin/sh
CMDProcessor
tech/powerjob/official/processors/impl/script/CMDProcessor.java
public class CMDProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return String.format("cmd_%d.bat", instanceId); } @Override protected String getRunCommand() { return "cmd.exe"; } @Override protected Charset getCharset() { return Charset.defaultCharset(); } }
CMDProcessor的getScriptName是基于cmd_%d.bat和instanceId生成,其getRunCommand为`cmd.exe
PowerShellProcessor
tech/powerjob/official/processors/impl/script/PowerShellProcessor.java
public class PowerShellProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return String.format("powershell_%d.ps1", instanceId); } @Override protected String getRunCommand() { return "powershell.exe"; } @Override protected Charset getCharset() { return Charset.defaultCharset(); } }
PowerShellProcessor的getScriptName是基于powershell_%d.ps1"和instanceId生成,其getRunCommand为powershell.exe
PythonProcessor
tech/powerjob/official/processors/impl/script/PythonProcessor.java
public class PythonProcessor extends AbstractScriptProcessor { @Override protected String getScriptName(Long instanceId) { return String.format("python_%d.py", instanceId); } @Override protected String getRunCommand() { return "python"; } }
PythonProcessor的getScriptName是基于python_%d.py和instanceId生成,其getRunCommand为python
小结
AbstractScriptProcessor继承了CommonBasicProcessor,它有四个实现类分别是ShellProcessor、CMDProcessor、PowerShellProcessor、PythonProcessor;它定义了getScriptName、getRunCommand抽象方法;其process0方法主要是把scriptParams写入到本地文件(scriptParams是http、https、ftp的则根据url进行下载),然后修改权限为755,然后执行pb.start(),再将input及errorStream收集到StringBuilder并打印到omsLogger,最后process.waitFor()等待处理完成。
以上就是PowerJob的AbstractScriptProcessor方法工作流程源码解读的详细内容,更多关于PowerJob AbstractScriptProcessor的资料请关注脚本之家其它相关文章!
您可能感兴趣的文章:
- PowerJob的TimingStrategyHandler工作流程源码解读
- PowerJob的IdGenerateService工作流程源码解读
- PowerJob LockService方法工作流程源码解读
- PowerJob的Evaluator方法工作流程源码解读
- PowerJob的DatabaseMonitorAspect源码流程
- PowerJob的WorkerHealthReporter工作流程源码解读
- PowerJob的OmsLogHandler工作流程源码解析
- PowerJob的ServerDiscoveryService工作流程源码解读
- PowerJob的ProcessorLoader工作流程源码解读
- PowerJob的DispatchStrategy方法工作流程源码解读