Paimon 分区过期机制详解

Paimon 分区过期机制详解

总结

  • 更新时间:2026-03-03
  • 本文档详细分析 Paimon 分区过期(Partition Expiration)的源码实现和测试验证
  • 触发时机:每次 commit 后在 maintain 阶段检查,受 partition.expiration-check-interval 控制
  • 过期策略
    • values-time(默认):从分区值提取时间,与当前时间比较
    • update-time:比较分区最后修改时间
  • 删除机制:创建 DELETE 类型的 Manifest 条目(Overwrite 空数据),文件真实删除由 snapshot 过期处理
  • 关键发现
    • 分区过期在 commit 后异步执行(默认 ASYNC 模式)
    • partition.timestamp-formatter 用于解析分区值中的时间,格式不匹配时分区不会被过期(静默跳过)
    • 分区删除不是立即物理删除文件,而是标记删除

一、测试表与验证

1.1 测试表定义

CREATE TABLE paimon.pm_marketing.matched_goods_record_ma (
    task_id STRING COMMENT '任务ID',
    user_id STRING COMMENT '用户ID (member_id或device_id)',
    goods_sn STRING COMMENT '商品编码',
    match_time TIMESTAMP COMMENT '匹配时间',
    dt STRING COMMENT '分区字段 yyyyMMddHH'
) PARTITIONED BY (dt)
WITH (
    'bucket' = '20',
    'bucket-key' = 'user_id',
    'partition.expiration-time' = '3 d',
    'partition.expiration-check-interval' = '1 h',
    'partition.timestamp-formatter' = 'yyyyMMddHH'
);

1.2 配置解读

配置 含义
partition.expiration-time 3 d 分区超过 3 天过期
partition.expiration-check-interval 1 h 每 1 小时检查一次是否有分区需要过期
partition.timestamp-formatter yyyyMMddHH 分区值 dt 的时间格式,如 2026030112 表示 2026-03-01 12:00

1.3 过期判断示例

假设当前时间为 2026-03-03 15:00:00,过期时间为 3 天:

过期阈值 = 2026-03-03 15:00:00 - 3d = 2026-02-28 15:00:00

dt=2026022800 → 解析为 2026-02-28 00:00:00 → 早于阈值 → ✅ 过期
dt=2026022815 → 解析为 2026-02-28 15:00:00 → 等于阈值   → ❌ 不过期(isAfter,不含等于)
dt=2026022816 → 解析为 2026-02-28 16:00:00 → 晚于阈值 → ❌ 不过期
dt=2026030100 → 解析为 2026-03-01 00:00:00 → 晚于阈值 → ❌ 不过期

1.4 本地验证测试

-- Spark SQL:建表
CREATE TABLE paimon.test_db.expire_test (
    id BIGINT,
    name STRING,
    dt STRING COMMENT '分区字段 yyyyMMddHH'
) PARTITIONED BY (dt)
TBLPROPERTIES (
    'bucket' = '2',
    'bucket-key' = 'id',                              -- append 表固定桶必须指定 bucket-key
    'partition.expiration-time' = '1 min',            -- 测试用:1 分钟过期
    'partition.expiration-check-interval' = '0 s',    -- 测试用:每次 commit 都检查
    'partition.timestamp-formatter' = 'yyyyMMddHH'
);

-- 插入多个分区的数据
INSERT INTO paimon.test_db.expire_test VALUES
    (1, 'a', '2026020100'),   -- 远早于当前时间,应该过期
    (2, 'b', '2026020200'),   -- 远早于当前时间,应该过期
    (3, 'c', '2099120100');   -- 远晚于当前时间,不应该过期

-- ✅ 已测试(Spark 3.3.4 + Paimon 1.3.1,2026-03-03)
-- 插入完成后立即查看分区:
SHOW PARTITIONS paimon.test_db.expire_test;
-- 结果:只剩 dt=2099120100(过期分区在 commit 阶段已被清理)

SELECT * FROM paimon.test_db.expire_test;
-- 结果:
-- id=3, name=c, dt=2099120100
--
-- dt=2026020100 和 dt=2026020200 已被过期删除 ✅
-- 无需额外的 INSERT 触发,同一次 commit 后的 maintain 阶段就完成了过期清理

二、源码分析:触发链路

2.1 完整调用链

写入数据 → commit → maintain → PartitionExpire.expire()

具体代码路径:

TableCommitImpl.commitMultiple(committables)
  │
  ├── commit.commit(committable)                    // 提交数据
  │
  └── maintain(identifier, maintainExecutor, doExpire)  // 提交后维护
        │
        └── [在 executor 线程中执行]
              │
              ├── expireSnapshots.run()              // 过期快照
              │
              └── partitionExpire.expire(identifier) // ⭐ 过期分区

源码位置:TableCommitImpl.java

public void commitMultiple(List<ManifestCommittable> committables, boolean checkAppendFiles) {
    // ... 提交数据 ...

    if (!committables.isEmpty()) {
        maintain(
            committables.get(committables.size() - 1).identifier(),
            maintainExecutor,
            newSnapshots > 0 || expireForEmptyCommit);
    }
}

2.2 检查间隔控制

PartitionExpire.expire() 不是每次都执行,而是根据 checkInterval 控制频率:

// PartitionExpire.java
List<Map<String, String>> expire(LocalDateTime now, long commitIdentifier) {
    if (checkInterval.isZero()                                      // 间隔为 0,每次都检查
            || now.isAfter(lastCheck.plus(checkInterval))           // 距上次检查超过间隔
            || (endInputCheckPartitionExpire && Long.MAX_VALUE == commitIdentifier)) {  // Batch EndInput
        List<Map<String, String>> expired = doExpire(now.minus(expirationTime), commitIdentifier);
        lastCheck = now;
        return expired;
    }
    return null;  // 跳过本次检查
}

对于测试表(check-interval = 1h)

  • Flink 流式任务每次 checkpoint 后 commit → 调用 maintain → 调用 expire
  • 但只有距上次检查超过 1 小时才真正执行过期检查
  • lastCheck 是内存状态,任务重启后立即执行一次检查

2.3 异步执行

默认使用异步模式(snapshot.expire-execution-mode = async),分区过期不阻塞主线程:

// TableCommitImpl.java
private void maintain(long identifier, ExecutorService executor, boolean doExpire) {
    executor.execute(() -> {   // 异步线程中执行
        maintain(identifier, doExpire);
    });
}

三、源码分析:过期判断

3.1 策略选择

Paimon 支持两种过期策略,由 partition.expiration-strategy 控制(默认 values-time):

策略 判断依据 适用场景
values-time(默认) 从分区值提取时间 分区值本身就是时间(如 dt=20260301)
update-time 分区最后修改时间 分区值非时间格式

3.2 values-time 策略(本表使用)

源码位置:PartitionValuesTimeExpireStrategy.java

public List<PartitionEntry> selectExpiredPartitions(
        FileStoreScan scan, LocalDateTime expirationTime) {
    return scan.withPartitionFilter(new PartitionValuesTimePredicate(expirationTime))
            .readPartitionEntries();
}

// 内部谓词
class PartitionValuesTimePredicate implements PartitionPredicate {
    public boolean test(BinaryRow partition) {
        Object[] array = convertPartition(partition);
        try {
            LocalDateTime partTime = timeExtractor.extract(partitionKeys, Arrays.asList(array));
            return expireDateTime.isAfter(partTime);  // ⭐ 严格大于
        } catch (DateTimeParseException e) {
            LOG.warn("Can't extract datetime from partition {}...", ...);
            return false;  // 解析失败 → 不过期
        }
    }
}

关键点

  • 使用 isAfter(严格大于),等于阈值的分区不会被过期
  • 解析失败的分区不会被过期,只打 WARN 日志

3.3 时间提取器

源码位置:PartitionTimeExtractor.java

对于本表(单分区字段 dt,formatter = yyyyMMddHH):

public LocalDateTime extract(List<String> partitionKeys, List<?> partitionValues) {
    // pattern == null,使用第一个分区字段的值
    String timestampString = partitionValues.get(0).toString();  // e.g., "2026030112"
    return toLocalDateTime(timestampString, this.formatter);     // 使用 yyyyMMddHH 解析
}

private static LocalDateTime toLocalDateTime(String timestampString, String formatterPattern) {
    DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(formatterPattern, Locale.ROOT);
    try {
        return LocalDateTime.parse(timestampString, dateTimeFormatter);
    } catch (DateTimeParseException e) {
        // 回退:尝试按 LocalDate 解析,时间设为 00:00:00
        return LocalDateTime.of(
            LocalDate.parse(timestampString, dateTimeFormatter),
            LocalTime.MIDNIGHT);
    }
}

解析流程(以 dt=2026030112 为例):

  1. 取分区值:"2026030112"
  2. 使用 yyyyMMddHH 格式化器解析
  3. 结果:2026-03-01T12:00:00

四、源码分析:分区删除

4.1 删除过程

// PartitionExpire.java
private List<Map<String, String>> doExpire(LocalDateTime expireDateTime, long commitIdentifier) {
    // 1. 获取过期分区列表
    List<PartitionEntry> partitionEntries =
            strategy.selectExpiredPartitions(scan, expireDateTime);

    // 2. 转换为分区字符串
    List<Map<String, String>> expired = convertToPartitionString(expiredPartValues);

    // 3. 限制单次过期数量(默认 100)
    // 由 partition.expiration-max-num 控制

    // 4. 分批或整体删除
    if (expireBatchSize > 0 && expireBatchSize < expired.size()) {
        Lists.partition(expired, expireBatchSize)
                .forEach(batch -> doBatchExpire(batch, commitIdentifier));
    } else {
        doBatchExpire(expired, commitIdentifier);
    }
    return expired;
}

4.2 删除实现(Overwrite 空数据)

源码位置:FileStoreCommitImpl.java

public void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier) {
    // 构建分区过滤器
    PartitionPredicate partitionFilter = ...;

    // ⭐ 本质是 Overwrite 操作:用空数据覆盖目标分区
    tryOverwritePartition(
            partitionFilter,
            emptyList(),    // 新的 data 文件列表为空
            emptyList(),    // 新的 changelog 文件列表为空
            commitIdentifier, ...);
}

删除不是物理删除

  1. 在 Manifest 中标记分区的所有文件为 DELETE
  2. 创建新的 Snapshot 记录此操作
  3. 文件的物理删除由后续的 snapshot 过期 处理
Snapshot 1: [dt=20260228 → file1, file2]  (ADD)
Snapshot 2: [dt=20260228 → file1, file2]  (DELETE)  ← 分区过期创建
Snapshot 3: snapshot过期 → 物理删除 file1, file2   ← 真正删除文件

五、完整流程图

┌─────────────────────────────────────────────────────────────────┐
│                  Flink Streaming Job 运行中                      │
└─────────────────────────────────────────────────────────────────┘
         │
         │ Checkpoint 完成
         ▼
┌─────────────────────┐
│ TableCommitImpl      │
│ .commitMultiple()    │
│                      │
│ 1. commit 数据       │
│ 2. maintain(异步)    │──────────────────────────────────┐
└─────────────────────┘                                   │
                                                          ▼ (异步线程)
                                              ┌─────────────────────┐
                                              │ PartitionExpire      │
                                              │ .expire(now, id)     │
                                              │                      │
                                              │ 检查间隔:             │
                                              │ now - lastCheck >= 1h?│
                                              │   NO → return null   │
                                              │   YES ↓               │
                                              └──────────┬────────────┘
                                                         │
                                                         ▼
                                   ┌───────────────────────────────────┐
                                   │ PartitionValuesTimeExpireStrategy │
                                   │ .selectExpiredPartitions()        │
                                   │                                   │
                                   │ 遍历所有分区:                      │
                                   │   dt=2026022800                   │
                                   │   → 解析为 2026-02-28 00:00       │
                                   │   → now - 3d = 2026-02-28 15:00  │
                                   │   → 00:00 < 15:00 → ✅ 过期       │
                                   │                                   │
                                   │   dt=2026030100                   │
                                   │   → 解析为 2026-03-01 00:00       │
                                   │   → 00:00 > 15:00? NO → ❌ 不过期 │
                                   └──────────┬────────────────────────┘
                                              │
                                              ▼
                                   ┌──────────────────────┐
                                   │ FileStoreCommitImpl   │
                                   │ .dropPartitions()     │
                                   │                       │
                                   │ Overwrite 空数据:      │
                                   │ → DELETE Manifest 条目 │
                                   │ → 新 Snapshot          │
                                   │                       │
                                   │ (文件物理删除由        │
                                   │  snapshot过期处理)     │
                                   └──────────────────────┘

六、关键注意事项

6.1 分区过期只在有 commit 时触发

分区过期检查发生在 commit 之后的 maintain 阶段。如果没有数据写入(没有 commit),分区过期不会被触发。

影响:如果表长时间没有新数据写入,过期分区不会被自动清理。

解决方案

-- 手动触发过期检查(Flink SQL)
CALL sys.expire_partitions('pm_marketing.matched_goods_record_ma');

6.2 timestamp-formatter 格式必须匹配

如果分区值无法按 partition.timestamp-formatter 解析,该分区不会被过期

dt=2026-03-01    + formatter=yyyyMMddHH  → ❌ 解析失败,分区不过期
dt=2026030112    + formatter=yyyyMMddHH  → ✅ 解析成功
dt=abc           + formatter=yyyyMMddHH  → ❌ 解析失败,分区不过期

只有 WARN 日志,不会报错中断任务。

6.3 过期数量限制

  • partition.expiration-max-num(默认 100):单次最多过期 100 个分区
  • 如果过期分区超过 100 个,需要多次 commit 才能全部清理
  • 可以通过 partition.expiration-batch-size 分批删除防止 OOM

6.4 文件不会立即物理删除

分区过期只是在 Manifest 中标记 DELETE:

  • 文件物理删除依赖 snapshot 过期
  • snapshot.time-retained(默认 1h)和 snapshot.num-retained.min(默认 10)控制
  • 如果 snapshot 不过期,文件会一直占用存储

6.5 写入已过期分区会报错

如果持续向已过期的分区写入数据,Paimon 会抛出异常防止无限重启:

RuntimeException: You are writing data to expired partitions,
and you can filter this data to avoid job failover.

解决方案:在上游过滤掉时间过老的数据。

6.6 lastCheck 重启后重置

lastCheck 是内存状态,不持久化:

  • 任务重启后 lastCheck 初始化为启动时间
  • 第一次 commit 后立即执行一次过期检查(因为 now.isAfter(lastCheck.plus(checkInterval)) 为 true)

七、配置最佳实践

7.1 本表配置分析

'partition.expiration-time' = '3 d',
'partition.expiration-check-interval' = '1 h',
'partition.timestamp-formatter' = 'yyyyMMddHH'

合理性分析

  • 分区格式 yyyyMMddHH(精确到小时),过期时间 3 天 → 每天最多产生 24 个分区,3 天 = 72 个分区,未超过默认 max-num=100
  • 检查间隔 1 小时 → 最坏情况下分区多存活 1 小时(3 天 + 1 小时)
  • 使用默认策略 values-time → 从分区值 dt 提取时间判断

7.2 建议补充配置

-- 当前配置已经合理,可选补充:
'partition.expiration-strategy' = 'values-time',   -- 显式指定(虽然是默认值)
'snapshot.time-retained' = '1 h',                  -- 确保 snapshot 过期,文件及时物理删除
'snapshot.num-retained.min' = '3'                  -- snapshot 最少保留数

八、源码关键类索引

路径 职责
PartitionExpire paimon-core/.../operation/PartitionExpire.java 过期协调:控制检查频率,调用策略,触发删除
PartitionValuesTimeExpireStrategy paimon-core/.../partition/PartitionValuesTimeExpireStrategy.java 基于分区值时间的过期判断
PartitionUpdateTimeExpireStrategy paimon-core/.../partition/PartitionUpdateTimeExpireStrategy.java 基于分区修改时间的过期判断
PartitionTimeExtractor paimon-core/.../partition/PartitionTimeExtractor.java 从分区值中提取时间戳
TableCommitImpl paimon-core/.../table/sink/TableCommitImpl.java commit 后触发 maintain → 分区过期
FileStoreCommitImpl paimon-core/.../operation/FileStoreCommitImpl.java 执行分区删除(dropPartitions)
CoreOptions paimon-api/.../CoreOptions.java 配置定义

文档版本:v1.0
创建时间:2026-03-03
基于版本:Paimon 1.3.1 源码分析

Fail2ban + MSMTP 邮件告警配置指南 2026-02-13
ESP 8266测试 2026-03-21

评论区