python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > python HDFS导出数据到数据库

python从Hadoop HDFS导出数据到关系数据库

作者:jzy3711

这篇文章主要为大家详细介绍了Python如何从Hadoop HDFS中导出数据并通过DataX工具导入到关系数据库,例如MySQL,Oracle,PostgreSQL等,感兴趣的可以了解下

python从HDFS导出到关系数据库(如MySQL、Oracle、PostgreSQL)

一整套从Hadoop HDFS中导出数据并通过DataX工具导入到关系数据库的过程。

操作步骤

1. 定义参数和变量

sql=$1                  # 导出数据的SQL语句
s_tablename=$2          # 源表名
ds_name=$3              # 目标数据库名称
t_tablename=$4          # 目标表名
temptable="h2o_"`date +%s%N | md5sum | head -c 16`  # 生成一个基于时间戳的临时表名
filename=${s_tablename}_${temptable}  # 文件名
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"   # HDFS路径
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"  # 本地脚本路径
flag=$5 # 标志,用来确定是否TRUNCATE表

2. 构造SQL查询并提交给Hive

echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2`  # 截取分号后的部分
sql0=`echo "$sql"|cut -d ";" -f1`  # 截取分号前的部分
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"  # 构建最终的SQL
echo "$sql0"

kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181, ... ,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"

3. 获取目标数据库连接信息并解析结果为变量

从PostgreSQL数据库中获取目标数据库的连接信息,并解析结果为变量。

re=$(PGPASSWORD=... psql -h 10.251.110.104 -p 18921 -U dacp -d dacp  -t <<EOF
SELECT CASE WHEN ds_type = 'mysql'  THEN CONCAT ('jdbc:mysql://'     ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') 
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' 
            WHEN ds_type = 'pg'     THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql'  THEN 'mysqlwriter' 
            WHEN ds_type = 'oracle' THEN 'oraclewriter' 
            WHEN ds_type = 'pg'     THEN 'postgresqlwriter' END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
  AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')

4. 获取目标数据库密码

通过执行Java程序解密数据库密码:

pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`

5. 预处理SQL语句

根据标志变量flag,确定是否执行TRUNCATE语句:

preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
 preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"

6. 数据导出并导入目标数据库

使用datax执行从HDFS导入到关系数据库的任务:

python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass="$pw" -Drdb_jdbc="$jdbc_url" -Drdb_table=$t_tablename -DpreSql="$preSQL"" $local_path/hdfs_to_rdb.json

7. Python代码详解

此外,你还展示了大量的Python代码用于处理数据转换和传输。重点如下:

1. 初始化设置和依赖

加载必要的包,并初始化变量。

import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5

# 其他初始化设置

2. 连接数据库并执行SQL

定义了连接数据库并执行SQL的函数:

def connect_database_to_select(conn, sql):
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL执行错误:{},执行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()

def connect_database_to_commit(exe_type, conn, sql, insert_list):
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete', 'insert'):
            cursor.execute(sql)
            conn.commit()
        elif exe_type.lower() == 'insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL执行错误:{},执行SQL:{}'.format(str(e), sql))
        sys.exit(2)
    finally:
        cursor.close()

3. 数据导出处理

执行数据导出并提交给Hive:

def produce_exe_data(sql, s_tablename):
    global local_path_name
    local_path_01 = local_path_list[random.randrange(len(local_path_list))] + '/dataos_exp'
    local_path_name = "h2o_{0}_{1}".format(s_tablename, get_md5_str()).lower()
    local_path = local_path_01 + '/' + local_path_name
    if os.path.exists(local_path):
        cmd = 'rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)
    
    hdfs_path = "hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)
    sql = sql.strip().strip(';')
    sql_list = sql.split(';')
    
    hive_conn = hive_connect()
    compress_sql = 'set hive.exec.compress.output=false'
    connect_database_to_commit('insert', hive_conn, compress_sql, '')
    
    for i in range(len(sql_list)):
        sql_str = sql_list[i]
        if i == len(sql_list)-1:      # 如果是最后一条SQL,则执行insert overwrite directory
            sql_str='''INSERT OVERWRITE DIRECTORY '{0}' 
         ROW FORMAT DELIMITED FIELDS TERMINATED BY '\u0001' COLLECTION ITEMS TERMINATED BY '\n' MAP KEYS TERMINATED BY ':' 
         {1} '''.format(hdfs_path, sql_str)
        connect_database_to_commit('insert', hive_conn, sql_str, '')

    if hive_conn:
        hive_conn.close()
    
    cmd = ''' hdfs dfs -get {}/* {} '''.format(hdfs_path, local_path)
    exe_system_cmd(cmd)
    return local_path, hdfs_path

4. 多线程数据传输

利用多线程加速数据传输过程:

def thread_exe_exchange_data(g_tablename, flag, local_path, hdfs_path):
    global rdb_conn
    rdb_conn = get_rdb_database_conn()

    if flag.upper() == 'T':
        presql = 'truncate table {}'.format(g_tablename)
        connect_database_to_commit('insert', rdb_conn, presql, '')

    if ds_type.lower() == 'oracle':
        global oracle_table_field       
        oracle_table_field = get_oracle_table_fields()

        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)
    
    file_lists = os.listdir(local_path)
    global exp_num_list
    global log_list
    global exception_list
    exp_num_list = []
    log_list = []
    exception_list = []

    thread_list = []
    for file_name in file_lists:
        thread_list.append(local_path + '/' + file_name)
    
    pool = threadpool.ThreadPool(5)
    requests = threadpool.makeRequests(exchange_data, thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
   
    if exception_list:
        delete_local_path(local_path, hdfs_path)
        sys.exit(2)

    print('数据导出完成,导出数据总量为:{}'.format(sum(exp_num_list)))

完整python脚本

#!/bin/bash
sql=$1                  #导出数据sql
s_tablename=$2  #源表
ds_name=$3              #目标库
t_tablename=$4  #目标表
temptable="h2o_"`date +%s%N | md5sum | head -c 16`      #构造一个时间戳
filename=${s_tablename}_${temptable}    #文件名
path="hdfs://prdhdfs/tmp/hdfs_to_rdb/$filename/"
local_path="/data02/dcadmin/scripts/dataos_scripts/data_exp"
flag=$5 #t TRUNCATE 
#hadoop fs -mkdir $path
# 参数sql0为 待执行SQL
echo "$sql"
sql1=`echo "$sql"|cut -d ";" -f2`
sql0=`echo "$sql"|cut -d ";" -f1`
sql0="$sql0;insert overwrite directory '${path}' stored as ORC $sql1"
echo "$sql0"
# 向Hive提交HQL
kinit -kt /data02/dcadmin/keytab_shengchan/dcadmin.keytab dcadmin@SC.COM
#beeline <<EOF
#!connect jdbc:hive2://devdataosambari:2181,devdataosnn1:2181,devdataosnn2:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2


#$sql0
beeline -u "jdbc:hive2://prdnn1.yxbdprd.sc.ctc.com:2181,prdnn2.yxbdprd.sc.ctc.com:2181,prdrm1.yxbdprd.sc.ctc.com:2181,prddb2.yxbdprd.sc.ctc.com:2181,prddb1.yxbdprd.sc.ctc.com:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2" -e "set tez.queue.name=offline;$sql0 distribute by rand()"
# 获取目标数据源地址
#eval $(mysql -h 10.251.88.71 -udacp -pdacp123456 dacp_dev -e "select case when ds_type = 'mysql' then concat('jdbc:mysql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')),'?characterEncoding=UTF-8')
#when ds_type = 'oracle' then concat('jdbc:oracle:thin:@', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName')))
#when ds_type = 'pg' then concat('jdbc:postgresql://', ds_inst_loc, '/',json_unquote(json_extract(ds_conf,'$.physicalDbName'))) end jdbc_url,
#ds_acct, ds_auth, case when ds_type = 'mysql' then 'mysqlwriter' when ds_type = 'oracle' then 'oraclewriter' when ds_type = 'pg' then 'postgresqlwriter' end ds_type
#from dacp_meta_datasource 
#where ds_type in ('mysql','oracle','pg') 
#and ds_name = '$ds_name'" | awk 'NR== 2 {printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$2,$3,$4)}')

re=$(PGPASSWORD=jxFgCKv9GJw2ohS3 psql -h 10.251.110.104 -p 18921 -U dacp -d dacp  -t <<EOF
SELECT CASE WHEN ds_type = 'mysql'  THEN CONCAT ('jdbc:mysql://'     ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8') 
            WHEN ds_type = 'oracle' THEN ds_conf::json ->> 'url' 
            WHEN ds_type = 'pg'     THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END jdbc_url
      ,ds_acct
      ,ds_auth
      ,CASE WHEN ds_type = 'mysql'  THEN 'mysqlwriter' 
            WHEN ds_type = 'oracle' THEN 'oraclewriter' 
            WHEN ds_type = 'pg'     THEN 'postgresqlwriter' END ds_type
FROM dacp_dev.dacp_meta_datasource
WHERE ds_type IN ('mysql', 'oracle', 'pg')
  AND upper(trim(ds_name)) = upper(trim('$ds_name'))
EOF
)
eval $(echo $re| awk '{printf("jdbc_url=%s; ds_acct=%s; ds_auth=%s; ds_type=%s",$1,$3,$5,$7)}')

#eval $(java -jar /data01/etl/scripts/exec_aes.jar $ds_auth | awk ' {printf("pw=%s;",$1)}')
#pw=`java -jar $local_path/exec_aes.jar $ds_auth`
pw=`java -Dpwd=${ds_auth} -jar $local_path/AesCipher-1.0.jar`

preSQL="select * from $t_tablename where 1=-1"
if [ "$flag" = "T" ];then
 preSQL="truncate table $t_tablename"
fi
echo "preSQL=$preSQL"

python $local_path/datax/bin/datax.py -p "-Dpath=$path -Dwriter=$ds_type -Drdb_user=$ds_acct -Drdb_pass=\"$pw\" -Drdb_jdbc=\"$jdbc_url\" -Drdb_table=$t_tablename -DpreSql=\"$preSQL\"" $local_path/hdfs_to_rdb.json

# -*- coding:utf-8 -*-

import time
import datetime
import os
import threadpool
import commands
import calendar
import random
import pymssql
import pymysql
import cx_Oracle
import psycopg2
import socket
from pyhdfs import HdfsClient
from hashlib import md5
import sys
reload(sys)
sys.setdefaultencoding('utf8')
sys.path.append('/data02/dcadmin/scripts/common')
from connect_postgresql import postgresql_connect
from connect_hive import hive_connect

pg_conn_str='dataos_71_pg_dev'



# 本地磁盘目录,文件随机选择一个目录
local_path_list=['/data01','/data02','/data03','/data04','/data05']


def close_datadb_conn():
    if rdb_conn:
        rdb_conn.close()


def connect_database_to_select(conn,sql):
    # print('\r\n执行SQL:{}'.format(sql))
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        #cursor.execute(sql.decode('utf-8').encode('gbk'))
        result = cursor.fetchall()
        conn.commit()
        return result
    except Exception as e:
        print('SQL执行错误:{},执行SQL:{}'.format(str(e),sql))
        sys.exit(2)
    finally:
        cursor.close()

def connect_database_to_commit(exe_type,conn,sql,insert_list):
    # print('\r\n执行SQL:{}'.format(sql))
    cursor = conn.cursor()
    try:
        if exe_type.lower() in ('delete','insert'):
            cursor.execute(sql)
            conn.commit()
            print('执行SQL:{}'.format(sql))
        elif exe_type.lower()=='insertmany':
            cursor.executemany(sql, insert_list)
            conn.commit()
    except Exception as e:
        print('SQL执行错误c:{},执行SQL:{}'.format(str(e),sql))
        print(sql)
        sys.exit(2)
    finally:
        cursor.close()

# 执行系统命令
def exe_system_cmd(cmd):
    status,output=commands.getstatusoutput(cmd)
    if status!=0:
        print('命令{}:执行失败,请检查!'.format(cmd))
        print('失败日志:{}'.format(output))
        sys.exit(2)
    return output


# 返回MD5串
def get_md5_str():
    # 时间戳
    ts = calendar.timegm(time.gmtime())
    md5_str=md5(str(ts).encode(encoding='utf-8')).hexdigest()
    return md5_str


# 判断输入参数
def judge_input_parameters_num():
    if len(sys.argv)!=6:
        print('参数有问题,请检查!')
        print(sys.argv)
        sys.exit(2)
    else:
        sql         =sys.argv[1]            # 导出数据sql
        s_tablename =sys.argv[2]            # 源表名
        ds_name     =sys.argv[3]            # 目标库
        g_tablename =sys.argv[4]            # 目标表
        flag        =sys.argv[5]            # A:append,T:truncate
    return sql,s_tablename,ds_name,g_tablename,flag


# 执行SQL语句,生成HDFS文件
def produce_exe_data(sql,s_tablename):
    global local_path_name
    # 1、创建本地文件夹
    # 随机选择一个磁盘目录:'/data01','/data02','/data03','/data04','/data05'
    local_path_01 = local_path_list[random.randrange(len(local_path_list))]+'/dataos_exp'     # /data01/dataos_exp
    local_path_name="h2o_{0}_{1}".format(s_tablename,get_md5_str()).lower()
    # local_path_name='h2o_app_hub_resource_value_level_d_e6963bad13299e939a3a4cc2b2a26a47'
    local_path=local_path_01+'/'+local_path_name
    if os.path.exists(local_path):
        cmd='rm -rf {}'.format(local_path)
        exe_system_cmd(cmd)
    os.mkdir(local_path)

    # 创建hdfs文件夹
    hdfs_path="hdfs://prdhdfs/tmp/hdfs_to_rdb/{}".format(local_path_name)

    # 处理SQL,先去除两边的空格,再去除两边的分号
    sql=sql.strip().strip(';')
    sql_list=sql.split(';')
    # 依次执行切分的SQL
    hive_conn=hive_connect()        # 连接生产HIVE
    compress_sql='set hive.exec.compress.output=false'
    print('执行SQL:{}'.format(compress_sql))
    connect_database_to_commit('insert',hive_conn,compress_sql,'')
    for i in range(len(sql_list)):
        sql_str=sql_list[i]
        if i==len(sql_list)-1:      # 如果是最后一条SQL,则执行insert overwrite directory
            sql_str='''INSERT OVERWRITE DIRECTORY '{0}' 
         ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' COLLECTION ITEMS TERMINATED BY '\\n' MAP KEYS TERMINATED BY ':' 
         {1} '''.format(hdfs_path,sql_str)
        print('执行SQL:{}'.format(sql_str))
        connect_database_to_commit('insert',hive_conn,sql_str,'')
    # 关闭HIVE连接
    if hive_conn:
        hive_conn.close()
    # 将hdfs文件从hdfs_path路径get到local_path下
    cmd=''' hdfs dfs -get {}/* {} '''.format(hdfs_path,local_path)
    exe_system_cmd(cmd)
    print('文件GET成功,当前主机:{},数据临时文件夹:{}'.format(socket.gethostname(),local_path))
    return local_path,hdfs_path
# 获取目标端的连接信息
def get_rdb_conn_msg(ds_name):
    global ds_type
    global ds_acct
    global ds_auth
    global host
    global port
    global database
    global jdbc_url
    sql='''
        SELECT ds_name
              ,ds_type
              ,ds_acct
              ,ds_auth
              ,split_part(ds_inst_loc,':',1) as host
              ,case when split_part(ds_inst_loc,':',2)='' and ds_type='oracle' then '1521' else split_part(ds_inst_loc,':',2) end as port
              ,case when lower(ds_type)='oracle' then split_part(replace(replace(replace(ds_conf::json->>'url','jdbc:oracle:thin:@',''),':1521',''),'/',':'),':',2) else ds_conf::json->>'physicalDbName' end as database
              ,CASE WHEN ds_type = 'mysql'  THEN CONCAT ('jdbc:mysql://'     ,ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName'),'?characterEncoding=UTF-8')
                    WHEN ds_type = 'oracle' THEN ds_conf::json ->>'url'
                    WHEN ds_type = 'pg'     THEN CONCAT ('jdbc:postgresql://',ds_inst_loc,'/',(ds_conf::json ->>'physicalDbName')) END as jdbc_url
        FROM dacp_dev.dacp_meta_datasource
        WHERE ds_type IN ('mysql', 'oracle', 'pg')
          AND upper(trim(ds_name)) = upper(trim('{}')) '''.format(ds_name)
    pg_conn=postgresql_connect(pg_conn_str)
    results=connect_database_to_select(pg_conn,sql)
    # print(results)
    if not results:
        print('未查询到数据库连接信息,请检查,DS_NAME:{}'.format(ds_name))
        sys.exit(2)
    # 关闭数据库连接
    if pg_conn:
        pg_conn.close()
    # 解密密码
    cmd='''java -Dpwd='{0}' -jar /data02/dcadmin/scripts/common/AesCipher-1.0.jar'''.format(results[0][3])
    pw=exe_system_cmd(cmd).replace('\r','').replace('\n','')
    ds_type = results[0][1]ds_acct = results[0][2]
    ds_auth = pw
    host    = results[0][4]
    port    = int(results[0][5])
    database= results[0][6]
    jdbc_url= results[0][7]


# 判断连接的数据库类型,并返回数据库连接conn
def get_rdb_database_conn():
    dbms_conn=None
    try:
        if ds_type.upper()=='SQLSERVER':
            dbms_conn = pymssql.connect(host=host  , user=ds_acct, password=ds_auth, port=port, database=database, charset='utf8')
        elif ds_type.upper()=='MYSQL':
            dbms_conn = pymysql.connect(host=host  , user=ds_acct, passwd=ds_auth  , port=port, database=database, charset='utf8', local_infile=True)
        elif ds_type.upper()=='ORACLE':
            listener = '{0}:{1}/{2}'.format(host,port,database)
            print('listener:{}'.format(listener))
            dbms_conn = cx_Oracle.connect(ds_acct,ds_auth,listener,encoding='utf-8')
        elif ds_type.upper() in ('POSTGRESQL','PG'):
            dbms_conn = psycopg2.connect(host=host , user=ds_acct, password=ds_auth  , port=port, database=database, client_encoding='utf8')
        else:
            print("未知源端数据库类型{}~~~~~,请检查!".format(ds_type.upper()))
            sys.exit(2)
    except Exception as e:
        print('{0},{1}数据库连接失败,请检查~~~~~~!'.format(ds_type,ds_name))
        print('报错日志:{}'.format(e))
        print(host)
        print(ds_acct)
        print(ds_auth)
        print(port)
        print(database)
        sys.exit(2)
    return dbms_conn


def thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path):
    global rdb_conn
    rdb_conn=get_rdb_database_conn()
    # 执行预处理SQL
    if flag.upper()=='T':
        presql='truncate table {}'.format(g_tablename)
        print('执行SQL:{}'.format(presql))
        connect_database_to_commit('insert',rdb_conn,presql,'')
    # 获取Oracle表结构
    if ds_type.lower() in ('oracle'):
        global oracle_table_field       # oracle 表结构
        oracle_table_field=get_oracle_table_fields()

        # 创建ctl,bad,log存放目录
        global ora_dir
        localtime = str(time.strftime("%Y%m%d", time.localtime()))
        ora_dir = "/data03/datafile/sqlldrdata/{0}/".format(localtime)
        if not os.path.exists(ora_dir):
            os.mkdir(ora_dir)

    # 文件列表
    file_lists=os.listdir(local_path)

    # 多线程导数
    global exp_num_list     # 存储导数数量
    global log_list         # 存储多线程的日志信息
    global exception_list   # 存储多线程异常信息
    exp_num_list  =[]
    log_list      =[]
    exception_list=[]

    thread_list=[]          # 存储多线程任务
    for file_name in file_lists:
        thread_list.append(local_path+'/'+file_name)
    # 创建线程池
    pool=threadpool.ThreadPool(5)
    # 存放任务列表
    requests = threadpool.makeRequests(exchange_data,thread_list)
    [pool.putRequest(req) for req in requests]
    pool.wait()
    # 处理异常
    if exception_list:
        # 导数出现异常,删除文件
        delete_local_path(local_path,hdfs_path)
        print('导数失败,异常日志信息如下:')
        for except_msg in exception_list:
            print(except_msg)
        sys.exit(2)

    # 打印多线程日志
    # log_list.sort()
    # for log in log_list:
    #     print(log)

    # 打印导出结果
    print('数据导出完成,导出数据总量为:{}'.format(sum(exp_num_list)))



# 获取Oracle表结构
def get_oracle_table_fields():
    sql = '''
             SELECT COLUMN_NAME || SUFFIX AS AA
               FROM (SELECT A.COLUMN_NAME
                           ,A.COLUMN_ID
                           ,CASE WHEN UPPER(A.DATA_TYPE) LIKE '%DATE%'      THEN ' DATE "yyyy-mm-dd hh24:mi:ss"'
                                 WHEN UPPER(A.DATA_TYPE) LIKE '%TIMESTAMP%' THEN ' DATE "yyyy-mm-dd hh24:mi:ss.ff"'
                                 WHEN UPPER(A.DATA_TYPE) LIKE '%VARCHAR%'   THEN ' CHAR(3000)'
                                 ELSE '' END AS SUFFIX
                       FROM ALL_TAB_COLUMNS A
                      WHERE UPPER(A.OWNER||'.'||A.TABLE_NAME) = UPPER(TRIM('{0}'))
                      ORDER BY A.COLUMN_ID) '''
    if '.' in g_tablename:
        sql=sql.format(g_tablename)
    else:
        sql=sql.format(database+'.'+g_tablename)
    oracle_table_fields=connect_database_to_select(rdb_conn,sql)
    if not oracle_table_fields:
        print('未查询到表结构,表名:{}'.format(g_tablename))
        sys.exit(2)
    oracle_table_field = ",\n".join([str(list[0]) for list in oracle_table_fields])
    return oracle_table_field
    # 执行单个导出任务
def exchange_data(file_path):
    try:
        output=''
        # 执行导数任务
        if ds_type.lower() in ('pg','postgresql','telpg','antdb'):
            cmd='''psql "port={0} host={1} user={2} dbname={3} password={4} " -c "\copy {5} from '{6}' DELIMITER AS E'\u0001' " '''
            cmd=cmd.format(port,host,ds_acct,database,ds_auth,g_tablename,file_path)
            status,output=commands.getstatusoutput(cmd)
            if status!=0:
                exception_list.append('命令{}:执行失败,请检查!失败日志:{}'.format(cmd,output))
        elif ds_type.lower() in ('mysql','teldb'):
            mysql_conn = pymysql.connect(host=host  , user=ds_acct, passwd=ds_auth  , port=port, database=database, charset='utf8', local_infile=True)
            mysql_cursor=mysql_conn.cursor()
            sql='SET NAMES UTF8'
            mysql_cursor.execute(sql)
            sql='''load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'  '''.format(file_path,g_tablename)
            #print(sql)
            output=mysql_cursor.execute(sql)
            mysql_conn.commit()
            mysql_conn.close()
            # cmd='''mysql -h {} -P {} -u {} -p{} -D {} -e "SET NAMES UTF8;load data local infile '{}' into table {} fields terminated by X'01' lines terminated by '\\n'"  '''
            # cmd=cmd.format(host,port,ds_acct,ds_auth,database,file_path,g_tablename)
        elif ds_type.lower() in ('oracle'):
            tns='''\'{}/"{}"\'@{}:1521/{}'''.format(ds_acct,ds_auth,host,database)
            ora_file_name=file_path.replace(local_path+'/','')
            ora_file_path=ora_dir+'/'+local_path_name+'_'+ora_file_name
            control_file = ora_file_path+".ctl"
            log_file     = ora_file_path+".log"
            bad_file     = ora_file_path+".bad"
            dis_file     = ora_file_path+".dis"
            content ='''
UNRECOVERABLE LOAD DATA CHARACTERSET AL32UTF8 
APPEND INTO TABLE {0} FIELDS TERMINATED BY x'01' 
TRAILING NULLCOLS ({1}) '''.format(g_tablename, oracle_table_field)
            # 如果控制文件存在,则先删除
            if os.path.exists(control_file):
                cmd='rm -rf {}'.format(control_file)
                exe_system_cmd(cmd)
            # 再创建控制文件
             with open(control_file, "w") as file:
                file.write(content)
            cmd='''export ORACLE_HOME=/data03/apps/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH;cat {0} | /data03/apps/db_1/bin/sqlldr userid={1} control={2} data=\\"-\\" log={3} bad={4} discard={5} errors=0 direct=true parallel=true multithreading=true columnarrayrows=100000 STREAMSIZE=20971520 readsize=20971520 bindsize=20971520 date_cache=0 '''
            cmd=cmd.format(file_path, tns, control_file, log_file, bad_file, dis_file)
            status,output=commands.getstatusoutput(cmd)
            if status!=0:
                exception_list.append('命令{}:执行失败,请检查!失败日志:{}'.format(cmd,output))
        else:
            exception_list.append('目标端数据库类型为:{},此类型暂未支持!'.format(db_type.lower()))

        # 计算导出行数
        if ds_type.lower() in ('pg','postgresql','telpg','antdb'):
            file_row_num=int(output.split('COPY ')[1].strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('oracle'):
            try:
                output=output.decode('gbk')
            except:
                output=output
            file_row_num=int(output.split('逻辑记录计数 ')[1].replace('。','').strip())
            exp_num_list.append(file_row_num)
        elif ds_type.lower() in ('mysql','teldb'):
            exp_num_list.append(output)
        # 插入日志
        log_list.append(output)
    except Exception as e:
        exception_list.append(e)




def delete_local_path(local_path,hdfs_path):
    cmd='rm -rf {}'.format(local_path)
    exe_system_cmd(cmd)
    print('本地文件夹删除成功。')
    cmd='hdfs dfs -rm -r {}'.format(hdfs_path)
    exe_system_cmd(cmd)
    print('HDFS文件夹删除成功。')
if __name__ == '__main__':
    starttime = datetime.datetime.now()
    print('开始时间:{0}'.format(starttime.strftime('%Y-%m-%d %H:%M:%S')))
    # 1、判断输入参数
    sql,s_tablename,ds_name,g_tablename,flag=judge_input_parameters_num()
    # 2、执行SQL,生产文件,并返回本地目录
    local_path,hdfs_path=produce_exe_data(sql,s_tablename)
    hdfs_time=datetime.datetime.now()
    #print('当前时间:{}'.format(hdfs_time))
    print("生成HDFS文件耗时:{0}秒".format((hdfs_time - starttime).seconds))
    # 3、获取目标端连接信息(host,port等)
    get_rdb_conn_msg(ds_name)
    # 4、执行导数任务
    thread_exe_exchange_data(g_tablename,flag,local_path,hdfs_path)
    # 5、删除本地文件夹
    delete_local_path(local_path,hdfs_path)
    # 6、关闭数据库连接
    close_datadb_conn()
    endtime = datetime.datetime.now()
    print('结束时间:{0}'.format(endtime.strftime('%Y-%m-%d %H:%M:%S')))
    print('导数耗时:{0}秒'.format((endtime - hdfs_time).seconds))
    print("一共耗时:{0}秒".format((endtime - starttime).seconds))

总结

整个脚本有效地实现了从HDFS到关系数据库的数据迁移,确保数据的完整性和一致性。首先通过Hive导出数据,再利用多线程和DataX工具导入到目标数据库。本地化和多线程处理使过程更高效,适合大数据处理和数据仓库迁移。

请务必按需调整脚本中的具体参数和配置以适应你的环境和数据架构。

以上就是python从Hadoop HDFS导出数据到关系数据库的详细内容,更多关于python HDFS导出数据到数据库的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文