跳过正文
Background Image
  1. Posts/

Pulsar(3) 消息丢失Bug分析

·978 字·2 分钟·
目录

Pulsar(3) —— 消息丢失Bug分析
#

问题背景
#

环境与配置
#

  • Broker 版本: 4.1.1
  • Client 版本: 4.1.1 (Java)
  • JDK: OpenJDK 24.0.2
  • OS: Linux 5.4.241

Consumer 关键配置
#

.enableBatchIndexAcknowledgement(true)  // 开启批量消息索引确认
.acknowledgmentGroupTime(100, TimeUnit.MILLISECONDS)  // 确认聚合时间
.acknowledgementGroupSize(1000)  // 最大确认聚合数量
.ackMode = Individual  // 单独确认模式

Producer 关键配置
#

.batchingMaxMessages(1000)  // 最大批处理消息数

Bug 现象
#

在对一个分区主题进行生产和消费测试时,发现偶发性的单个消息丢失问题。

预期结果
#

--- Total Acked Messages per Subscription ---
Subscription [sub-1]: 1000000 acks
Subscription [sub-2]: 1000000 acks

实际结果
#

--- Total Acked Messages per Subscription ---
Subscription [sub-2]: 999999 acks  ← 少了一条消息
Subscription [sub-1]: 1000000 acks 
ERROR: [0:123:0:45] not received from [sub-2]!

关键特征
#

  1. 随机发生 - 并非每次必现
  2. 仅丢失一条消息 - 丢失数量固定为一个
  3. 随机 batch index - 丢失消息在 batch 中的索引随机
  4. Broker 有积压 - 但 consumer.receive()redeliverUnacknowledgedMessages() 都无法获取该消息
  5. 关闭某些特性可规避:
    • 设置 enableBatchIndexAcknowledgment = false 时问题消失
    • 设置 acknowledgmentGroupTime = 0 时问题消失
    • Go Client(不支持 Batch Index Ack)无此问题

DEBUG 日志线索
#

Flushing pending acks to broker: 
  last-cumulative-ack: [] 
  -- individual-acks: [] 
  -- individual-batch-index-acks: [(0, 123, {})]

原因分析
#

经过深入分析,发现问题根源在于 Netty Recycler 的错误使用 导致的竞态条件

核心问题定位
#

PersistentAcknowledgmentsGroupingTracker 类中:

// 问题场景:isDuplicate() 和 flushAsync() 之间发生竞态

@Override
public boolean isDuplicate(@NonNull MessageIdData messageId) {
    // ...
    if (type == AckType.Individual) {
        // 问题:这里使用的对象是 Netty Recycler 回收的
        BitSetRecyclable bitSet = BitSetRecyclable.create();
        bitSet.set(batchIndex);
        // 添加到 pendingIndividualBatchIndexAcks
        pendingIndividualBatchIndexAcks.put(msgId, bitSet);
    }
    return false;
}

private void flushAsync() {
    // ...
    for (Map.Entry<MessageIdImpl, BitSetRecyclable> entry : 
            pendingIndividualBatchIndexAcks.entrySet()) {
        // 问题:可能在 isDuplicate() 还在使用时就被 Recycler 回收了
        entry.getValue().recycle();  // ← 这里回收了对象
    }
    pendingIndividualBatchIndexAcks.clear();
}

Netty Recycler 工作原理
#

Netty 的 Recycler 是一种对象池机制:

  1. 创建: 对象不再使用时调用 recycle() 返回池中
  2. 复用: 需要时从池中取出,避免频繁内存分配
  3. 风险: 如果对象仍在被使用就被回收,会导致数据错乱

竞态条件时序
#

线程 A (isDuplicate)                线程 B (flushAsync)
--------------------------------    --------------------------------
1. 创建 BitSetRecyclable 对象
2. set(batchIndex)
3. put to map                       1. iterate map
                                    2. get Value (同一个对象引用)
                                    3. recycle() ← 回收了对象!
4. 返回                             
                                    4. clear map
                                    
5. [问题] 此时 BitSet 已被回收,
   可能被其他线程复用,数据被覆盖!

修复方案
#

PR 标题
#

[fix][client] Fix race condition between isDuplicate() and flushAsync() method in PersistentAcknowledgmentsGroupingTracker due to incorrect use Netty Recycler

核心修复思路
#

避免在 pendingIndividualBatchIndexAcks 中存储从 Recycler 获取的对象,改为在即将发送时才创建回收对象。

代码变更
#

// 修改前:存储 BitSetRecyclable 到 Map
pendingIndividualBatchIndexAcks.put(msgId, bitSet);

// 修改后:存储普通的 BitSet,仅在发送时转换为可回收对象
pendingIndividualBatchIndexAcks.put(msgId, new BitSet());
// 在 flushAsync 中发送前再包装为 BitSetRecyclable
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.or(bitSet);  // 复制数据
// 发送后回收
bitSetRecyclable.recycle();

关键改动点
#

  1. Map 类型变更: Map<MessageIdImpl, BitSetRecyclable>Map<MessageIdImpl, BitSet>
  2. 延迟创建: 仅在需要发送确认时才创建 BitSetRecyclable
  3. 立即回收: 发送完成后立即回收,确保生命周期可控

验证与测试
#

修复前测试结果
#

  • 运行 10 次,约 3-4 次出现消息丢失
  • 丢失数量均为 1 条

修复后测试结果
#

  • 运行 50 次,无消息丢失
  • 性能无明显退化

参考信息
#