Linux

关注公众号 jb51net

关闭
首页 > 网站技巧 > 服务器 > Linux > Apache Sqoop数据采集

Apache Sqoop数据采集原理解析

作者:Aimyon_36

Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目,本文给大家介绍Apache Sqoop数据采集原理解析,感兴趣的朋友一起看看吧

Sqoop数据采集格式问题

Apache Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。

一、Sqoop工作原理

Sqoop通过创建一个数据传输的MR程序,进而实现数据传输。

Sqoop安装:

  • JAVA环境配置
  • Hadoop环境配置
  • 相关数据库驱动包

只要环境满足以上设置,直接解压Sqoop安装包即可安装,修改配置后即可使用。

二、Sqoop命令格式

基础使用语法:

sqoop import | export \
--数据库连接参数
--HDFS或者Hive的连接参数
--配置参数

数据传输常用参数:

选项参数
–connectjdbc:mysql://hostname:3306(数据库连接URL)
–username数据库用户名
–password数据库用户密码
–table指定数据表
–columns指定表列值
–where数据过滤条件
–e/–query自定义SQL语句
–driver指定数据库驱动
–delete-target-dir导入数据时,清空目标目录
–target-dir指定导入数据的目录(通常为HDFS路径)
–export-dir指定导出数据的源目录(通常为HDFS路径)

Sqoop命令的使用方法可以通过sqoop -h命令查看相关使用方法,此处不在赘述了

三、Oracle数据采集格式问题

场景:

Step1: 查看业务数据库中 CISS_SERVICE_WORKORDER 表的数据条数。

select count(1) as cnt from CISS_SERVICE_WORKORDER;  178609条

Step2: 采集CISS_SERVICE_WORKORDER的数据到HDFS上

sqoop import \
--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \  
--username ciss \
--password 123456 \
--table CISS4.CISS_SERVICE_WORKORDER \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_service_workorder \
--fields-terminated-by "\001" \   #指定字段分割符
-m 1  #指定并行度

Hive默认使用\001作为表字段的分隔符,但也可以在创建表时指定特殊的分隔符。

Step3: 使用Hive查看导入数据表的行数

create external table test_text(
line string # 将导入的数据一行作为表中的一列
)
location '/test/full_imp/ciss4.ciss_service_workorder';
select count(*) from test_text;  195825条

问题:
Sqoop采集完数据后,HDFS数据中存储的数据行数跟源数据库的数据量不符合。

原因:

Oracle数据:

idnameage
001zhang\nsan18

Sqoop转换后的数据:

001zhang
san18

Hive表中的数据:

idnameage
001zhang
san18

解决方法:

不建议使用,破坏原始数据结构,ODS层数据尽量抱持原结构

常见的文件格式介绍:

类型介绍
TextFileHive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,I/O性能较低
SequenceFile含有键值对的二进制文件,优化磁盘利用率和I/O,并行操作数据,查询效率高,但存储空间消耗最大
AvroFile特殊的二进制文件,设计的主要目标是为了满足schema evolution,Schema和数据保存在一起
OrcFile列式存储,Schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快
ParquetFile列式存储,与Orc类似,压缩比不如Orc,但是查询性能接近,支持的工具更多,通用性更强

Avro格式特点

Sqoop使用Avro格式:

  sqoop import \
  -Dmapreduce.job.user.classpath.first=true \
  --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
  --username ciss \
  --password 123456 \
  --table CISS4.CISS_SERVICE_WORKORDER \
  --delete-target-dir \
  --target-dir /test/full_imp/ciss4.ciss_service_workorder \
  --as-avrodatafile \    # 选择文件存储格式为AVRO
  --fields-terminated-by "\001" \
  -m 1

Hive建表指定文件的存储格式:

create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';

AVRO 数据以 二进制序列化 存储,字段通过预定义的 模式(Schema) 解析,而非依赖分隔符,即使字段内容包含逗号、换行符等特殊字符,也不会影响数据结构的正确性。
Schema 定义(JSON 格式),明确描述了字段名称、类型、顺序等信息。

四、Sqoop增量采集方案

Sqoop 支持两种增量模式:

适用于 仅追加数据 的表(如日志表),基于 递增列(如自增主键 id)采集新数据。

适用于 数据会更新 的表(如用户表),基于 时间戳列(如 last_update_time)采集新增或修改的数据。

append模式要求源数据表具备自增列,如建表时设置的自增id
lastmodified模式要求源数据表具有时间戳字段。

Append模式:

要求:必须有一列自增的值,按照自增的int值进行判断

特点:只能导入增加的数据,无法导入更新的数据

场景:数据只会发生新增,不会发生更新的场景

sqoop import \                                   # 执行数据导入操作
  --connect jdbc:mysql://node3:3306/sqoopTest \  # 连接MySQL数据库(地址:node3,数据库名:sqoopTest)
  --username root \                             # 数据库用户名:root
  --password 123456 \                           # 数据库密码:123456
  --table tb_tohdfs \                           # 要导入的源表:tb_tohdfs
  --target-dir /sqoop/import/test02 \           # HDFS目标目录(数据将写入此路径)
  --fields-terminated-by '\t' \                 # 字段分隔符为制表符(\t)
  --check-column id \                           # 指定增量检查列:id(通常是自增主键)
  --incremental append \                        # 增量模式为“append”(仅导入新数据)
  --last-value 0 \                              # 上次导入的id最大值(初始值为0,首次导入id>0的数据)
  -m 1                                          # 使用1个Map任务(单线程)

appebd模式使用last-value记录上次导入的数据id最大值,初次导入一般为全量导入,即id>0

此处的last_value需要手动填写,因此可以使用Sqoop的job管理进行自动记录。

sqoop job --create my_job -- import ... --incremental append --check-column id --last-value 0
sqoop job --exec my_job  # 自动更新 last-value

lastmodified模式:
要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断

特点:既导入新增的数据也导入更新的数据

场景:表中的记录会新增或更新,且每次更新都会修改 lastmode 时间戳。一般无法满足要求,所以不用。

sqoop import \                                   # 执行数据导入操作
  --connect jdbc:mysql://node3:3306/sqoopTest \  # 连接MySQL数据库(地址:node3,数据库名:sqoopTest)
  --username root \                             # 数据库用户名:root
  --password 123456 \                           # 数据库密码:123456
  --table tb_lastmode \                         # 要导入的源表:tb_lastmode
  --target-dir /sqoop/import/test03 \           # HDFS目标目录(数据将写入此路径)
  --fields-terminated-by '\t' \                 # 字段分隔符为制表符(\t)
  --incremental lastmodified \                  # 增量模式为“lastmodified”(采集新增或修改的数据)
  --check-column lastmode \                     # 指定时间戳列:lastmode(记录数据的更新时间)
  --last-value '2021-06-06 16:09:32' \          # 上次导入的最大时间值(导入此时间之后的新增/修改数据)
  -m 1                                          # 使用1个Map任务(单线程)

lastmodified模式使用时间戳记载数据的更新线。

若同一条记录被多次更新,且 lastmode 时间超过 --last-value,Sqoop 会多次导入该记录。

解决方案:添加 --merge-key <主键列> 参数,合并新旧数据(基于主键去重):

 --merge-key id  # 假设 id 是主键列

自定义模式:
要求:每次运行的输出目录不能相同

特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集

场景:一般用于自定义增量采集每天的分区数据到Hive

sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1

自定义模式可以根据设置的sql进行数据导入,因此是最常用的场景。

到此这篇关于Apache Sqoop数据采集问题的文章就介绍到这了,更多相关Apache Sqoop数据采集内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文