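A Hands-On Guide to the Core Features of Apache Iceberg with PySpark
Author: zhaojiew10
This article systematically tests the core features of the Apache Iceberg table format. The experiments run on a local PySpark installation with a Hadoop Catalog.
Apache Iceberg is an open table format that brings database-grade ACID transactions and advanced table features to data lakes. Unlike traditional Hive tables, Iceberg keeps table metadata separate from the data files, which is what enables hidden partitioning, time travel, and schema evolution.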
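| Feature | Description |
|---|---|
| ACID transactions | Writes are atomic; reads use snapshot isolation |
| Hidden partitioning | Partitioning is transparent to users; the query planner prunes partitions automatically |
| Schema evolution | Add, drop, rename, and reorder columns without rewriting data |
| Partition evolution | Change the partitioning strategy of an existing table without migrating data |
| Time travel | Query historical versions by snapshot, using VERSION AS OF and TIMESTAMP AS OF |
| Row-level operations | UPDATE, DELETE, and MERGE INTO |
| Open formats | Data files in Parquet or ORC (columnar) or Avro (row-oriented) |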
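Core terminology
| Term | Description |
|---|---|
| Schema | The table's field definitions (column names and types) |
| Partition Spec | Defines how partition values are derived from data fields |
| Snapshot | The complete state of the table at a point in time |
| Manifest List | A file that records all manifest files belonging to a snapshot |
| Manifest | A file that lists data files or delete files belonging to the snapshot; a snapshot usually spans several manifests |
| Data File | A file that stores the actual table data (Parquet/Avro/ORC) |
| Delete File | A file that records deleted rows; used for Merge-on-Read |
| Metadata File | A JSON file that records the table schema, partition specs, and snapshot list |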
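Technology choices
- PySpark 3.5.6: Spark has one of the most mature Iceberg integrations and supports the full Iceberg SQL syntax as well as the DataFrame API.
- Hadoop Catalog: stores metadata on the local file system; in production it can be replaced with Hive Metastore or AWS Glue.
- Iceberg 1.8.1: compatible with Spark 3.5.x.
SparkSession configuration in detail
- spark.jars.packages: pulls in the Iceberg Spark runtime JAR; dependencies are downloaded automatically
- spark.sql.extensions: registers the Iceberg SQL extensions, enabling Iceberg-specific syntax
- spark.sql.catalog.{CATALOG}: sets the Iceberg catalog implementation class
- spark.sql.catalog.{CATALOG}.type: sets the catalog type to hadoop (file-system backed)
- spark.sql.catalog.{CATALOG}.warehouse: sets the local warehouse path, /tmp/iceberg-warehouse
- spark.sql.session.timeZone: fixes the session time zone to UTC to avoid ambiguity in timestamp handling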
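import os
import shutil
from pyspark.sql import SparkSession

CATALOG = "local"                     # catalog name used in every SQL statement below
WAREHOUSE = "/tmp/iceberg-warehouse"  # local warehouse path
DB = f"{CATALOG}.iceberg_db"          # referenced as {DB} in later snippets

def build_spark():
    # Start from an empty warehouse so that every run is reproducible.
    if os.path.exists(WAREHOUSE):
        shutil.rmtree(WAREHOUSE)
    return SparkSession.builder \
        .appName("IcebergWorkshop") \
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
        .config(f"spark.sql.catalog.{CATALOG}.type", "hadoop") \
        .config(f"spark.sql.catalog.{CATALOG}.warehouse", WAREHOUSE) \
        .config("spark.sql.session.timeZone", "UTC") \
        .getOrCreate()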
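Creating tables and writing data
Creating an Iceberg table requires a table name and a schema, plus an optional partition spec. Unlike traditional Hive tables, Iceberg uses hidden partitioning: users write the raw timestamp column, Iceberg derives the partition value from it according to the partition spec and writes the data to the matching partition directory, and queries are pruned by partition automatically.
Create the unpartitioned table customers
- An unpartitioned table keeps all of its data files in a single directory, which suits small tables or queries that carry no filter.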
CREATE TABLE local.iceberg_db.customers (
customer_id INT,
name STRING,
email STRING,
country STRING,
registration_date DATE,
tier STRING
)
USING iceberg
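[OK] customers table created (unpartitioned)
Create the hidden-partition table orders
- PARTITIONED BY (months(order_time)) partitions the table by month. Rows are partitioned by the month of order_time, but no extra "month" column is added to the table.
- Hidden partitioning derives the partition field order_time_month from order_time, and data is written to directories such as order_time_month=2024-01/.
- Queries do not need to know the partition directory layout; Iceberg prunes partitions automatically from the time predicate.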
CREATE TABLE local.iceberg_db.orders (
order_id INT,
customer_id INT,
product_name STRING,
category STRING,
quantity INT,
price DOUBLE,
order_time TIMESTAMP,
status STRING
)
USING iceberg
PARTITIONED BY (months(order_time))
[OK] orders table created (hidden partition: months(order_time))
Insert 20 rows into customers
sql> SELECT * FROM local.iceberg_db.customers ORDER BY customer_id
+-----------+-------------+--------------------+-------+-----------------+-------+
|customer_id|name         |email               |country|registration_date|tier   |
+-----------+-------------+--------------------+-------+-----------------+-------+
|1          |张伟         |zhangwei@example.com|CN     |2023-01-15       |vip    |
|2          |李娜         |lina@example.com    |CN     |2023-02-20       |premium|
|3          |John Smith   |jsmith@example.com  |US     |2023-03-10       |free   |
|4          |Emily Davis  |edavis@example.com  |US     |2023-04-05       |premium|
|5          |James Wilson |jwilson@example.com |UK     |2023-05-18       |free   |
|6          |田中太郎     |tanaka@example.com  |JP     |2023-06-01       |vip    |
|7          |王芳         |wangfang@example.com|CN     |2023-06-22       |free   |
|8          |Sarah Brown  |sbrown@example.com  |UK     |2023-07-14       |premium|
|9          |Mike Johnson |mjohnson@example.com|US     |2023-08-30       |free   |
|10         |佐藤花子     |sato@example.com    |JP     |2023-09-12       |premium|
|11         |赵磊         |zhaolei@example.com |CN     |2023-10-01       |vip    |
|12         |Alice Lee    |alee@example.com    |US     |2023-11-20       |free   |
|13         |David Clark  |dclark@example.com  |UK     |2024-01-05       |premium|
|14         |陈静         |chenjing@example.com|CN     |2024-01-15       |free   |
|15         |Robert Taylor|rtaylor@example.com |US     |2024-02-28       |vip    |
|16         |鈴木一郎     |suzuki@example.com  |JP     |2024-03-10       |free   |
|17         |黄丽         |huangli@example.com |CN     |2024-03-22       |premium|
|18         |Emma Thomas  |ethomas@example.com |UK     |2024-04-01       |free   |
|19         |林强         |linqiang@example.com|CN     |2024-04-15       |vip    |
|20         |Tom Harris   |tharris@example.com |US     |2024-05-01       |premium|
+-----------+-------------+--------------------+-------+-----------------+-------+
Insert 50 rows into orders
orders_df = generate_orders(spark)
orders_df.writeTo(f"{DB}.orders").append()
count(local.iceberg_db.orders WHERE 1=1) = 50
>>> orders sample (first 10)
sql> SELECT * FROM local.iceberg_db.orders ORDER BY order_id LIMIT 10
+--------+-----------+-------------------+-----------+--------+------+-------------------+---------+
|order_id|customer_id|product_name |category |quantity|price |order_time |status |
+--------+-----------+-------------------+-----------+--------+------+-------------------+---------+
|1001 |1 |MacBook Pro 16 |Electronics|1 |2499.0|2024-01-05 09:23:00|completed|
|1002 |1 |AirPods Pro |Electronics|2 |249.0 |2024-01-05 09:25:00|completed|
|1003 |2 |Python编程入门 |Books |1 |59.9 |2024-01-12 14:30:00|completed|
|1004 |3 |数据密集型应用设计 |Books |1 |79.0 |2024-01-18 11:00:00|completed|
|1005 |4 |Sony WH-1000XM5 |Electronics|1 |349.0 |2024-01-25 16:45:00|cancelled|
|1006 |5 |冬季羽绒服 |Clothing |1 |199.0 |2024-02-01 10:15:00|completed|
|1007 |6 |Nintendo Switch |Electronics|1 |299.0 |2024-02-08 08:30:00|completed|
|1008 |7 |Spark快速大数据分析|Books |1 |69.0 |2024-02-14 13:20:00|pending |
|1009 |8 |Yoga Mat Premium |Home |2 |45.5 |2024-02-20 15:00:00|completed|
|1010 |9 |机械键盘 Cherry轴 |Electronics|1 |129.0 |2024-02-28 09:45:00|cancelled|
+--------+-----------+-------------------+-----------+--------+------+-------------------+---------+
The orders table is partitioned by months(order_time), and the data is spread across 6 partition directories covering January to June 2024. The Parquet files are stored under directories such as /tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-01/.
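How a timestamp maps to a month partition can be illustrated in plain Python. This is a minimal sketch, assuming the month transform is defined as the number of months elapsed since 1970-01 (as in the Iceberg table specification); the helper names month_ordinal and month_label are hypothetical and are not part of any Iceberg API.

```python
from datetime import datetime

EPOCH_YEAR = 1970


def month_ordinal(ts: datetime) -> int:
    """Months elapsed since 1970-01: the integer stored as the partition value."""
    return (ts.year - EPOCH_YEAR) * 12 + (ts.month - 1)


def month_label(ordinal: int) -> str:
    """Human-readable form of the ordinal, as used in directory names like 2024-01."""
    years, month0 = divmod(ordinal, 12)
    return f"{EPOCH_YEAR + years:04d}-{month0 + 1:02d}"


# First order in the sample data: 2024-01-05 09:23:00
order_time = datetime(2024, 1, 5, 9, 23)
ordinal = month_ordinal(order_time)
print(ordinal, month_label(ordinal))  # 648 2024-01
```

Because the partition value is computed from order_time at write time, the table never needs a separate month column.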
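Queries and partition pruning
Iceberg performs partition pruning during query planning: it reads the partition statistics kept in its metadata and skips data files in partitions that cannot match. Unlike Hive's pruning, which depends on the physical directory layout, Iceberg does not need to list partition directories, and this metadata-driven approach is more efficient.
Verifying partition pruning
- The predicate order_time >= '2024-06-01' triggers partition pruning: Iceberg reads only the data files of the order_time_month=2024-06 partition and skips the January to May partitions. A Hive table relies on the physical directory structure (order_time_month=2024-06/), whereas Iceberg's metadata-driven approach is more flexible.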
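The pruning decision described above can be modeled as a comparison on partition values rather than on directories. The sketch below is a simplified illustration, not Iceberg's planner: the manifest_entries list and the file names in it are hypothetical, and the month transform is assumed to count months since 1970-01.

```python
from datetime import datetime


def month_ordinal(ts: datetime) -> int:
    """Months elapsed since 1970-01 (assumed definition of the month transform)."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


# Hypothetical manifest entries: (partition value, data file name) for Jan to Jun 2024.
manifest_entries = [
    (month_ordinal(datetime(2024, m, 1)), f"orders-2024-{m:02d}.parquet")
    for m in range(1, 7)
]


def prune(entries, lower_bound: datetime):
    """Keep only files whose partition can hold rows with order_time >= lower_bound."""
    threshold = month_ordinal(lower_bound)
    return [name for partition, name in entries if partition >= threshold]


print(prune(manifest_entries, datetime(2024, 6, 1)))  # ['orders-2024-06.parquet']
```

The filter on the raw timestamp is translated into a filter on the derived partition value, so five of the six files are discarded before any data is read.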
SELECT order_id, product_name, order_time
FROM local.iceberg_db.orders
WHERE order_time >= TIMESTAMP '2024-06-01 00:00:00'
ORDER BY order_time
>>> June 2024 orders (partition pruning)
sql>
SELECT order_id, product_name, order_time
FROM local.iceberg_db.orders
WHERE ORDER_TIME >= TIMESTAMP '2024-06-01 00:00:00'
ORDER BY order_time
+--------+----------------------+-------------------+
|order_id|product_name |order_time |
+--------+----------------------+-------------------+
|1038 |USB-C Hub 7合1 |2024-06-01 08:15:00|
|1039 |T恤 夏季纯棉 |2024-06-03 10:00:00|
|1040 |Effective Java |2024-06-05 15:30:00|
|1041 |运动短裤 |2024-06-08 09:00:00|
|1042 |Air Conditioner |2024-06-10 11:00:00|
|1043 |Water Bottle Insulated|2024-06-12 13:00:00|
|1044 |算法导论 |2024-06-15 14:30:00|
|1045 |Wireless Mouse |2024-06-17 10:00:00|
|1046 |Bluetooth Headset |2024-06-19 16:00:00|
|1047 |Phone Case Premium |2024-06-21 08:30:00|
|1048 |Scala编程 |2024-06-23 11:15:00|
|1049 |Laptop Stand |2024-06-25 13:45:00|
|1050 |Pillow Memory Foam |2024-06-28 15:00:00|
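+--------+----------------------+-------------------+
UPDATE and DELETE
Iceberg supports row-level UPDATE and DELETE, which plain Parquet/ORC files in a data lake do not offer on their own. Iceberg implements row-level operations with either a Copy-on-Write or a Merge-on-Read strategy. Each UPDATE/DELETE creates a new snapshot, and the old data files are retained for time travel.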
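Which strategy applies is chosen per operation through table properties. The following is a minimal sketch, assuming the property names write.delete.mode, write.update.mode and write.merge.mode with the values copy-on-write and merge-on-read, and assuming a format-version 2 table for merge-on-read. The helper row_level_mode_sql is hypothetical; it only builds the SQL text and was not part of the original experiment.

```python
VALID_MODES = ("copy-on-write", "merge-on-read")


def row_level_mode_sql(table: str, mode: str) -> str:
    """Build an ALTER TABLE statement that sets one strategy for DELETE, UPDATE and MERGE."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    properties = {
        "write.delete.mode": mode,
        "write.update.mode": mode,
        "write.merge.mode": mode,
    }
    assignments = ", ".join(f"'{key}'='{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({assignments})"


print(row_level_mode_sql("local.iceberg_db.orders", "merge-on-read"))
```

The resulting string could be passed to spark.sql(...). The experiment in this article leaves these properties at their defaults; the snapshot summary shown in the metadata-table section reports zero delete files after the DELETE, which is consistent with copy-on-write.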
UPDATE: change a customer's name
- The UPDATE creates a new snapshot and the old data remains in the historical snapshot, so the pre-update data can still be read with time travel.
UPDATE local.iceberg_db.customers SET name = '张伟(已更名)' WHERE customer_id = 1
>>> BEFORE update
sql> SELECT customer_id, name FROM local.iceberg_db.customers WHERE customer_id = 1
+-----------+----+
|customer_id|name|
+-----------+----+
|1          |张伟|
+-----------+----+
>>> AFTER update
sql> SELECT customer_id, name FROM local.iceberg_db.customers WHERE customer_id = 1
+-----------+------------+
|customer_id|name        |
+-----------+------------+
|1          |张伟(已更名)|
+-----------+------------+
DELETE: remove cancelled orders
- Five orders with status cancelled are deleted. The DELETE also creates a new snapshot, so the old data remains traceable.
- Row-level operations make Iceberg a good fit for GDPR-compliant deletion and for workloads that update data continuously.
DELETE FROM local.iceberg_db.orders WHERE status = 'cancelled'
>>> cancelled orders (BEFORE delete)
sql> SELECT order_id, status FROM local.iceberg_db.orders WHERE status = 'cancelled'
+--------+---------+
|order_id|status |
+--------+---------+
|1005 |cancelled|
|1010 |cancelled|
|1032 |cancelled|
|1045 |cancelled|
|1018 |cancelled|
+--------+---------+
count(local.iceberg_db.orders WHERE status = 'cancelled') = 0
[OK] All cancelled orders deleted, verified count = 0
Schema evolution
Schema evolution in Iceberg is a metadata-only operation and rewrites no data files. Iceberg assigns every column a unique column ID; a schema change only updates the ID-to-column mapping in the metadata, while the column IDs inside existing data files stay unchanged. This is why a schema can evolve safely without breaking historical data.
ADD COLUMN
ALTER TABLE local.iceberg_db.customers ADD COLUMN phone STRING
ALTER TABLE local.iceberg_db.customers ADD COLUMN loyalty_points INT DEFAULT 0
>>> Schema after ADD COLUMN
sql> DESCRIBE local.iceberg_db.customers
+-----------------+---------+-------+
|col_name         |data_type|comment|
+-----------------+---------+-------+
|customer_id      |int      |NULL   |
|name             |string   |NULL   |
|email            |string   |NULL   |
|country          |string   |NULL   |
|registration_date|date     |NULL   |
|tier             |string   |NULL   |
|phone            |string   |NULL   |
|loyalty_points   |int      |NULL   |
+-----------------+---------+-------+
Two columns are added: phone and loyalty_points. DEFAULT 0 declares a default value for the new column, intended for writes that omit the field. Rows written before the column existed are not back-filled: in this run they read back as NULL, as the validation query at the end of this section shows.
RENAME COLUMN
ALTER TABLE local.iceberg_db.customers RENAME COLUMN name TO full_name
>>> Schema after RENAME name → full_name
sql> DESCRIBE local.iceberg_db.customers
+-----------------+---------+-------+
|col_name         |data_type|comment|
+-----------------+---------+-------+
|customer_id      |int      |NULL   |
|full_name        |string   |NULL   |
|email            |string   |NULL   |
|country          |string   |NULL   |
|registration_date|date     |NULL   |
|tier             |string   |NULL   |
|phone            |string   |NULL   |
|loyalty_points   |int      |NULL   |
+-----------------+---------+-------+
The column name changes from name to full_name and the contents of the data files are unaffected. Iceberg tracks columns internally by column ID, so a rename only updates the mapping in the metadata.
ALTER TYPE: type promotion
ALTER TABLE local.iceberg_db.customers ALTER COLUMN loyalty_points TYPE BIGINT
>>> Schema after INT → BIGINT promotion
sql> DESCRIBE local.iceberg_db.customers
+-----------------+---------+-------+
|col_name         |data_type|comment|
+-----------------+---------+-------+
|customer_id      |int      |NULL   |
|full_name        |string   |NULL   |
|email            |string   |NULL   |
|country          |string   |NULL   |
|registration_date|date     |NULL   |
|tier             |string   |NULL   |
|phone            |string   |NULL   |
|loyalty_points   |bigint   |NULL   |
+-----------------+---------+-------+
loyalty_points is promoted from INT to BIGINT. Iceberg allows only safe, widening promotions (int→bigint, float→double, and widening the precision of a decimal); incompatible type changes are rejected.
DROP COLUMN
ALTER TABLE local.iceberg_db.customers DROP COLUMN registration_date
>>> Schema after DROP registration_date
sql> DESCRIBE local.iceberg_db.customers
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|customer_id   |int      |NULL   |
|full_name     |string   |NULL   |
|email         |string   |NULL   |
|country       |string   |NULL   |
|tier          |string   |NULL   |
|phone         |string   |NULL   |
|loyalty_points|bigint   |NULL   |
+--------------+---------+-------+
The registration_date column is dropped. DROP COLUMN is a metadata-only operation: the column's values still exist in the historical data files, they are simply no longer returned by queries.
REORDER COLUMN
ALTER TABLE local.iceberg_db.customers ALTER COLUMN email FIRST
>>> Schema after MOVE email FIRST
sql> DESCRIBE local.iceberg_db.customers
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|email         |string   |NULL   |
|customer_id   |int      |NULL   |
|full_name     |string   |NULL   |
|country       |string   |NULL   |
|tier          |string   |NULL   |
|phone         |string   |NULL   |
|loyalty_points|bigint   |NULL   |
+--------------+---------+-------+
The email column moves to the first position. Reordering does not affect how data is stored; it only changes the column order of the schema and therefore of query results such as SELECT *.
Validating the data after schema evolution
SELECT customer_id, full_name, email, phone, loyalty_points FROM local.iceberg_db.customers ORDER BY customer_id LIMIT 5
>>> Data still intact after schema evolution
sql> SELECT customer_id, full_name, email, phone, loyalty_points FROM local.iceberg_db.customers ORDER BY customer_id LIMIT 5
+-----------+------------+--------------------+-----+--------------+
|customer_id|full_name   |email               |phone|loyalty_points|
+-----------+------------+--------------------+-----+--------------+
|1          |张伟(已更名)|zhangwei@example.com|NULL |NULL          |
|2          |李娜        |lina@example.com    |NULL |NULL          |
|3          |John Smith  |jsmith@example.com  |NULL |NULL          |
|4          |Emily Davis |edavis@example.com  |NULL |NULL          |
|5          |James Wilson|jwilson@example.com |NULL |NULL          |
+-----------+------------+--------------------+-----+--------------+
After all of the schema changes above, the original data is fully preserved. The new phone and loyalty_points columns are NULL (never assigned), and the full_name value changed by the earlier UPDATE is retained correctly. This confirms that Iceberg schema evolution keeps existing data readable.
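The column-ID mechanism behind these results can be modeled in a few lines of plain Python. This is a toy sketch only: the IDs 1 to 4 and the three-column row are hypothetical simplifications of the customers table, and read_row stands in for the projection a real reader performs.

```python
# A row as stored in an old data file: values are keyed by column ID, not by name.
old_file_row = {1: 1, 2: "张伟", 3: "zhangwei@example.com"}

# The table schema maps each column ID to its current name.
schema = {1: "customer_id", 2: "name", 3: "email"}


def read_row(file_row, current_schema):
    """Project a stored row onto the current schema; IDs absent from the file read as None."""
    return {name: file_row.get(col_id) for col_id, name in current_schema.items()}


schema[2] = "full_name"  # RENAME COLUMN: only the name attached to ID 2 changes
schema[4] = "phone"      # ADD COLUMN: a fresh ID that no existing file contains
del schema[3]            # DROP COLUMN: the ID leaves the schema, the bytes stay in the file

print(read_row(old_file_row, schema))
# {'customer_id': 1, 'full_name': '张伟', 'phone': None}
```

None of the three changes touches old_file_row, which mirrors why old rows read NULL for added columns and keep their values across a rename.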
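Exploring the metadata tables
Iceberg's metadata layer has four levels: metadata file → manifest list → manifest → data file. Iceberg exposes metadata tables so that this information can be queried with SQL, without opening the underlying files. In Spark they are addressed by appending the metadata table name to the table identifier, for example local.iceberg_db.orders.snapshots.
- snapshots: all snapshots with their operation summaries
- history: the snapshot timeline and parent-child relationships
- files: all data files
- manifests: all manifest files
The snapshots table shows two snapshots: the first is the initial append (50 records) and the second is the overwrite produced by the DELETE (5 rows removed, 45 remaining). The summary fields of the first snapshot are:
- spark.app.id: local-1779244843156, the ID of the Spark application, which ran locally
- added-data-files: 6, this commit added 6 data files
- added-records: 50, this commit wrote 50 records
- added-files-size: 17455, total size of the added files in bytes (about 17 KB)
- changed-partition-count: 6, this commit touched 6 partitions
- total-records: 50, total records in the table (first write)
- total-files-size: 17455, total table size in bytes (about 17 KB)
- total-data-files: 6, total data files in the table
- total-delete-files: 0, total delete files in the table
- total-position-deletes: 0, rows removed by position deletes
- total-equality-deletes: 0, rows removed by equality deletes
- engine-name: spark, the compute engine
- engine-version: 3.5.6, the Spark version
- iceberg-version: Apache Iceberg 1.8.1, the Iceberg version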
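The total-* fields are running totals, so the second snapshot's totals can be derived from the first snapshot's totals plus the added/deleted deltas. The following worked check uses only numbers that appear in the two summaries returned by the snapshots query in this section; the apply_delta helper is a hypothetical name used for illustration.

```python
# Totals reported by the first (append) snapshot.
after_append = {"total-records": 50, "total-files-size": 17455, "total-data-files": 6}

# Deltas reported by the second (overwrite) snapshot, produced by the DELETE.
overwrite_delta = {
    "added-records": 38, "deleted-records": 43,
    "added-files-size": 14369, "removed-files-size": 14642,
    "added-data-files": 5, "deleted-data-files": 5,
}


def apply_delta(previous, delta):
    """Derive the new running totals from the previous totals and one commit's deltas."""
    return {
        "total-records": previous["total-records"]
        + delta["added-records"] - delta["deleted-records"],
        "total-files-size": previous["total-files-size"]
        + delta["added-files-size"] - delta["removed-files-size"],
        "total-data-files": previous["total-data-files"]
        + delta["added-data-files"] - delta["deleted-data-files"],
    }


print(apply_delta(after_append, overwrite_delta))
# {'total-records': 45, 'total-files-size': 17182, 'total-data-files': 6}
```

The derived values match the totals in the overwrite summary. The deltas also show how the DELETE was carried out: five files holding 43 rows were replaced by five rewritten files holding 38 rows, a net loss of the 5 cancelled orders.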
SELECT snapshot_id, committed_at, operation, summary
FROM local.iceberg_db.orders.snapshots
ORDER BY committed_at
>>> snapshots
|snapshot_id |committed_at |operation|summary|
|2262767112950789139|2026-05-20 02:40:54.961|append|{spark.app.id -> local-1779244843156, added-data-files -> 6, added-records -> 50, added-files-size -> 17455, changed-partition-count -> 6, total-records -> 50, total-files-size -> 17455, total-data-files -> 6, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0, engine-version -> 3.5.6, app-id -> local-1779244843156, engine-name -> spark, iceberg-version -> Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)}|
|5067017495298132779|2026-05-20 02:40:58.261|overwrite|{spark.app.id -> local-1779244843156, added-data-files -> 5, deleted-data-files -> 5, added-records -> 38, deleted-records -> 43, added-files-size -> 14369, removed-files-size -> 14642, changed-partition-count -> 5, total-records -> 45, total-files-size -> 17182, total-data-files -> 6, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0, engine-version -> 3.5.6, app-id -> local-1779244843156, engine-name -> spark, iceberg-version -> Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)}|
The history table shows the parent-child chain of snapshots; the first snapshot has no parent (it is the root snapshot).
SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
FROM local.iceberg_db.orders.history
ORDER BY made_current_at
>>> history
sql> SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor FROM local.iceberg_db.orders.history ORDER BY made_current_at
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2026-05-20 02:40:54.961|2262767112950789139|NULL               |true               |
|2026-05-20 02:40:58.261|5067017495298132779|2262767112950789139|true               |
+-----------------------+-------------------+-------------------+-------------------+
The files table lists 6 Parquet files, one per monthly partition.
SELECT content, file_path, file_format, record_count FROM local.iceberg_db.orders.files
>>> data files
sql> SELECT content, file_path, file_format, record_count FROM local.iceberg_db.orders.files
|content|file_path |file_format|record_count|
|0 |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-01/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00001.parquet|PARQUET |4 |
|0 |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-02/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00002.parquet|PARQUET |4 |
|0 |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-05/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00004.parquet|PARQUET |10 |
|0 |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-06/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00003.parquet|PARQUET |12 |
|0 |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-04/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00003.parquet|PARQUET |8 |
|0 |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-03/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00001.parquet|PARQUET |7 |
The manifests table lists 2 manifest files, which record the metadata of the data files.
SELECT path, length, partition_spec_id FROM local.iceberg_db.orders.manifests
>>> manifests
sql> SELECT path, length, partition_spec_id FROM local.iceberg_db.orders.manifests
|path |length|partition_spec_id|
|/tmp/iceberg-warehouse/iceberg_db/orders/metadata/46ac57f8-4ee9-4070-9a0e-7b7c56e33ee6-m1.avro|8278|0|
|/tmp/iceberg-warehouse/iceberg_db/orders/metadata/46ac57f8-4ee9-4070-9a0e-7b7c56e33ee6-m0.avro|8420|0|
Time travel and snapshot rollback
Iceberg's MVCC snapshot model supports time travel queries. Each snapshot has a unique snapshot_id and a commit timestamp, so any retained historical state can be queried. VERSION AS OF selects by snapshot ID and TIMESTAMP AS OF selects by timestamp. Rollback is an O(1) metadata change and copies no data.
VERSION AS OF: query by snapshot ID
- The first snapshot contains 50 orders (the data before the DELETE). A snapshot ID gives exact access to a historical state.
SELECT count(*) as cnt FROM local.iceberg_db.orders VERSION AS OF 2262767112950789139
>>> Count at first snapshot (before DELETE)
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders VERSION AS OF 2262767112950789139
+---+
|cnt|
+---+
|50 |
+---+
TIMESTAMP AS OF: query by timestamp
- A timestamp query returns the table as it was at that moment: Iceberg resolves the timestamp to the most recent snapshot committed at or before it.
SELECT count(*) as cnt FROM local.iceberg_db.orders FOR TIMESTAMP AS OF '2026-05-20 02:40:54.961000'
>>> Count at timestamp 2026-05-20 02:40:54.961000
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders FOR TIMESTAMP AS OF '2026-05-20 02:40:54.961000'
+---+
|cnt|
+---+
|50 |
+---+
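How a timestamp is resolved to a snapshot can be sketched with a binary search over the commit times. This is a simplified model, assuming a linear history and "latest snapshot at or before the timestamp" semantics; the two entries are taken from the history output earlier in the article, and snapshot_as_of is a hypothetical helper, not an Iceberg API.

```python
from bisect import bisect_right
from datetime import datetime

# (made_current_at, snapshot_id) pairs, ordered by commit time.
history = [
    (datetime(2026, 5, 20, 2, 40, 54, 961000), 2262767112950789139),
    (datetime(2026, 5, 20, 2, 40, 58, 261000), 5067017495298132779),
]


def snapshot_as_of(ts: datetime) -> int:
    """Return the ID of the last snapshot committed at or before ts."""
    commit_times = [committed_at for committed_at, _ in history]
    index = bisect_right(commit_times, ts)
    if index == 0:
        raise ValueError("no snapshot exists at or before the requested timestamp")
    return history[index - 1][1]


# Exactly the first commit time, as in the query above: resolves to the first snapshot.
print(snapshot_as_of(datetime(2026, 5, 20, 2, 40, 54, 961000)))  # 2262767112950789139
# Any later time resolves to the DELETE snapshot.
print(snapshot_as_of(datetime(2026, 5, 20, 3, 0, 0)))  # 5067017495298132779
```

A timestamp earlier than the first commit has no matching snapshot, which the sketch reports as an error.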
rollback_to_snapshot: roll back to a snapshot
CALL local.system.rollback_to_snapshot('iceberg_db.orders', 2262767112950789139)
Rolling back to snapshot: 2262767112950789139
>>> Count after rollback
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders
+---+
|cnt|
+---+
|50 |
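+---+
After rolling back to the first snapshot, the table returns to its state before the DELETE. Rollback is a metadata operation: it is very fast and copies no data files.
Restoring deleted data from an old snapshot
- Query the deleted cancelled orders from the historical snapshot and insert them back into the current table. Note that the count below is 10 rather than 5: the rollback in the previous step had already brought the 5 cancelled orders back, so this INSERT adds a second copy of each. On a table that has not been rolled back, the same statement restores exactly the 5 deleted rows.
INSERT INTO local.iceberg_db.orders SELECT * FROM local.iceberg_db.orders VERSION AS OF 2262767112950789139 WHERE status = 'cancelled'
>>> Cancelled orders restored
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders WHERE status = 'cancelled'
+---+
|cnt|
+---+
|10 |
+---+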
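If the restore should be safe to run more than once, one option is to insert only the rows whose key is currently missing. The sketch below builds such a statement as a string. It is an untested assumption rather than part of the original experiment: it combines MERGE INTO with a time-travel subquery as the source, and restore_sql is a hypothetical helper.

```python
def restore_sql(table: str, snapshot_id: int, key: str, predicate: str) -> str:
    """Build a MERGE that re-inserts rows from an old snapshot only when the key is absent."""
    return (
        f"MERGE INTO {table} t "
        f"USING (SELECT * FROM {table} VERSION AS OF {snapshot_id} WHERE {predicate}) s "
        f"ON t.{key} = s.{key} "
        "WHEN NOT MATCHED THEN INSERT *"
    )


print(restore_sql(
    "local.iceberg_db.orders", 2262767112950789139, "order_id", "status = 'cancelled'"
))
```

With this form, rows that already exist in the current table match on order_id and are left alone, so running the restore after a rollback would not create duplicates.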
Verifying the table state after rollback and restore
SELECT status, count(*) as cnt
FROM local.iceberg_db.orders
GROUP BY status
ORDER BY status
count(local.iceberg_db.orders WHERE 1=1) = 55
>>> Order status distribution
sql> SELECT status, count(*) as cnt FROM local.iceberg_db.orders GROUP BY status ORDER BY status
+---------+---+
|status |cnt|
+---------+---+
|cancelled|10 |
|completed|41 |
|pending |4 |
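+---------+---+
The table now holds 55 rows, including 10 cancelled orders (the 5 brought back by the rollback plus the 5 re-inserted copies). Time travel and rollback make data recovery simple and reliable.
Partition evolution
Partition evolution changes the partitioning strategy of an existing table without rewriting historical data. As data volume grows or query patterns change, it may be necessary to move from fine-grained partitions (for example by month) to coarse-grained ones (for example by year). Iceberg stores partition specs in table metadata rather than in the data files: old data keeps its original spec, new data uses the new spec, and queries adapt to both automatically.
ALTER TABLE REPLACE PARTITION FIELD
ALTER TABLE local.iceberg_db.orders REPLACE PARTITION FIELD months(order_time) WITH years(order_time)
>>> Partition spec after evolution (look for Partition Spec)
sql> DESCRIBE EXTENDED local.iceberg_db.orders
+----------------------------+------------------------------------------------+-------+
|col_name                    |data_type                                       |comment|
+----------------------------+------------------------------------------------+-------+
|order_id                    |int                                             |NULL   |
|customer_id                 |int                                             |NULL   |
|product_name                |string                                          |NULL   |
|category                    |string                                          |NULL   |
|quantity                    |int                                             |NULL   |
|price                       |double                                          |NULL   |
|order_time                  |timestamp                                       |NULL   |
|status                      |string                                          |NULL   |
|                            |                                                |       |
|# Partitioning              |                                                |       |
|Part 0                      |years(order_time)                               |       |
|                            |                                                |       |
|# Metadata Columns          |                                                |       |
|_spec_id                    |int                                             |NULL   |
|_partition                  |struct<order_time_month:int,order_time_year:int>|NULL   |
|_file                       |string                                          |NULL   |
|_pos                        |bigint                                          |NULL   |
|_deleted                    |boolean                                         |NULL   |
|                            |                                                |       |
|# Detailed Table Information|                                                |       |
+----------------------------+------------------------------------------------+-------+
only showing top 20 rows
The partition spec changes from months(order_time) to years(order_time). Note that the _partition metadata column contains both order_time_month and order_time_year, which shows that Iceberg keeps the historical partition information so that old data can still be queried transparently. Changing the partitioning of a Hive table usually requires migrating data, so Iceberg's partition evolution is more flexible.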
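The coexistence of two specs can be illustrated by computing the partition value of the same timestamp under each of them. This is a minimal sketch, assuming the month and year transforms count months and years since 1970 and assuming the old and new specs have IDs 0 and 1; the field names come from the _partition struct above, and SPECS and partition_for are hypothetical names.

```python
from datetime import datetime


def months_transform(ts: datetime) -> int:
    """Months elapsed since 1970-01."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def years_transform(ts: datetime) -> int:
    """Years elapsed since 1970."""
    return ts.year - 1970


# Assumed spec IDs: 0 is the original monthly spec, 1 is the spec after evolution.
SPECS = {
    0: ("order_time_month", months_transform),
    1: ("order_time_year", years_transform),
}


def partition_for(spec_id: int, ts: datetime) -> dict:
    """Partition tuple a data file would carry when written under the given spec."""
    field_name, transform = SPECS[spec_id]
    return {field_name: transform(ts)}


order_time = datetime(2024, 6, 30, 10, 0)
print(partition_for(0, order_time))  # {'order_time_month': 653}
print(partition_for(1, order_time))  # {'order_time_year': 54}
```

Each data file records the spec it was written under, so a filter on order_time can be converted into the matching partition predicate for files of either spec.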
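MERGE INTO and CDC
MERGE INTO is an atomic upsert operation supported by Iceberg, similar to "INSERT ... ON CONFLICT DO UPDATE". It is well suited to CDC (Change Data Capture): change data arriving periodically from an upstream system is synchronized into an Iceberg table with MERGE INTO, covering both incremental updates and inserts.
Implementing CDC synchronization with MERGE INTO
First create a staging table to act as the CDC data source: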
CREATE TABLE local.iceberg_db.orders_staging (
order_id INT, customer_id INT, product_name STRING,
category STRING, quantity INT, price DOUBLE,
order_time TIMESTAMP,
status STRING
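) USING iceberg
Insert the CDC data:
# dt is assumed to be datetime.datetime; the original does not show this import.
from datetime import datetime as dt
from pyspark.sql import Row

cdc_rows = [
    (1001, 1, "MacBook Pro 16 (M4)", "Electronics", 1, 2799.00, dt(2024,1,5,9,23), "completed"),
    (1051, 2, "Pixel Watch 3", "Electronics", 1, 349.00, dt(2024,6,30,10,0), "pending"),
    (1052, 5, "Standing Desk", "Home", 1, 499.00, dt(2024,6,30,11,0), "completed"),
]
# The schema argument is elided in the original; it has to match orders_staging.
cdc_df = spark.createDataFrame([Row(*r) for r in cdc_rows], schema=...)
cdc_df.writeTo(f"{DB}.orders_staging").append()
Inspect the data before the merge: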
>>> BEFORE MERGE: target rows
sql> SELECT order_id, product_name, price, status FROM local.iceberg_db.orders WHERE order_id IN (1001, 1051, 1052) ORDER BY order_id
+--------+--------------+------+---------+
|order_id|product_name  |price |status   |
+--------+--------------+------+---------+
|1001    |MacBook Pro 16|2499.0|completed|
+--------+--------------+------+---------+
>>> BEFORE MERGE: source (CDC) rows
sql> SELECT order_id, product_name, price, status FROM local.iceberg_db.orders_staging ORDER BY order_id
+--------+-------------------+------+---------+
|order_id|product_name       |price |status   |
+--------+-------------------+------+---------+
|1001    |MacBook Pro 16 (M4)|2799.0|completed|
|1051    |Pixel Watch 3      |349.0 |pending  |
|1052    |Standing Desk      |499.0 |completed|
+--------+-------------------+------+---------+
Run the MERGE INTO:
MERGE INTO local.iceberg_db.orders t
USING local.iceberg_db.orders_staging s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *      -- overwrite the target row with all source columns
WHEN NOT MATCHED THEN INSERT *      -- insert rows that are new to the target
Inspect the result after the merge:
- order_id=1001 already exists, so it is updated (the price changes from 2499.0 to 2799.0)
- order_id=1051 and 1052 do not exist, so they are inserted
- MERGE INTO is atomic, which guarantees data consistency
>>> AFTER MERGE: updated + inserted rows
sql> SELECT order_id, product_name, price, status FROM local.iceberg_db.orders WHERE order_id IN (1001, 1051, 1052) ORDER BY order_id
+--------+-------------------+------+---------+
|order_id|product_name       |price |status   |
+--------+-------------------+------+---------+
|1001    |MacBook Pro 16 (M4)|2799.0|completed|
|1051    |Pixel Watch 3      |349.0 |pending  |
|1052    |Standing Desk      |499.0 |completed|
+--------+-------------------+------+---------+
This is the typical CDC synchronization pattern: the staging table holds the change data from upstream, and MERGE INTO applies it to the target table.
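The matched/not-matched logic of the merge can be mirrored with dictionaries keyed by order_id. This is a plain-Python model of the semantics only, using the rows shown above; merge_upsert is a hypothetical helper and says nothing about how Iceberg executes the statement.

```python
target = {
    1001: {"product_name": "MacBook Pro 16", "price": 2499.0, "status": "completed"},
}
source = {
    1001: {"product_name": "MacBook Pro 16 (M4)", "price": 2799.0, "status": "completed"},
    1051: {"product_name": "Pixel Watch 3", "price": 349.0, "status": "pending"},
    1052: {"product_name": "Standing Desk", "price": 499.0, "status": "completed"},
}


def merge_upsert(target_rows, source_rows):
    """Return the merged rows plus the action taken for each source key."""
    merged = dict(target_rows)
    actions = {}
    for key, row in source_rows.items():
        # WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *
        actions[key] = "UPDATE" if key in merged else "INSERT"
        merged[key] = dict(row)
    return merged, actions


merged, actions = merge_upsert(target, source)
print(actions)                # {1001: 'UPDATE', 1051: 'INSERT', 1052: 'INSERT'}
print(merged[1001]["price"])  # 2799.0
```

The function returns a new dictionary and leaves target untouched, a loose analogy to the merge producing a new snapshot while the previous one stays readable.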
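Table maintenance
A long-running Iceberg table accumulates three kinds of maintenance problems:
- Small files: frequent small writes produce many small files, which hurts query performance
- Snapshot bloat: every write creates a new snapshot, and historical snapshots take up space
- Orphan files: unreferenced files left behind when compaction or another operation fails
Iceberg provides system stored procedures for maintenance:
- rewrite_data_files (compaction)
- expire_snapshots (snapshot expiration)
- remove_orphan_files (orphan file cleanup)
Creating fragmented files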
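# Each single-row INSERT is its own commit and writes its own small data file.
for i in range(10):
    spark.sql(f"""
        INSERT INTO {DB}.orders
        SELECT 1060 + {i}, 1, 'Test Product {i}', 'Home', 1, {9.90 + i},
               TIMESTAMP '2024-07-0{(i%9)+1} 10:00:00', 'completed'
    """)
Data files BEFORE compaction: 21
The loop inserts 10 rows one at a time and creates 10 new small files. Together with the 11 existing files, the table now has 21 data files.
Compaction: rewriting data files
rewrite_data_files merges the 21 small files into 2 larger files (target size 128 MB). Compaction reduces per-file metadata and open/read overhead, which typically improves query performance.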
CALL local.system.rewrite_data_files(
table => 'iceberg_db.orders',
options => map('target-file-size-bytes', '134217728')
)
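Data files AFTER compaction: 2
Files reduced: 21 → 2
expire_snapshots: expiring old snapshots
retain_last => 3 keeps the 3 most recent snapshots, and the other 12 are expired. older_than is set to a time far in the future so that retain_last alone decides what is kept. When snapshots expire, data files that are no longer referenced by any remaining snapshot are deleted as part of the procedure.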
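The interaction between older_than and retain_last can be sketched as follows. This is a simplified model that assumes a linear snapshot history; the integer snapshot IDs and commit times are made up for illustration, and plan_expiration is a hypothetical helper rather than the procedure's actual implementation.

```python
from datetime import datetime, timedelta


def plan_expiration(snapshots, older_than, retain_last):
    """snapshots: (snapshot_id, committed_at) pairs ordered from oldest to newest."""
    if retain_last < 1:
        raise ValueError("retain_last must be at least 1")
    # The newest retain_last snapshots are always kept, whatever their age.
    protected = {snapshot_id for snapshot_id, _ in snapshots[-retain_last:]}
    expired = [
        snapshot_id
        for snapshot_id, committed_at in snapshots
        if committed_at < older_than and snapshot_id not in protected
    ]
    kept = [snapshot_id for snapshot_id, _ in snapshots if snapshot_id not in set(expired)]
    return expired, kept


# 15 hypothetical snapshots, one second apart.
base = datetime(2026, 5, 20, 2, 40, 0)
snapshots = [(i, base + timedelta(seconds=i)) for i in range(1, 16)]

expired, kept = plan_expiration(snapshots, datetime(2099, 1, 1), retain_last=3)
print(len(expired), kept)  # 12 [13, 14, 15]
```

With a far-future older_than every snapshot is old enough to expire, so only the retain_last guard keeps the last three, matching the 15 → 3 result of the procedure call that follows.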
CALL local.system.expire_snapshots(
table => 'iceberg_db.orders',
older_than => TIMESTAMP '2099-01-01 00:00:00',
retain_last => 3
)
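Snapshots BEFORE expiration: 15
Snapshots AFTER expiration (retain_last=3): 3
remove_orphan_files: cleaning up orphan files
This procedure removes files under the table location that are not referenced by any table metadata, for example leftovers from failed writes or failed compaction. By default it only removes files older than a safety threshold (3 days in current releases), so a freshly created test table may have nothing to clean. Maintenance tasks should be run on a regular schedule (daily or weekly, for example) to keep the table healthy.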
CALL local.system.remove_orphan_files(
table => 'iceberg_db.orders'
)
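[OK] Orphan file cleanup done
Views and summary
Creating a temporary view
- The Hadoop Catalog does not support persistent views, so a temporary view is used instead. A view is a query abstraction over Iceberg tables and can simplify complex analysis.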
CREATE TEMP VIEW v_order_summary AS
SELECT
year(order_time) AS order_year,
month(order_time) AS order_month,
category,
count(*) AS order_count,
sum(quantity) AS total_quantity,
round(sum(price * quantity), 2) AS total_revenue
FROM local.iceberg_db.orders
GROUP BY year(order_time), month(order_time), category
ORDER BY order_year, order_month, category
NOTE: Hadoop Catalog does not support persistent views, using temp view instead
>>> Temp view query result
sql> SELECT * FROM v_order_summary
+----------+-----------+-----------+-----------+--------------+-------------+
|order_year|order_month|category |order_count|total_quantity|total_revenue|
+----------+-----------+-----------+-----------+--------------+-------------+
|2024 |1 |Books |2 |2 |138.9 |
|2024 |1 |Electronics|4 |5 |3995.0 |
|2024 |2 |Books |1 |1 |69.0 |
|2024 |2 |Clothing |1 |1 |199.0 |
|2024 |2 |Electronics|3 |3 |557.0 |
|2024 |2 |Home |1 |2 |91.0 |
|2024 |3 |Books |2 |2 |164.0 |
|2024 |3 |Clothing |1 |1 |129.0 |
|2024 |3 |Electronics|3 |3 |1547.0 |
|2024 |3 |Home |1 |1 |89.99 |
|2024 |4 |Books |4 |4 |326.0 |
|2024 |4 |Clothing |1 |2 |316.0 |
|2024 |4 |Electronics|4 |4 |1056.0 |
|2024 |4 |Home |1 |1 |35.0 |
|2024 |5 |Books |4 |4 |258.0 |
|2024 |5 |Clothing |1 |3 |179.7 |
|2024 |5 |Electronics|4 |4 |1546.0 |
|2024 |5 |Home |3 |4 |538.0 |
|2024 |6 |Books |3 |3 |269.0 |
|2024 |6 |Clothing |2 |3 |128.8 |
+----------+-----------+-----------+-----------+--------------+-------------+
only showing top 20 rows
