Java如何批量执行datax脚本
作者:可乐还是甜的好
这篇文章主要介绍了Java如何批量执行datax脚本问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
一、前言
最近在使用datax同步数据表,由于好多个表,一开始每次都需要等一个执行完再执行下一个,这样明显效率很低,于是写了个Java方法来批量操作;
环境: linux服务器
二、Java代码
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @Author: Huang JX
* @Date: 2021/11/11
* @Description: datax 批量执行脚本,日志存放在当前目录的 DataxSyncLogs 中;
* 注意:需要输入 datax 脚本的文件夹路径作为 main 函数的参数,如 home/xxx/datax/job/
* <p>
* 使用说明:
* 1、去掉文件的包名package;
* 2、将此Java文件放到 /datax/bin/目录下;
* 3、javac DataxSyncUtil.java
* 4、java DataxSyncUtil [参数]datax脚本文件夹路径
*/
public class DataxSyncUtil {
public static void main(String[] args) throws Exception {
FileOutputStream out = null;
StringBuffer sb = new StringBuffer();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
try {
String jobPath = args[0];
if (jobPath == null && "".equals(jobPath.trim())) {
throw new Exception("请输入datax脚本文件夹路径");
}
String logDir = jobPath + "DataxSyncLogs";
File logDirFile = new File(logDir);
if (!logDirFile.exists()) {
logDirFile.mkdir(); // 创建日志文件夹
}
String logFileName = logDir + "/datax_sync_" + sdf.format(new Date()) + ".log";
File file = new File(logFileName);
if (!file.exists())
file.createNewFile();
out = new FileOutputStream(file, true);
sb.append("==========================================\n");
sb.append("==========Datax Sync Job Start!==========\n");
sb.append("==========================================\n");
File f = new File(jobPath);
if (!f.exists()) {
sb.append(jobPath + " not exists\n");
return;
}
System.out.print(sb.toString());
out.write(sb.toString().getBytes("utf-8"));
String cmdStr;
File fa[] = f.listFiles();
for (int i = 0; i < fa.length; i++) {
sb = new StringBuffer();
File fs = fa[i];
if (!fs.isDirectory()) {
Process pr = null;
cmdStr = "python3 datax.py " + jobPath + fs.getName();
sb.append("start cmd: " + cmdStr + "\n");
System.out.print("start cmd: " + cmdStr + "\n");
pr = Runtime.getRuntime().exec(cmdStr);
BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
String line;
while (true) {
if (!((line = in.readLine()) != null)) break;
sb.append(line + "\n");
System.out.print(line + "\n");
}
in.close();
pr.waitFor();
out.write(sb.toString().getBytes("utf-8"));
}
}
sb.append("==========================================\n");
sb.append("===========Datax Sync Job End!===========\n");
sb.append("==========================================\n");
System.out.print(sb.toString());
out.write(sb.toString().getBytes("utf-8"));
} finally {
out.flush();
out.close();
}
}
}
注意:
看一下注释的说明,将job的文件路径做为main函数的参数;
【更新记录】
1、2021-12-06 更新写日志方式,每跑完一个任务就写日志到文件中;
三、使用说明
1.把DataxSyncUtil拷到服务器中datax的bin目录下,如/home/xxx/datax/bin/,注意去除Java类的包名;
2.编译Java程序,javac DataxSyncUtil.java
3.执行Java程序,传入参数为放置datax脚本的文件夹,如java DataxSyncUtil /home/xxx/datax/job/
4.如果想在后台运行,则为nohup java DataxSyncUtil /home/xxx/datax/job/ &
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
