Paimon 分区过期机制详解
总结
- 更新时间:2026-03-03
- 本文档详细分析 Paimon 分区过期(Partition Expiration)的源码实现和测试验证
- 触发时机:每次 commit 后在 maintain 阶段检查,受
partition.expiration-check-interval控制 - 过期策略:
values-time(默认):从分区值提取时间,与当前时间比较update-time:比较分区最后修改时间
- 删除机制:创建 DELETE 类型的 Manifest 条目(Overwrite 空数据),文件真实删除由 snapshot 过期处理
- 关键发现:
- 分区过期在 commit 后异步执行(默认 ASYNC 模式)
partition.timestamp-formatter用于解析分区值中的时间,格式不匹配时分区不会被过期(静默跳过)- 分区删除不是立即物理删除文件,而是标记删除
一、测试表与验证
1.1 测试表定义
CREATE TABLE paimon.pm_marketing.matched_goods_record_ma (
task_id STRING COMMENT '任务ID',
user_id STRING COMMENT '用户ID (member_id或device_id)',
goods_sn STRING COMMENT '商品编码',
match_time TIMESTAMP COMMENT '匹配时间',
dt STRING COMMENT '分区字段 yyyyMMddHH'
) PARTITIONED BY (dt)
WITH (
'bucket' = '20',
'bucket-key' = 'user_id',
'partition.expiration-time' = '3 d',
'partition.expiration-check-interval' = '1 h',
'partition.timestamp-formatter' = 'yyyyMMddHH'
);
1.2 配置解读
| 配置 | 值 | 含义 |
|---|---|---|
| partition.expiration-time | 3 d | 分区超过 3 天过期 |
| partition.expiration-check-interval | 1 h | 每 1 小时检查一次是否有分区需要过期 |
| partition.timestamp-formatter | yyyyMMddHH | 分区值 dt 的时间格式,如 2026030112 表示 2026-03-01 12:00 |
1.3 过期判断示例
假设当前时间为 2026-03-03 15:00:00,过期时间为 3 天:
过期阈值 = 2026-03-03 15:00:00 - 3d = 2026-02-28 15:00:00
dt=2026022800 → 解析为 2026-02-28 00:00:00 → 早于阈值 → ✅ 过期
dt=2026022815 → 解析为 2026-02-28 15:00:00 → 等于阈值 → ❌ 不过期(isAfter,不含等于)
dt=2026022816 → 解析为 2026-02-28 16:00:00 → 晚于阈值 → ❌ 不过期
dt=2026030100 → 解析为 2026-03-01 00:00:00 → 晚于阈值 → ❌ 不过期
1.4 本地验证测试
-- Spark SQL:建表
CREATE TABLE paimon.test_db.expire_test (
id BIGINT,
name STRING,
dt STRING COMMENT '分区字段 yyyyMMddHH'
) PARTITIONED BY (dt)
TBLPROPERTIES (
'bucket' = '2',
'bucket-key' = 'id', -- append 表固定桶必须指定 bucket-key
'partition.expiration-time' = '1 min', -- 测试用:1 分钟过期
'partition.expiration-check-interval' = '0 s', -- 测试用:每次 commit 都检查
'partition.timestamp-formatter' = 'yyyyMMddHH'
);
-- 插入多个分区的数据
INSERT INTO paimon.test_db.expire_test VALUES
(1, 'a', '2026020100'), -- 远早于当前时间,应该过期
(2, 'b', '2026020200'), -- 远早于当前时间,应该过期
(3, 'c', '2099120100'); -- 远晚于当前时间,不应该过期
-- ✅ 已测试(Spark 3.3.4 + Paimon 1.3.1,2026-03-03)
-- 插入完成后立即查看分区:
SHOW PARTITIONS paimon.test_db.expire_test;
-- 结果:只剩 dt=2099120100(过期分区在 commit 阶段已被清理)
SELECT * FROM paimon.test_db.expire_test;
-- 结果:
-- id=3, name=c, dt=2099120100
--
-- dt=2026020100 和 dt=2026020200 已被过期删除 ✅
-- 无需额外的 INSERT 触发,同一次 commit 后的 maintain 阶段就完成了过期清理
二、源码分析:触发链路
2.1 完整调用链
写入数据 → commit → maintain → PartitionExpire.expire()
具体代码路径:
TableCommitImpl.commitMultiple(committables)
│
├── commit.commit(committable) // 提交数据
│
└── maintain(identifier, maintainExecutor, doExpire) // 提交后维护
│
└── [在 executor 线程中执行]
│
├── expireSnapshots.run() // 过期快照
│
└── partitionExpire.expire(identifier) // ⭐ 过期分区
源码位置:TableCommitImpl.java
public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
// ... 提交数据 ...
if (!committables.isEmpty()) {
maintain(
committables.get(committables.size() - 1).identifier(),
maintainExecutor,
newSnapshots > 0 || expireForEmptyCommit);
}
}
2.2 检查间隔控制
PartitionExpire.expire() 不是每次都执行,而是根据 checkInterval 控制频率:
// PartitionExpire.java
List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
if (checkInterval.isZero() // 间隔为 0,每次都检查
|| now.isAfter(lastCheck.plus(checkInterval)) // 距上次检查超过间隔
|| (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) { // Batch EndInput
List<Map<String, String>> expired = doExpire(now.minus(expirationTime), commitIdentifier);
lastCheck = now;
return expired;
}
return null; // 跳过本次检查
}
对于测试表(check-interval = 1h):
- Flink 流式任务每次 checkpoint 后 commit → 调用
maintain→ 调用expire - 但只有距上次检查超过 1 小时才真正执行过期检查
lastCheck是内存状态,任务重启后立即执行一次检查
2.3 异步执行
默认使用异步模式(snapshot.expire-execution-mode = async),分区过期不阻塞主线程:
// TableCommitImpl.java
private void maintain(long identifier, ExecutorService executor, boolean doExpire) {
executor.execute(() -> { // 异步线程中执行
maintain(identifier, doExpire);
});
}
三、源码分析:过期判断
3.1 策略选择
Paimon 支持两种过期策略,由 partition.expiration-strategy 控制(默认 values-time):
| 策略 | 判断依据 | 适用场景 |
|---|---|---|
| values-time(默认) | 从分区值提取时间 | 分区值本身就是时间(如 dt=20260301) |
| update-time | 分区最后修改时间 | 分区值非时间格式 |
3.2 values-time 策略(本表使用)
源码位置:PartitionValuesTimeExpireStrategy.java
public List<PartitionEntry> selectExpiredPartitions(
FileStoreScan scan, LocalDateTime expirationTime) {
return scan.withPartitionFilter(new PartitionValuesTimePredicate(expirationTime))
.readPartitionEntries();
}
// 内部谓词
class PartitionValuesTimePredicate implements PartitionPredicate {
public boolean test(BinaryRow partition) {
Object[] array = convertPartition(partition);
try {
LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array));
return expireDateTime.isAfter(partTime); // ⭐ 严格大于
} catch (DateTimeParseException e) {
LOG.warn("Can't extract datetime from partition {}...", ...);
return false; // 解析失败 → 不过期
}
}
}
关键点:
- 使用
isAfter(严格大于),等于阈值的分区不会被过期 - 解析失败的分区不会被过期,只打 WARN 日志
3.3 时间提取器
源码位置:PartitionTimeExtractor.java
对于本表(单分区字段 dt,formatter = yyyyMMddHH):
public LocalDateTime extract(List<String> partitionKeys, List<?> partitionValues) {
// pattern == null,使用第一个分区字段的值
String timestampString = partitionValues.get(0).toString(); // e.g., "2026030112"
return toLocalDateTime(timestampString, this.formatter); // 使用 yyyyMMddHH 解析
}
private static LocalDateTime toLocalDateTime(String timestampString, String formatterPattern) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(formatterPattern, Locale.ROOT);
try {
return LocalDateTime.parse(timestampString, dateTimeFormatter);
} catch (DateTimeParseException e) {
// 回退:尝试按 LocalDate 解析,时间设为 00:00:00
return LocalDateTime.of(
LocalDate.parse(timestampString, dateTimeFormatter),
LocalTime.MIDNIGHT);
}
}
解析流程(以 dt=2026030112 为例):
- 取分区值:
"2026030112" - 使用
yyyyMMddHH格式化器解析 - 结果:
2026-03-01T12:00:00
四、源码分析:分区删除
4.1 删除过程
// PartitionExpire.java
private List<Map<String, String>> doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
// 1. 获取过期分区列表
List<PartitionEntry> partitionEntries =
strategy.selectExpiredPartitions(scan, expireDateTime);
// 2. 转换为分区字符串
List<Map<String, String>> expired = convertToPartitionString(expiredPartValues);
// 3. 限制单次过期数量(默认 100)
// 由 partition.expiration-max-num 控制
// 4. 分批或整体删除
if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
Lists.partition(expired, expireBatchSize)
.forEach(batch -> doBatchExpire(batch, commitIdentifier));
} else {
doBatchExpire(expired, commitIdentifier);
}
return expired;
}
4.2 删除实现(Overwrite 空数据)
源码位置:FileStoreCommitImpl.java
public void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier) {
// 构建分区过滤器
PartitionPredicate partitionFilter = ...;
// ⭐ 本质是 Overwrite 操作:用空数据覆盖目标分区
tryOverwritePartition(
partitionFilter,
emptyList(), // 新的 data 文件列表为空
emptyList(), // 新的 changelog 文件列表为空
commitIdentifier, ...);
}
删除不是物理删除:
- 在 Manifest 中标记分区的所有文件为 DELETE
- 创建新的 Snapshot 记录此操作
- 文件的物理删除由后续的 snapshot 过期 处理
Snapshot 1: [dt=20260228 → file1, file2] (ADD)
Snapshot 2: [dt=20260228 → file1, file2] (DELETE) ← 分区过期创建
Snapshot 3: snapshot过期 → 物理删除 file1, file2 ← 真正删除文件
五、完整流程图
┌─────────────────────────────────────────────────────────────────┐
│ Flink Streaming Job 运行中 │
└─────────────────────────────────────────────────────────────────┘
│
│ Checkpoint 完成
▼
┌─────────────────────┐
│ TableCommitImpl │
│ .commitMultiple() │
│ │
│ 1. commit 数据 │
│ 2. maintain(异步) │──────────────────────────────────┐
└─────────────────────┘ │
▼ (异步线程)
┌─────────────────────┐
│ PartitionExpire │
│ .expire(now, id) │
│ │
│ 检查间隔: │
│ now - lastCheck >= 1h?│
│ NO → return null │
│ YES ↓ │
└──────────┬────────────┘
│
▼
┌───────────────────────────────────┐
│ PartitionValuesTimeExpireStrategy │
│ .selectExpiredPartitions() │
│ │
│ 遍历所有分区: │
│ dt=2026022800 │
│ → 解析为 2026-02-28 00:00 │
│ → now - 3d = 2026-02-28 15:00 │
│ → 00:00 < 15:00 → ✅ 过期 │
│ │
│ dt=2026030100 │
│ → 解析为 2026-03-01 00:00 │
│ → 00:00 > 15:00? NO → ❌ 不过期 │
└──────────┬────────────────────────┘
│
▼
┌──────────────────────┐
│ FileStoreCommitImpl │
│ .dropPartitions() │
│ │
│ Overwrite 空数据: │
│ → DELETE Manifest 条目 │
│ → 新 Snapshot │
│ │
│ (文件物理删除由 │
│ snapshot过期处理) │
└──────────────────────┘
六、关键注意事项
6.1 分区过期只在有 commit 时触发
分区过期检查发生在 commit 之后的 maintain 阶段。如果没有数据写入(没有 commit),分区过期不会被触发。
影响:如果表长时间没有新数据写入,过期分区不会被自动清理。
解决方案:
-- 手动触发过期检查(Flink SQL)
CALL sys.expire_partitions('pm_marketing.matched_goods_record_ma');
6.2 timestamp-formatter 格式必须匹配
如果分区值无法按 partition.timestamp-formatter 解析,该分区不会被过期。
dt=2026-03-01 + formatter=yyyyMMddHH → ❌ 解析失败,分区不过期
dt=2026030112 + formatter=yyyyMMddHH → ✅ 解析成功
dt=abc + formatter=yyyyMMddHH → ❌ 解析失败,分区不过期
只有 WARN 日志,不会报错中断任务。
6.3 过期数量限制
partition.expiration-max-num(默认 100):单次最多过期 100 个分区- 如果过期分区超过 100 个,需要多次 commit 才能全部清理
- 可以通过
partition.expiration-batch-size分批删除防止 OOM
6.4 文件不会立即物理删除
分区过期只是在 Manifest 中标记 DELETE:
- 文件物理删除依赖 snapshot 过期
- 受
snapshot.time-retained(默认 1h)和snapshot.num-retained.min(默认 10)控制 - 如果 snapshot 不过期,文件会一直占用存储
6.5 写入已过期分区会报错
如果持续向已过期的分区写入数据,Paimon 会抛出异常防止无限重启:
RuntimeException: You are writing data to expired partitions,
and you can filter this data to avoid job failover.
解决方案:在上游过滤掉时间过老的数据。
6.6 lastCheck 重启后重置
lastCheck 是内存状态,不持久化:
- 任务重启后
lastCheck初始化为启动时间 - 第一次 commit 后立即执行一次过期检查(因为
now.isAfter(lastCheck.plus(checkInterval))为 true)
七、配置最佳实践
7.1 本表配置分析
'partition.expiration-time' = '3 d',
'partition.expiration-check-interval' = '1 h',
'partition.timestamp-formatter' = 'yyyyMMddHH'
合理性分析:
- 分区格式
yyyyMMddHH(精确到小时),过期时间 3 天 → 每天最多产生 24 个分区,3 天 = 72 个分区,未超过默认 max-num=100 - 检查间隔 1 小时 → 最坏情况下分区多存活 1 小时(3 天 + 1 小时)
- 使用默认策略
values-time→ 从分区值dt提取时间判断
7.2 建议补充配置
-- 当前配置已经合理,可选补充:
'partition.expiration-strategy' = 'values-time', -- 显式指定(虽然是默认值)
'snapshot.time-retained' = '1 h', -- 确保 snapshot 过期,文件及时物理删除
'snapshot.num-retained.min' = '3' -- snapshot 最少保留数
八、源码关键类索引
| 类 | 路径 | 职责 |
|---|---|---|
| PartitionExpire | paimon-core/.../operation/PartitionExpire.java | 过期协调:控制检查频率,调用策略,触发删除 |
| PartitionValuesTimeExpireStrategy | paimon-core/.../partition/PartitionValuesTimeExpireStrategy.java | 基于分区值时间的过期判断 |
| PartitionUpdateTimeExpireStrategy | paimon-core/.../partition/PartitionUpdateTimeExpireStrategy.java | 基于分区修改时间的过期判断 |
| PartitionTimeExtractor | paimon-core/.../partition/PartitionTimeExtractor.java | 从分区值中提取时间戳 |
| TableCommitImpl | paimon-core/.../table/sink/TableCommitImpl.java | commit 后触发 maintain → 分区过期 |
| FileStoreCommitImpl | paimon-core/.../operation/FileStoreCommitImpl.java | 执行分区删除(dropPartitions) |
| CoreOptions | paimon-api/.../CoreOptions.java | 配置定义 |
文档版本:v1.0
创建时间:2026-03-03
基于版本:Paimon 1.3.1 源码分析