Apache Doris基础简介
作者:Aimyon_36
Apache Doris 是一个现代化的 MPP(Massively Parallel Processing,即大规模并行处理)分析型数据库产品。仅需亚秒级响应时间即可获得查询结果,有效地支持实时数据分析。
一、Doris架构
Doris 的架构很简洁,只设 FE(Frontend)、BE(Backend)两种角色、两个进程,不依赖于外部组件,方便部署和运维,FE、BE 都可线性扩展。
- FE(Frontend):存储、维护集群元数据;负责接收、解析查询请求,规划查询计划,调度查询执行,返回查询结果。主要有三个角色:
- Leader 和 Follower:主要是用来达到元数据的高可用,保证单节点宕机的情况下,元数据能够实时地在线恢复,而不影响整个服务。
- Observer:用来扩展查询节点,同时起到元数据备份的作用。如果在发现集群压力非常大的情况下,需要去扩展整个查询的能力,那么可以加 observer 的节点。observer 不参与任何的写入,只参与读取。
- BE(Backend):负责物理数据的存储和计算;依据 FE 生成的物理计划,分布式执行查询。
- 数据的可靠性由 BE 保证,BE 会对整个数据存储多副本或者是三副本。副本数可根据需求动态调整。
- MySQL Client:Doris 借助 MySQL 协议,用户使用任意 MySQL 的 ODBC/JDBC 以及 MySQL 的客户端,都可以直接访问 Doris。
- Broker:Broker 为一个独立的无状态进程。封装了文件系统接口,提供 Doris 读取远端存储系统中文件的能力,包括 HDFS,S3,BOS 等。
FE负责存储元数据和接收前端请求,BE负责执行查询,Observer负责扩展查询;
此处的Doris只是引用了Mysql的客户端协议,方便用户可以通过MysqlClient直接访问Doris,类似的mysql -user -password -P 9030就能通过像登录mysql客户端的方式访问Doris。
二、基本概念
1. Row & Column
一张表包括行(Row)和列(Column)。Row 即用户的一行数据。Column 用于描述一行数据中不同的字段。
- 在默认的数据模型中,Column 只分为排序列和非排序列。存储引擎会按照排序列对数据进行排序存储,并建立稀疏索引,以便在排序数据上进行快速查找。
- 在聚合模型中,Column 可以分为两大类:Key 和 Value。从业务角度看,Key 和Value 可以分别对应维度列和指标列。
从聚合模型的角度来说,Key 列相同的行,会聚合成一行。其中 Value 列的聚合方式由用户在表时指定。
2. Partition & Tablet
在 Doris 的存储引擎中,用户数据首先被划分成若干个分区(Partition),划分的规则通常是按照用户指定的分区列进行范围划分,比如按时间划分。而在每个分区内,数据被进一步的按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶。每个分桶就是一个数据分片(Tablet),也是数据划分的最小逻辑单元。
- Tablet 之间的数据是没有交集的,独立存储的。Tablet 也是数据移动、复制等操作的最小物理存储单元。
- Partition 可以视为是逻辑上最小的管理单元。数据的导入与删除,都可以或仅能针对一个 Partition 进行。
此处的Partition & Tablet类似Hive中的分区分桶。
3. 建表示例
建表语法:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name ( column_definition1[, column_definition2, ...] [, index_definition1[, index_definition12,]] ) [ENGINE = [olap|mysql|broker|hive]] [key_desc] [COMMENT "table comment"]; [partition_desc] [distribution_desc] [rollup_index] [PROPERTIES ("key"="value", ...)] [BROKER PROPERTIES ("key"="value", ...)];
Doris 的建表是一个同步命令,命令返回成功,即表示建表成功。
Doris 支持支持单分区和复合分区两种建表方式。
- 复合分区:既有分区也有分桶
- 第一级称为 Partition,即分区。用户可以指定某一维度列作为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。
- 第二级称为 Distribution,即分桶。用户可以指定一个或多个维度列以及桶数对数据进行 HASH 分布。
- 单分区:只做 HASH 分布,即只分桶。
字段类型
TINYINT | 1 字节 | 范围:-2^7 + 1 ~ 2^7 - 1 |
SMALLINT | 2 字节 | 范围:-2^15 + 1 ~ 2^15 - 1 |
INT | 4 字节 | 范围:-2^31 + 1 ~ 2^31 - 1 |
BIGINT | 8 字节 | 范围:-2^63 + 1 ~ 2^63 - 1 |
LARGEINT | 16 字节 | 范围:-2^127 + 1 ~ 2^127 - 1 |
FLOAT | 4 字节 | 支持科学计数法 |
DOUBLE | 12 字节 | 支持科学计数法 |
DECIMAL[(precision, scale)] | 16 字节 | 保证精度的小数类型。 默认是DECIMAL(10, 0) precision: 1 ~ 27 scale: 0 ~ 9其中整数部分为 1 ~ 18不支持科学计数法 |
DATE | 3 字节 | 范围:0000-01-01 ~ 9999-12-31 |
DATETIME | 8 字节 | 范围:0000-01-01 00:00:00 ~ 9999-12-31 23:59:59 |
CHAR[(length)] | 定长字符串。长度范围:1 ~ 255。默认为 1 | |
VARCHAR[(length)] | 变长字符串。长度范围:1 ~ 65533 | |
BOOLEAN | 与 TINYINT 一样,0 代表 false,1 代表 true | |
HLL | 1~16385 个字节 | hll 列类型,不需要指定长度和默认值、长度根据数据的聚合程度系统内控制,并且 HLL 列只能通过 配 套 的 hll_union_agg 、Hll_cardinality、hll_hash 进行查询或使用 |
BITMAP | bitmap 列类型,不需要指定长度和默认值。表示整型的集合,元素最大支持到 2^64 - 1 | |
STRING | 变长字符串,0.15 版本支持,最大支持 2147483643 字节(2GB-4),长度还受 be 配置string_type_soft_limit , 实际能存储的最大长度取两者最小值。只能用在 value 列,不能用在 key 列和分区、分桶列 |
注:聚合模型在定义字段类型后,可以指定字段的 agg_type 聚合类型,如果不指定,则该列为 key 列。否则,该列为 value 列, 类型包括:SUM、MAX、MIN、REPLACE。
Range Partition
CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间" ) ENGINE=olap AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) PARTITION BY RANGE(`date`) ( PARTITION `p201701` VALUES LESS THAN ("2017-02-01"), PARTITION `p201702` VALUES LESS THAN ("2017-03-01"), PARTITION `p201703` VALUES LESS THAN ("2017-04-01") ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 16 PROPERTIES ( "replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2018-01-01 12:00:00" );
上述建表语句中,AGGREGATE KEY(user_id
, date
, timestamp
, city
, age
, sex
)属于是key列,下边的列在定义类型后添加了聚合类型(MAX,MIN,REPLACE,SUM)的列称为value列。
`last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
此处还使用了复合分区,使用PARTITION BY RANGE(date
)指定使用date字段进行范围分区,分区的范围是左闭右开的,并通过HASH算法根据user_id列进行分桶。
PARTITION BY RANGE(`date`) ( PARTITION `p201701` VALUES LESS THAN ("2017-02-01"), PARTITION `p201702` VALUES LESS THAN ("2017-03-01"), PARTITION `p201703` VALUES LESS THAN ("2017-04-01") ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 16
List Partition
CREATE TABLE IF NOT EXISTS example_db.expamle_list_tbl ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时 间" ) ENGINE=olap AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) PARTITION BY LIST(`city`) ( PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"), PARTITION `p_usa` VALUES IN ("New York", "San Francisco"), PARTITION `p_jp` VALUES IN ("Tokyo") ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 16 PROPERTIES ( "replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2018-01-01 12:00:00" );
(1)列的定义
在AGGREGATE KEY 数据模型中,所有没有指定聚合方式(SUM、REPLACE、MAX、MIN)的列视为 Key 列。而其余则为 Value 列。
定义列时,可参照如下建议:
- Key 列必须在所有 Value 列之前。
- 尽量选择整型类型。因为整型类型的计算和查找比较效率远高于字符串。
- 对于不同长度的整型类型的选择原则,遵循够用即可。
- 对于 VARCHAR 和 STRING 类型的长度,遵循 够用即可。
- 所有列的总字节长度(包括 Key 和 Value)不能超过 100KB。
(2)分区分桶
Doris 支持两层的数据划分。第一层是 Partition,支持 Range 和 List 的划分方式。第二层是 Bucket(Tablet),仅支持 Hash 的划分方式。
也可以仅使用一层分区。使用一层分区时,只支持 Bucket 划分。
Partition
- Partition 列可以指定一列或多列。分区类必须为 KEY 列。多列分区的使用方式在后面介绍。
- 不论分区列是什么类型,在写分区值时,都需要加双引号。
- 分区数量理论上没有上限。
- 当不使用 Partition 建表时,系统会自动生成一个和表名同名的,全值范围的Partition。该 Partition 对用户不可见,并且不可删改。
Range 分区 : 分区列通常为时间列,以方便的管理新旧数据。不可添加范围重叠的分区。
- VALUES LESS THAN (…) 仅指定上界,系统会将前一个分区的上界作为该分区的下界,生成一个左闭右开的区间。分区的删除不会改变已存在分区的范围。删除分区可能出现空洞。
- VALUES […) 指定同时指定上下界,生成一个左闭右开的区间。
通过 VALUES […) 同时指定上下界比较容易理解。这里举例说明,当使用 VALUES LESS THAN (…) 语句进行分区的增删操作时,分区范围的变化情况:
(1)如上 expamle_range_tbl 示例,当建表完成后,会自动生成如下 3 个分区:
p201701: [MIN_VALUE, 2017-02-01) p201702: [2017-02-01, 2017-03-01) p201703: [2017-03-01, 2017-04-01)
(2)增加一个分区 p201705 VALUES LESS THAN (“2017-06-01”),分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201703: [2017-03-01, 2017-04-01)
p201705: [2017-04-01, 2017-06-01)
(3)此时删除分区 p201703,则分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
注意: p201702 和 p201705 的分区范围并没有发生变化,而这两个分区之间,出现了一个空洞:[2017-03-01, 2017-04-01)。即如果导入的数据范围在这个空洞范围内,是无法导入的。
List 分区:分区列支持 BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME, CHAR, VARCHAR 数据类型,分区值为枚举值。只有当数据为目标分区枚举值其中之一时,才可以命中分区。不可添加范围重叠的分区。
Partition 支持通过 VALUES IN (…) 来指定每个分区包含的枚举值。下面通过示例说明,进行分区的增删操作时,分区的变化。
(1)如上 example_list_tbl 示例,当建表完成后,会自动生成如下 3 个分区:
p_cn: ("Beijing", "Shanghai", "Hong Kong") p_usa: ("New York", "San Francisco") p_jp: ("Tokyo")
(2)增加一个分区 p_uk VALUES IN (“London”),分区结果如下:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_jp: ("Tokyo")
p_uk: ("London")
(3)删除分区 p_jp,分区结果如下:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_uk: ("London")
Bucket
- 如果使用了 Partition,则 DISTRIBUTED … 语句描述的是数据在各个分区内的划分规则。如果不使用 Partition,则描述的是对整个表的数据的划分规则。
- 分桶列可以是多列,但必须为 Key 列。分桶列可以和 Partition 列相同或不同。
- 分桶列的选择,是在 查询吞吐 和 查询并发 之间的一种权衡:
- 如果选择多个分桶列,则数据分布更均匀。如果一个查询条件不包含所有分桶列的等值条件,那么该查询会触发所有分桶同时扫描,这样查询的吞吐会增加,单个查询的延迟随之降低。这个方式适合大吞吐低并发的查询场景。
指定的分桶列越多,分桶的粒度越细。
比如使用学科+班级的key列进行分桶,如果查询时只指定了班级,那么会发生全分区扫描。‘
- 如果仅选择一个或少数分桶列,则对应的点查询可以仅触发一个分桶扫描。此时,当多个点查询并发时,这些查询有较大的概率分别触发不同的分桶扫描,各个查询之间的 IO 影响较小(尤其当不同桶分布在不同磁盘上时),所以这种方式适合高并发的点查询场景。
- 分桶的数量理论上没有上限。
复合分区使用场景:
(1)有时间维度或类似带有有序值的维度,可以以这类维度列作为分区列。分区粒度可以根据导入频次、分区数据量等进行评估。
(2)历史数据删除需求:如有删除历史数据的需求(比如仅保留最近 N 天的数据)。使用复合分区,可以通过删除历史分区来达到目的。也可以通过在指定分区内发送 DELETE 语句进行数据删除。
(3)解决数据倾斜问题:每个分区可以单独指定分桶数量。如按天分区,当每天的数据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择区分度大的列。
(3)多列分区
Range 分区
PARTITION BY RANGE(`date`, `id`) ( PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"), PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"), PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01") )
指定 date
(DATE 类型) 和 id
(INT 类型) 作为分区列。以上示例最终得到的分区如下:
p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE))
注意,最后一个分区用户缺省只指定了 date
列的分区值,所以 id
列的分区值会默认填充 MIN_VALUE
。当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:
数据 --> 分区
2017-01-01, 200 --> p201701_1000
2017-01-01, 2000 --> p201701_1000
2017-02-01, 100 --> p201701_1000
2017-02-01, 2000 --> p201702_2000
2017-02-15, 5000 --> p201702_2000
2017-03-01, 2000 --> p201703_all
2017-03-10, 1 --> p201703_all
2017-04-01, 1000 --> 无法导入
2017-05-01, 1000 --> 无法导入
List 分区
PARTITION BY LIST(`id`, `city`) ( PARTITION `p1_city` VALUES IN (("1", "Beijing"), ("1", "Shanghai")), PARTITION `p2_city` VALUES IN (("2", "Beijing"), ("2", "Shanghai")), PARTITION `p3_city` VALUES IN (("3", "Beijing"), ("3", "Shanghai")) )
指定 id
(INT 类型) 和 city
(VARCHAR 类型) 作为分区列。最终得到的分区如下:
p1_city: [("1", "Beijing"), ("1", "Shanghai")] p2_city: [("2", "Beijing"), ("2", "Shanghai")] p3_city: [("3", "Beijing"), ("3", "Shanghai")]
当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:
数据 ---> 分区 1, Beijing ---> p1_city 1, Shanghai ---> p1_city 2, Shanghai ---> p2_city 3, Beijing ---> p3_city 1, Tianjin ---> 无法导入 4, Beijing ---> 无法导入
(4)PROPERTIES
- replication_num
- 每个 Tablet 的副本数量。默认为 3,建议保持默认即可。在建表语句中,所有 Partition 中的 Tablet 副本数量统一指定。而在增加新分区时,可以单独指定新分区中 Tablet 的副本数量。
- 副本数量可以在运行时修改。强烈建议保持奇数。
- 最大副本数量取决于集群中独立 IP 的数量(注意不是 BE 数量)。Doris 中副本分布的原则是,不允许同一个 Tablet 的副本分布在同一台物理机上,而识别物理机即通过 IP。所以,即使在同一台物理机上部署了 3 个或更多 BE 实例,如果这些 BE 的 IP 相同,则依然只能设置副本数为 1。
- 对于一些小,并且更新不频繁的维度表,可以考虑设置更多的副本数。这样在 Join 查询
- 时,可以有更大的概率进行本地数据 Join。
- storage_medium & storage_cooldown_time
- BE 的数据存储目录可以显式的指定为 SSD 或者 HDD(通过 .SSD 或者 .HDD 后缀区分)。建表时,可以统一指定所有 Partition 初始存储的介质。注意,后缀作用是显式指定磁盘介质,而不会检查是否与实际介质类型相符。
(5)ENGINE
ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。而 Doris 本身并不创建、管理和存储任何非 olap ENGINE 类型的表和数据。
三、数据模型
Doris 的数据模型主要分为 3 类:Aggregate、Uniq、Duplicate
1. Aggregate 模型
表中的列按照是否设置了 AggregationType,分为 Key(维度列)和 Value(指标列)。没有设置 AggregationType 的称为 Key,设置了 AggregationType 的称为 Value。
当导入数据时,对于 Key 列相同的行会聚合成一行, Value 列会按照设置的AggregationType 进行聚合。AggregationType 目前有以下四种聚合方式:
- SUM:求和,多行的 Value 进行累加
- REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。REPLACE_IF_NOT_NULL :当遇到 null 值则不更新。
- MAX:保留最大值。
- MIN:保留最小值。
数据的聚合,在 Doris 中有如下三个阶段发生:
(1)每一批次数据导入的 ETL 阶段。该阶段会在每一批次导入的数据内部进行聚合。
(2)底层 BE 进行数据 Compaction 的阶段。该阶段,BE 会对已导入的不同批次的数据进行进一步的聚合。
(3)数据查询阶段。在数据查询时,对于查询涉及到的数据,会进行对应的聚合。数据在不同时间,可能聚合的程度不一致。比如一批数据刚导入时,可能还未与之前已存在的数据进行聚合。但是对于用户而言,用户只能查询到聚合后的数据。即不同的聚合程度对于用户查询而言是透明的。用户需始终认为数据以最终的完成的聚合程度存在,而不应假设某些聚合还未发生。
举个栗子:导入数据聚合
CREATE TABLE IF NOT EXISTS test_db.example_site_visit ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `last_visit_date_not_null` DATETIME REPLACE_IF_NOT_NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时 间" ) AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
插入数据:
执行结果:
聚合发生在所有key列的值都一样的情况下,类似group by c1,c2,c3
Insert into 单条数据这种操作在 Doris 里不能在生产使用,会引发写阻塞,因为每一批次的数据导入会引发数据的聚合逻辑操作。
再举个栗子: 保留明细数据
CREATE TABLE IF NOT EXISTS test_db.example_site_visit2 ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME COMMENT "数据灌入时间,精确到秒", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时 间" ) AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
使用更细粒度的列从而让Key列的值都不一样,达到保存明细数据的效果,上述使用timestamp这种细粒度的列值,能够记录到每条数据。
2. Uniq 模型
在某些多维分析场景下,用户更关注的是如何保证 Key 的唯一性,即如何获得 Primary Key 唯一性约束。因此,我们引入了 Uniq 的数据模型。该模型本质上是聚合模型的一个特例,也是一种简化的表结构表示方式。
建表语句:
CREATE TABLE IF NOT EXISTS test_db.user ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `username` VARCHAR(50) NOT NULL COMMENT "用户昵称", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `phone` LARGEINT COMMENT "用户电话", `address` VARCHAR(500) COMMENT "用户地址", `register_time` DATETIME COMMENT "用户注册时间" ) UNIQUE KEY(`user_id`, `username`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
插入数据:
insert into test_db.user values\ (10000,'wuyanzu',' 北 京 ',18,0,12345678910,' 北 京 朝 阳 区 ','2017-10-01 07:00:00'),\ (10000,'wuyanzu',' 北 京 ',19,0,12345678910,' 北 京 朝 阳 区 ','2017-10-01 07:00:00'),\ (10000,'zhangsan','北京',20,0,12345678910,'北京海淀区','2017-11-15 06:10:20');
执行结果:
Unique模型确保了UNIQUE KEY(
user_id
,username
)的唯一行,而其他的列会选择最新的列值。
3. Duplicate 模型
在某些多维分析场景下,数据既没有主键,也没有聚合需求。Duplicate 数据模型可以满足这类需求。数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。 而在建表语句中指定的 DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序。
4. 模型选建议
因为数据模型在建表时就已经确定,且无法修改。所以,选择一个合适的数据模型非常重要。
(1)Aggregate 模型可以通过预聚合,极大地降低聚合查询时所需扫描的数据量和查询的计算量,非常适合有固定模式的报表类查询场景。但是该模型对 count(*) 查询很不友好。同时因为固定了 Value 列上的聚合方式,在进行其他类型的聚合查询时,需要考虑语意正确性
(2)Uniq 模型针对需要唯一主键约束的场景,可以保证主键唯一性约束。但是无法利用 ROLLUP 等预聚合带来的查询优势(因为本质是 REPLACE,没有 SUM 这种聚合方式)。
(3)Duplicate 适合任意维度的 Ad-hoc 查询。虽然同样无法利用预聚合的特性,但是不 受聚合模型的约束,可以发挥列存模型的优势(只读取相关列,而不需要读取所有 Key 列)
如何应对业务上的Count(*)场景问题:
Doris提供的预聚合导致用户在查询时,获取的结果都是对key聚合后的结果,无法获取到相应key列的条数这样的结果。
因此,当业务上有频繁的 count( * ) 查询时,增加一个 count 列,并且导入数据中,该列值恒为 1。则 select count() from table; 的结果等价于 select sum(count) from table;。而后者的查询效率将远高于前者。不过这种方式也有使用限制,就是用户需要自行保证,不会重复导入AGGREGATE KEY 列都相同的行。否则,select sum(count) from table; 只能表述原始导入的行数,而不是 select count() from table; 的语义。
另一种方式,就是 将如上的 count 列的聚合类型改为 REPLACE,且依然值恒为 1。那
么 select sum(count) from table; 和 select count(*) from table; 的结果将是一致的。并且这种
方式,没有导入重复行的限制。
四、ROLLUP
ROLLUP 在多维分析中是“上卷”的意思,即将数据按某种指定的粒度进行进一步聚合。
- 在 Doris 中,我们将用户通过建表语句创建出来的表称为 Base 表(Base Table)。Base 表中保存着按用户建表语句指定的方式存储的基础数据。
- 在 Base 表之上,我们可以创建任意多个 ROLLUP 表。这些 ROLLUP 的数据是基于 Base 表产生的,并且在物理上是独立存储的。
- ROLLUP 表的基本作用,在于在 Base 表的基础上,获得更粗粒度的聚合数据。
1. Aggregate模型的ROLLUP
因为 Uniq 只是 Aggregate 模型的一个特例,所以不加以区别。
查看表的结构信息
例如想要查看每个用户的总消费,建立一个只有 user_id 和 cost 的 rollup。alter table example_site_visit2 add rollup rollup_cost_userid(user_id,cost);
再次查看表结构
- 创建rollup不会改变原有的表结构,并且对用户而言是不可见的。
- 可以通过 explain 查看执行计划,是否使用到了 rollup
- explain SELECT user_id, sum(cost) FROM example_site_visit2 GROUP BY user_id;
查看你ROLLUP表的完成状态。
SHOW ALTER TABLE ROLLUP;
Aggregate模型的roolup表完成了对基表更粗粒度的聚合,具体的,当数据插入到基表中时,Doris会自动将数据按照指定的聚合键进行分组,并计算每个分组的聚合结果,roolup表也是如此完成数据的预聚合。
2. Duplicate 模型中的 ROLLUP
因为 Duplicate 模型没有聚合的语意。所以该模型中的 ROLLUP,已经失去了“上卷”这一层含义。而仅仅是作为调整列顺序,以命中前缀索引的作用。
前缀索引
不同于传统的数据库设计,Doris 不支持在任意列上创建索引。Doris 这类 MPP 架构的 OLAP 数据库,通常都是通过提高并发,来处理大量数据的。
本质上,Doris 的数据存储在类似 SSTable(Sorted String Table)的数据结构中。该结构是一种有序的数据结构,可以按照指定的列进行排序存储。在这种数据结构上,以排序列作为条件进行找,会非常的高效。
在 Aggregate、Uniq 和 Duplicate 三种数据模型中。底层的数据存储,是按照各自建表语句中,AGGREGATE KEY、UNIQ KEY 和 DUPLICATE KEY 中指定的列进行排序存储的。而前缀索引,即在排序的基础上,实现的一种根据给定前缀列,快速查询数据的索引方式。
我们将一行数据的前 36 个字节 作为这行数据的前缀索引。当遇到 VARCHAR 类型时,前缀索引会直接截断。
以下表结构的前缀索引为 user_id(8 Bytes) + age(4 Bytes) + message(prefix 20 Bytes)
[message列的一部分-取20个字节]。
ColumnName | ColumnType |
---|---|
user_id | BIGINT |
age | INT |
message | VARCHAR(100) |
max_dwell_time | DATETIME |
min_dwell_time | DATETIME |
以下表结构的前缀索引为 user_name(20 Bytes)。即使没有达到 36 个字节,因为遇到VARCHAR,所以直接截断,不再往后继续。
ColumnName | ColumnType |
---|---|
user_name | VARCHAR(20) |
age | INT |
message | VARCHAR(100) |
max_dwell_time | DATETIME |
min_dwell_time | DATETIME |
当我们的查询条件,是前缀索引的前缀时,可以极大的加快查询速度。比如在第一个例子中,我们执行如下查询:
SELECT * FROM table WHERE user_id=1829239 and age=20;
该查询的效率会远高于如下查询:
SELECT * FROM table WHERE age=20;
所以在建表时,正确的选择列顺序,能够极大地提高查询效率。
ROLLUP 调整前缀索引
因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求。因此,我们可以通过创建 ROLLUP 来人为的调整列顺序。
Base 表结构如下:
ColumnName | ColumnType |
---|---|
user_id | BIGINT |
age | INT |
message | VARCHAR(100) |
max_dwell_time | DATETIME |
min_dwell_time | DATETIME |
我们可以在此基础上创建一个 ROLLUP 表:
ColumnName | ColumnType |
---|---|
age | INT |
user_id | BIGINT |
message | VARCHAR(100) |
max_dwell_time | DATETIME |
min_dwell_time | DATETIME |
可以看到,ROLLUP 和 Base 表的列完全一样,只是将 user_id 和 age 的顺序调换了。那么当我们进行如下查询时:
SELECT * FROM table where age=20 and message LIKE "%error%";
会优先选择 ROLLUP 表,因为 ROLLUP 的前缀索引匹配度更高。
3. DROLLUP 的使用说明
- ROLLUP 最根本的作用是提高某些查询的查询效率(无论是通过聚合来减少数据量,还是修改列顺序以匹配前缀索引)。因此 ROLLUP 的含义已经超出了“上卷”的范围。这也是为什么在源代码中,将其命名为 Materialized Index(物化索引)的原因。
- ROLLUP 是附属于 Base 表的,可以看做是 Base 表的一种辅助数据结构。用户可以在 Base 表的基础上,创建或删除 ROLLUP,但是不能在查询中显式的指定查询某ROLLUP。是否命中ROLLUP 完全由 Doris 系统自动决定。
- ROLLUP 的数据是独立物理存储的。因此,创建的 ROLLUP 越多,占用的磁盘空间也就越大。同时对导入速度也会有影响(导入的 ETL 阶段会自动产生所有ROLLUP 的数据),但是不会降低查询效率(只会更好)。
- ROLLUP 的数据更新与 Base 表是完全同步的。用户无需关心这个问题。
- ROLLUP 中列的聚合方式,与 Base 表完全相同。在创建 ROLLUP 无需指定,也不能修改。
- 查询能否命中 ROLLUP 的一个必要条件(非充分条件)是,查询所涉及的所有列(包括 select list 和 where 中的查询条件列等)都存在于该 ROLLUP 的列中。否则,查询只能命中 Base 表。
- 某些类型的查询(如 count(*))在任何条件下,都无法命中 ROLLUP。
- 可以通过 EXPLAIN your_sql; 命令获得查询执行计划,在执行计划中,查看是否命中ROLLUP。
- 可以通过 DESC tbl_name ALL; 语句显示 Base 表和所有已创建完成的 ROLLUP。
五、物化视图
物化视图就是包含了查询结果的数据库对象,可能是对远程数据的本地 copy,也可能是一个表或多表 join 后结果的行或列的子集,也可能是聚合后的结果。说白了,就是预先存储查询结果的一种数据库对象。
在 Doris 中的物化视图,就是查询结果预先存储起来的特殊的表。
物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询。
视图是一种虚拟表,它是基于一个或多个数据库表的查询结果构建而成的。视图并不实际存储数据,而是根据定义的查询逻辑,在查询时动态生成结果,可以理解为视图是定义好的一组Sql语句,用户查询视图时,数据库会根据视图的定义执行相应的查询操作,并返回结果。
物化视图,是一种将查询结果预先存储起来的特殊的表。当基表数据更新时,物化视图也会一同更新。
物化视图创建删除语法:
create materialized view 物化视图名 as {sql语句} DROP MATERIALIZED VIEW 物化视图名 on Base 表名;
适用场景:
- 分析需求覆盖明细数据查询以及固定维度查询两方面。
- 查询仅涉及表中的很小一部分列或行。
- 查询包含一些耗时处理操作,比如:时间很久的聚合操作等。
- 查询需要匹配不同前缀索引。
优势:
- 对于那些经常重复的使用相同的子查询结果的查询性能大幅提升。
- Doris 自动维护物化视图的数据,无论是新的导入,还是删除操作都能保证 base 表和物化视图表的数据一致性。无需任何额外的人工维护成本。
- 查询时,会自动匹配到最优物化视图,并直接从物化视图中读取数据。自动维护物化视图的数据会造成一些维护开销,会在后面的物化视图的局限性中展开说明。
物化视图 VS Rollup:
- 在没有物化视图功能之前,用户一般都是使用 Rollup 功能通过预聚合方式提升查询效率的。但是 Rollup 具有一定的局限性,他不能基于明细模型做预聚合。
- 物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集。
物化视图不仅达到预聚合的目的,也能像rollup一样修改字段顺序,优化前缀索引。
- ALTER TABLE ADD ROLLUP 语法支持的功能现在均可以通过CREATE MATERIALIZED VIEW 实现。
Doris 系统提供了一整套对物化视图的 DDL 语法,包括创建,查看,删除。DDL 的语法和PostgreSQL, Oracle 都是一致的。但是 Doris 目前创建物化视图只能在单表操作,不支持 join。
1. 创建物化视图
首先要根据查询语句的特点来决定创建一个什么样的物化视图。并不是说物化视图定义和某个查询语句一模一样就最好。这里有两个原则:
(1)从查询语句中抽象出,多个查询共有的分组和聚合方式作为物化视图的定义。
(2)不需要给所有维度组合都创建物化视图。
首先第一个点,一个物化视图如果抽象出来,并且多个查询都可以匹配到这张物化视图。这种物化视图效果最好。因为物化视图的维护本身也需要消耗资源。如果物化视图只和某个特殊的查询很贴合,而其他查询均用不到这个物化视图。则会导致这张物化视图的性价比不高,既占用了集群的存储资源,还不能为更多的查询服务。
第二点就是,在实际的分析查询中,并不会覆盖到所有的维度分析。所以给常用的维度组合创建物化视图即可,从而到达一个空间和时间上的平衡。
创建物化视图是一个异步的操作,也就是说用户成功提交创建任务后,Doris 会在后台对存量的数据进行计算,直到创建成功。在构建期间,用户依然可以正常的查询和导入新的数据。创建任务会自动处理当前的存量数据和所有新到达的增量数据,从而保持和 base 表的数据一致性。
2. 物化视图的删选匹配
销售记录表:Base表
Field | Type | Key | Extra |
---|---|---|---|
record_id | INT | TRUE | |
saller_id | INT | TRUE | |
store_id | INT | TRUE | |
sale_date | DATE | FALSE | |
sale_amt | BIGINT | FALSE |
mv_1: 存储了不同时间不同销售员的售卖量
Field | Type | Key | Extra |
---|---|---|---|
saller_id | INT | TRUE | |
sale_date | DATE | FALSE | |
sale_amt | BIGINT | FALSE | sum |
mv_2:存储了不同时间不同门店的销售量
Field | Type | Key | Extra |
---|---|---|---|
store_id | INT | TRUE | |
sale_date | DATE | FALSE | |
sale_amt | BIGINT | FALSE | sum |
mv_3:存储了每个销售员的总销售量
Field | Type | Key | Extra |
---|---|---|---|
saller_id | INT | TRUE | |
sale_amt | BIGINT | FALSE | sum |
当查询 7 月 19 日,各个销售员都买了多少钱的话。就可以匹配 mv_1 物化视图。直接对 mv_1 的数据进行查询。
物化视图的自动匹配分为下面两个步骤:
(1)根据查询条件删选出一个最优的物化视图:这一步的输入是所有候选物化视图表的元数据,根据查询的条件从候选集中输出最优的一个物化视图
- 第一层过滤先判断查询 where 中的谓词涉及到的数据是否能从物化视图中得到。以在这一层过滤中,mv_3 就被淘汰了。
- 第二层过滤查询分组列是否为候选集的分组列的子集。在这一层过滤中,mv_2也被淘汰了。
- 第三层过滤是看查询的聚合列是否为候选集中聚合列的子集。这里 base 表和物化视图表均满足标准。
- 最后一层是过滤看查询需要的列是否存在于候选集合的列中。这里Base表和mv_1表均符合。
这里Base表和mv_1表均符合。
简而言之,物化视图的筛选过程,就是按照sql关键字的执行顺序通过查询字段和物化视图的匹配程度逐一删选出最符合的物化视图。
候选集过滤完后输出一个集合,这个集合中的所有表都能满足查询的需求。但每张表的查询效率都不同。这时候就需要再这个集合根据前缀索引是否能匹配到,以及聚合程度的高低来选出一个最优的物化视图。
从表结构中可以看出,base 表的销售日期列是一个非排序列,而物化视图表的日期是一个排序列,同时聚合程度上 mv_1 表明显比 base 表高。所以最后选择出 mv_1 作为该查询的最优匹配。
(2)根据选出的物化视图对查询进行改写:这一步是结合上一步选择出的最优物化视图,进行查询的改写,最终达到直接查询物化视图的目的。
查询改写的原则:
其中 bitmap 和 hll 的聚合函数在查询匹配到物化视图后,查询的聚合算子会根据物化视图的表结构进行一个改写。
3. 物化视图的限制
- 目前支持的聚合函数包括,常用的 sum,min,max count,以及计算 pv ,uv, 留存率,等常用的去重算法 hll_union,和用于精确去重计算 count(distinct)的算法bitmap_union。
- 物化视图的聚合函数的参数不支持表达式仅支持单列,比如: sum(a+b)不支持。
- 使用物化视图功能后,由于物化视图实际上是损失了部分维度数据的。所以对表的 DML 类型操作会有一些限制:
- 如果表的物化视图 key 中不包含删除语句中的条件列,则删除语句不能执行。
- 比如想要删除渠道为 app 端的数据,由于存在一个物化视图并不包含渠道这个字段,则这个删除不能执行,因为删除在物化视图中无法被执行。这时候你只能把物化视图先删除,然后删除完数据后,重新构建一个新的物化视图。
- 单表上过多的物化视图会影响导入的效率:导入数据时,物化视图和 base 表数据是同步更新的,如果一张表的物化视图表超过 10 张,则有可能导致导入速度很慢。这就像单次导入需要同时导入 10 张表数据是一样的。
- 相同列,不同聚合函数,不能同时出现在一张物化视图中,比如:select sum(a), min(a) from table 不支持。
- 物化视图针对 Unique Key 数据模型,只能改变列顺序,不能起到聚合的作用,所以在 Unique Key 模型上不能通过创建物化视图的方式对数据进行粗粒度聚合操作
六、修改和删除语句
1. 修改表
使用 ALTER TABLE 命令可以对表进行修改,包括 partition 、rollup、schema change、rename 和 index 五种。
rename
将名为 table1 的表修改为 table2
ALTER TABLE table1 RENAME table2;
将表 example_table 中名为 rollup1 的 rollup index 修改为 rollup2
ALTER TABLE example_table RENAME ROLLUP rollup1 rollup2;
将表 example_table 中名为 p1 的 partition 修改为 p2
ALTER TABLE example_table RENAME PARTITION p1 p2;
partition
增加分区, 使用默认分桶方式
现有分区 [MIN, 2013-01-01),增加分区 [2013-01-01, 2014-01-01),
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2014-01-01");
增加分区,使用新的分桶数
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2015-01-01") DISTRIBUTED BY HASH(k1) BUCKETS 20;
增加分区,使用新的副本数
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES LESS THAN ("2015-01-01") ("replication_num"="1");
修改分区副本数
ALTER TABLE example_db.my_table MODIFY PARTITION p1 SET("replication_num"="1");
批量修改指定分区
ALTER TABLE example_db.my_table MODIFY PARTITION (*) SET("storage_medium"="HDD");
批量修改所有分区
ALTER TABLE example_db.my_table MODIFY PARTITION (*) SET("storage_medium"="HDD");
删除分区
ALTER TABLE example_db.my_table DROP PARTITION p1;
增加一个指定上下界的分区
ALTER TABLE example_db.my_table ADD PARTITION p1 VALUES [("2014-01-01"), ("2014-02-01"));
rollup
创建 index: example_rollup_index,基于 base index(k1,k2,k3,v1,v2)。列式存储。
ALTER TABLE example_db.my_table ADD ROLLUP example_rollup_index(k1, k3, v1, v2);
创建 index: example_rollup_index2,基于 example_rollup_index(k1,k3,v1,v2)
ALTER TABLE example_db.my_table ADD ROLLUP example_rollup_index2 (k1, v1) FROM example_rollup_index;
创建 index: example_rollup_index3, 基于 base index (k1,k2,k3,v1), 自定义 rollup 超时时间一小时。
ALTER TABLE example_db.my_table ADD ROLLUP example_rollup_index(k1, k3, v1) PROPERTIES("timeout" = "3600");
删除 index: example_rollup_index2
ALTER TABLE example_db.my_table DROP ROLLUP example_rollup_index2;
2. 删除数据
Doris 目前可以通过两种方式删除数据:DELETE FROM 语句和 ALTER TABLE DROP PARTITION 语句。
DELETE FROM Statement(条件删除)
DELETE FROM table_name [PARTITION partition_name] WHERE column_name1 op { value | value_list } [ AND column_name2 op { value | value_list } ...];
delete from student_kafka where id=1;
- 该语句只能针对 Partition 级别进行删除。如果一个表有多个 partition 含有需要删除的数据,则需要执行多次针对不同 Partition 的 delete 语句。而如果是没有使用Partition 的表,partition 的名称即表名。
- where 后面的条件谓词只能针对 Key 列,并且谓词之间,只能通过 AND 连接。如果想实现 OR 的语义,需要执行多条 delete。
- delete 是一个同步命令,命令返回即表示执行成功。
- 从代码实现角度,delete 是一种特殊的导入操作。该命令所导入的内容,也是一个新的数据版本,只是该版本中只包含命令中指定的删除条件。在实际执行查询时,会根据这些条件进行查询时过滤。所以,不建议大量频繁使用 delete 命令,因为这可能导致查询效率降低。
- 数据的真正删除是在 BE 进行数据 Compaction 时进行的。所以执行完 delete 命令后,并不会立即释放磁盘空间。
- delete 命令一个较强的限制条件是,在执行该命令时,对应的表,不能有正在进行的导入任务(包括 PENDING、ETL、LOADING)。而如果有 QUORUM_FINISHED 状态的导入任务,则可能可以执行。
- delete 也有一个隐含的类似 QUORUM_FINISHED 的状态。即如果 delete 只在多数副本上完成了,也会返回用户成功。但是会在后台生成一个异步的 delete job(Async Delete Job),来继续完成对剩余副本的删除操作。如果此时通过 show delete 命令,可以看到这种任务在 state 一栏会显示 QUORUM_FINISHED。
DROP PARTITION Statement(删除分区)
该命令可以直接删除指定的分区。因为 Partition 是逻辑上最小的数据管理单元,所以使用 DROP PARTITION 命令可以很轻量的完成数据删除工作。并且该命令不受 load 以及任何其他操作的限制,同时不会影响查询效率。是比较推荐的一种数据删除方式。
该命令是同步命令,执行成功即生效。而后台数据真正删除的时间可能会延迟 10 分钟左右。
七、数据的导入导出
导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过Mysql 客户端查询数据。
为适配不同的数据导入需求,Doris 系统提供了 6 种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。
所有导入方式都支持 csv 数据格式。其中 Broker load 还支持 parquet 和 orc 数据格式。
- Broker load
通过 Broker 进程访问并读取外部数据源(如 HDFS)导入到 Doris。用户通过 Mysql协议提交导入作业后,异步执行。通过 SHOW LOAD 命令查看导入结果。
- Stream load
用户通过 HTTP 协议提交请求并携带原始数据创建导入。主要用于快速将本地文件或数据流中的数据导入到 Doris。导入命令同步返回导入结果。
- Insert
类似 MySQL 中的 Insert 语句,Doris 提供 INSERT INTO tbl SELECT …; 的方式从Doris 的表中读取数据并导入到另一张表。或者通过 INSERT INTO tbl VALUES(…); 插入单条数据。
- Multi load
用户通过 HTTP 协议提交多个导入作业。Multi Load 可以保证多个导入作业的原子生效。
Routine load
- 用户通过 MySQL 协议提交例行导入作业,生成一个常驻线程,不间断的从数据源(如Kafka)中读取数据并导入到 Doris 中。
通过 S3 协议直接导入
- 用户通过 S3 协议直接导入数据,用法和 Broker Load 类似。Broker load 是一个异步的导入方式,支持的数据源取决于 Broker 进程支持的数据源。用户需要通过 MySQL 协议创建 Broker load 导入,并通过查看导入命令检查导入结果。
本文不再赘述数据导入导出,具体的可前往官网查看。
到此这篇关于Apache-Doris基础概念的文章就介绍到这了,更多相关Apache Doris内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!