java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java批量执行datax脚本

Java如何批量执行datax脚本

作者:可乐还是甜的好

这篇文章主要介绍了Java如何批量执行datax脚本问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、前言

最近在使用datax同步数据表,由于好多个表,一开始每次都需要等一个执行完再执行下一个,这样明显效率很低,于是写了个Java方法来批量操作;

环境: linux服务器

二、Java代码

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author: Huang JX
 * @Date: 2021/11/11
 * @Description: datax 批量执行脚本,日志存放在当前目录的 DataxSyncLogs 中;
 * 注意:需要输入 datax 脚本的文件夹路径作为 main 函数的参数,如 home/xxx/datax/job/
 * <p>
 * 使用说明:
 * 1、去掉文件的包名package;
 * 2、将此Java文件放到 /datax/bin/目录下;
 * 3、javac DataxSyncUtil.java 
 * 4、java DataxSyncUtil [参数]datax脚本文件夹路径
 */
public class DataxSyncUtil {
    public static void main(String[] args) throws Exception {
        FileOutputStream out = null;
        StringBuffer sb = new StringBuffer();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
        try {
            String jobPath = args[0];
            if (jobPath == null && "".equals(jobPath.trim())) {
                throw new Exception("请输入datax脚本文件夹路径");
            }
            String logDir = jobPath + "DataxSyncLogs";
            File logDirFile = new File(logDir);
            if (!logDirFile.exists()) {
                logDirFile.mkdir(); // 创建日志文件夹
            }
            String logFileName = logDir + "/datax_sync_" + sdf.format(new Date()) + ".log";
            File file = new File(logFileName);
            if (!file.exists())
                file.createNewFile();
            out = new FileOutputStream(file, true);
            sb.append("==========================================\n");
            sb.append("==========Datax Sync Job Start!==========\n");
            sb.append("==========================================\n");
            File f = new File(jobPath);
            if (!f.exists()) {
                sb.append(jobPath + " not exists\n");
                return;
            }
            System.out.print(sb.toString());
            out.write(sb.toString().getBytes("utf-8"));
            String cmdStr;
            File fa[] = f.listFiles();
            for (int i = 0; i < fa.length; i++) {
                sb = new StringBuffer();
                File fs = fa[i];
                if (!fs.isDirectory()) {
                    Process pr = null;
                    cmdStr = "python3 datax.py " + jobPath + fs.getName();
                    sb.append("start cmd: " + cmdStr + "\n");
                    System.out.print("start cmd: " + cmdStr + "\n");
                    pr = Runtime.getRuntime().exec(cmdStr);
                    BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
                    String line;
                    while (true) {
                        if (!((line = in.readLine()) != null)) break;
                        sb.append(line + "\n");
                        System.out.print(line + "\n");
                    }
                    in.close();
                    pr.waitFor();
                    out.write(sb.toString().getBytes("utf-8"));
                }
            }
            sb.append("==========================================\n");
            sb.append("===========Datax Sync Job End!===========\n");
            sb.append("==========================================\n");
            System.out.print(sb.toString());
            out.write(sb.toString().getBytes("utf-8"));
        } finally {
            out.flush();
            out.close();
        }
    }
}

注意:

看一下注释的说明,将job的文件路径做为main函数的参数;

【更新记录】

1、2021-12-06 更新写日志方式,每跑完一个任务就写日志到文件中;

三、使用说明

1.把DataxSyncUtil拷到服务器中datax的bin目录下,如/home/xxx/datax/bin/,注意去除Java类的包名;

2.编译Java程序,javac DataxSyncUtil.java

3.执行Java程序,传入参数为放置datax脚本的文件夹,如java DataxSyncUtil /home/xxx/datax/job/

4.如果想在后台运行,则为nohup java DataxSyncUtil /home/xxx/datax/job/ &

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文