Java如何批量执行datax脚本
作者:可乐还是甜的好
这篇文章主要介绍了Java如何批量执行datax脚本问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
一、前言
最近在使用datax同步数据表,由于好多个表,一开始每次都需要等一个执行完再执行下一个,这样明显效率很低,于是写了个Java方法来批量操作;
环境: linux
服务器
二、Java代码
import java.io.BufferedReader; import java.io.File; import java.io.FileOutputStream; import java.io.InputStreamReader; import java.text.SimpleDateFormat; import java.util.Date; /** * @Author: Huang JX * @Date: 2021/11/11 * @Description: datax 批量执行脚本,日志存放在当前目录的 DataxSyncLogs 中; * 注意:需要输入 datax 脚本的文件夹路径作为 main 函数的参数,如 home/xxx/datax/job/ * <p> * 使用说明: * 1、去掉文件的包名package; * 2、将此Java文件放到 /datax/bin/目录下; * 3、javac DataxSyncUtil.java * 4、java DataxSyncUtil [参数]datax脚本文件夹路径 */ public class DataxSyncUtil { public static void main(String[] args) throws Exception { FileOutputStream out = null; StringBuffer sb = new StringBuffer(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); try { String jobPath = args[0]; if (jobPath == null && "".equals(jobPath.trim())) { throw new Exception("请输入datax脚本文件夹路径"); } String logDir = jobPath + "DataxSyncLogs"; File logDirFile = new File(logDir); if (!logDirFile.exists()) { logDirFile.mkdir(); // 创建日志文件夹 } String logFileName = logDir + "/datax_sync_" + sdf.format(new Date()) + ".log"; File file = new File(logFileName); if (!file.exists()) file.createNewFile(); out = new FileOutputStream(file, true); sb.append("==========================================\n"); sb.append("==========Datax Sync Job Start!==========\n"); sb.append("==========================================\n"); File f = new File(jobPath); if (!f.exists()) { sb.append(jobPath + " not exists\n"); return; } System.out.print(sb.toString()); out.write(sb.toString().getBytes("utf-8")); String cmdStr; File fa[] = f.listFiles(); for (int i = 0; i < fa.length; i++) { sb = new StringBuffer(); File fs = fa[i]; if (!fs.isDirectory()) { Process pr = null; cmdStr = "python3 datax.py " + jobPath + fs.getName(); sb.append("start cmd: " + cmdStr + "\n"); System.out.print("start cmd: " + cmdStr + "\n"); pr = Runtime.getRuntime().exec(cmdStr); BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream())); String line; while (true) { if (!((line = in.readLine()) != null)) break; sb.append(line + "\n"); System.out.print(line + "\n"); } in.close(); pr.waitFor(); out.write(sb.toString().getBytes("utf-8")); } } sb.append("==========================================\n"); sb.append("===========Datax Sync Job End!===========\n"); sb.append("==========================================\n"); System.out.print(sb.toString()); out.write(sb.toString().getBytes("utf-8")); } finally { out.flush(); out.close(); } } }
注意:
看一下注释的说明,将job的文件路径做为main函数的参数;
【更新记录】
1、2021-12-06 更新写日志方式,每跑完一个任务就写日志到文件中;
三、使用说明
1.把DataxSyncUtil
拷到服务器中datax的bin目录下,如/home/xxx/datax/bin/
,注意去除Java类的包名;
2.编译Java程序,javac DataxSyncUtil.java
3.执行Java程序,传入参数为放置datax脚本的文件夹,如java DataxSyncUtil /home/xxx/datax/job/
4.如果想在后台运行,则为nohup java DataxSyncUtil /home/xxx/datax/job/ &
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。