1. 消息队列如何保证可靠性#

1.1. 消息持久化#

确保消息在系统崩溃或重启后不丢失

  • 为提升性能, broker 可能先写入内存缓冲区, 再异步刷盘(需配置同步刷盘以确保强一致性)
  • 异步刷盘可能在极短窗口内丢失消息(毫秒级),但吞吐量可提高 10 倍以上,业界常用

消息元数据(如 offset、时间戳)也需持久化,防止消费状态丢失

持久化存储通常结合定期清理(如按时间/大小删除)避免磁盘溢出

1.2. 副本机制(高可用,广泛使用)#

实现原理

  • 单点故障(如节点宕机)会导致服务不可用或数据丢失,副本机制确保高可用和容错
  • 消息在多个节点存储副本,主节点(leader)处理读写,从节点(follower)同步数据
  • 写操作需多数副本确认(quorum,如 3 副本中至少 2 个确认)才算成功
  • 主节点故障时,从节点选举为新主,自动接管服务

业界实践

  • Kafka 使用分区副本(ISR,In-Sync Replicas),RabbitMQ 使用镜像队列

  • 副本数通常设为 2-3,兼顾可靠性和存储成本(副本过多增加开销)

  • 同步 vs 异步复制

    • 同步复制(如 Kafka 的 acks=all)保证强一致性,但延迟稍高
    • 异步复制(如 Kafka 的 acks=1)性能更好,但可能丢少量消息,适用于对丢失敏感度低的场景

1.3. 消息确认机制(生产者与消费者,核心机制)#

实现原理:

生产者确认

  • 生产者发送消息后,等待 broker 确认(ack)
  • 确认级别
    • 全程无需确认(火力全开,性能最高,可能丢消息)
    • 主节点确认(折衷,业界常用)
    • 多数副本确认(最可靠,延迟稍高)

消费者确认

  • 消费者处理消息后,向 broker 发送确认(ack),broker 删除消息
  • 未收到确认的消息可重新投递(at-least-once 语义)
  • 自动确认(auto-ack)性能高,但可能丢失;手动确认(manual-ack)可靠,业界更常用

业界实践

  • Kafka:使用异步批量确认提高吞吐, 生产者配置 acks(0/1/all),消费者手动提交 offset
  • RabbitMQ:生产者用 publisher confirm,消费者用 manual ack
  • 优化:异步批量确认(如 Kafka 缓冲发送)或延迟确认(如 RabbitMQ 批量 ack)提升吞吐。

重复消息可能因网络抖动或重试产生,需幂等性支持

确认超时需重试,增加系统复杂性,但不可或缺

1.4. 幂等性与去重#

消息重试或网络问题可能导致重复发送/消费, 幂等性防止重复处理影响业务逻辑:

  • 当生产者发送消息到消息队列后,如果没有收到确认响应(可能因为网络问题或服务器处理延迟),生产者会认为消息发送失败并重新发送,导致相同消息被发送多次

幂等性是指一个操作执行一次和执行多次的结果是一样的, 下面是 消费者 实现代码:

// 库存服务处理消息的代码示例
public void processInventoryReduction(InventoryMessage message) {
    String messageId = message.getMessageId(); // 消息唯一ID
    
    // 检查消息是否已处理
    if (messageProcessRepository.isProcessed(messageId)) {
        log.info("消息{}已处理,忽略重复消息", messageId);
        return;
    }
    
    try {
        // 执行库存扣减逻辑
        inventoryService.reduce(message.getProductId(), message.getQuantity());
        
        // 标记消息为已处理
        messageProcessRepository.markAsProcessed(messageId);
    } catch (Exception e) {
        log.error("处理消息{}失败", messageId, e);
        throw e;
    }
}

是不是好奇幂等性是怎么实现的? messageProcessRepository 是全局的仓库吗 也就是说 所有的客户端共享这一个仓库 确保消息被处理一次, 如果每个客户端单独维护一个仓库, 好像无法实现?

在多个消费者实例的情况下, messageProcessRepository 必须是全局共享的存储, 最常见的实现是一个数据库表, 记录已处理的消息ID, 而且不要忘了, 消费者也是运行在服务器上的, 而不是客户端

幂等性实现的完整架构

-- 消息处理记录表
CREATE TABLE message_process_record (
    message_id VARCHAR(50) PRIMARY KEY,  -- 消息唯一ID作为主键确保唯一性
    consumer_group VARCHAR(50) NOT NULL, -- 消费者组标识
    process_time TIMESTAMP NOT NULL,     -- 处理时间
    process_status VARCHAR(20) NOT NULL  -- 处理状态
);

-- 可选:添加索引优化查询
CREATE INDEX idx_consumer_message ON message_process_record(consumer_group, message_id);

消费者实现

@Service
public class InventoryConsumer {
    
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private InventoryService inventoryService;
    
    // 消费者组标识,通常从配置中获取
    private static final String CONSUMER_GROUP = "inventory-service";
    
    @RabbitListener(queues = "inventory-queue")
    public void processInventoryReduction(InventoryMessage message) {
        String messageId = message.getMessageId();
        
        // 检查消息是否已处理 - 使用共享数据库
        if (isMessageProcessed(messageId)) {
            log.info("消息{}已处理,忽略重复消息", messageId);
            return;
        }
        
        try {
            // 尝试记录消息处理状态 - 使用数据库唯一约束确保幂等
            if (!markMessageProcessing(messageId)) {
                log.info("消息{}已被其他实例处理,忽略", messageId);
                return;
            }
            
            // 执行库存扣减逻辑
            inventoryService.reduce(message.getProductId(), message.getQuantity());
            
            // 标记消息为已成功处理
            markMessageProcessed(messageId);
        } catch (Exception e) {
            log.error("处理消息{}失败", messageId, e);
            // 可选:标记消息处理失败,以便后续分析
            markMessageFailed(messageId, e.getMessage());
            throw e;
        }
    }
    
    private boolean isMessageProcessed(String messageId) {
        int count = jdbcTemplate.queryForObject(
            "SELECT COUNT(*) FROM message_process_record WHERE message_id = ? AND consumer_group = ?",
            Integer.class, messageId, CONSUMER_GROUP);
        return count > 0;
    }
    
    private boolean markMessageProcessing(String messageId) {
        try {
            jdbcTemplate.update(
                "INSERT INTO message_process_record (message_id, consumer_group, process_time, process_status) " +
                "VALUES (?, ?, NOW(), 'PROCESSING')",
                messageId, CONSUMER_GROUP);
            return true;
        } catch (DuplicateKeyException e) {
            // 如果消息ID已存在,说明已被处理或正在处理
            return false;
        }
    }
    
    private void markMessageProcessed(String messageId) {
        jdbcTemplate.update(
            "UPDATE message_process_record SET process_status = 'PROCESSED', process_time = NOW() " +
            "WHERE message_id = ? AND consumer_group = ?",
            messageId, CONSUMER_GROUP);
    }
    
    private void markMessageFailed(String messageId, String errorMessage) {
        jdbcTemplate.update(
            "UPDATE message_process_record SET process_status = 'FAILED', error_message = ?, process_time = NOW() " +
            "WHERE message_id = ? AND consumer_group = ?",
            errorMessage, messageId, CONSUMER_GROUP);
    }
}

对于需要更强一致性保证的场景,可以结合分布式锁:

@Autowired
private RedissonClient redissonClient;

public void processInventoryReduction(InventoryMessage message) {
    String messageId = message.getMessageId();
    String lockKey = "inventory_msg_lock:" + messageId;
    
    // 检查消息是否已处理
    if (isMessageProcessed(messageId)) {
        log.info("消息{}已处理,忽略重复消息", messageId);
        return;
    }
    
    // 获取分布式锁,确保同一时间只有一个实例处理该消息
    RLock lock = redissonClient.getLock(lockKey);
    try {
        // 尝试获取锁,等待2秒,持有锁10秒
        if (lock.tryLock(2, 10, TimeUnit.SECONDS)) {
            try {
                // 再次检查,防止锁等待期间其他实例已处理
                if (isMessageProcessed(messageId)) {
                    log.info("获取锁后再次检查:消息{}已处理", messageId);
                    return;
                }
                
                // 标记为处理中
                markMessageProcessing(messageId);
                
                // 执行业务逻辑
                inventoryService.reduce(message.getProductId(), message.getQuantity());
                
                // 标记为已处理
                markMessageProcessed(messageId);
            } finally {
                lock.unlock(); // 确保释放锁
            }
        } else {
            log.info("无法获取消息{}的处理锁,跳过处理", messageId);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("获取锁被中断", e);
    }
}

2. Redis List 简易消息队列可靠性分析#

无论什么, 消息队列的可靠性都可以参考上面的规范, 这里我们主要讨论 redis 持久化, 所以上面的很多都没考虑, 保证消息的可靠性分为两个方面:

  • 生产者使用消息持久化
  • 消费者使用重试确认机制

2.1. 生产者 (Redis 持久化)#

启用 Redis 的 AOF(Append-Only File)持久化,确保消息写入 List 后, 即使 Redis 重启也能恢复, AOF 的 everysec 模式适合大多数场景,若需更高可靠性可改为always:

appendonly yes
appendfsync everysec  # 每秒同步,平衡性能与可靠性

2.2. 消费者#

可靠取出:使用 RPOPLPUSH(而非简单的 RPOP),将消息从主队列(如 email_queue)原子性地弹出并推入备份队列(如 backup_queue), 处理成功后再从备份队列移除,确保消息不丢失

def consume_task():
    try:
        order_id = redis_client.rpoplpush("email_queue", "backup_queue")
        if order_id:
            send_email(order_id)
            redis_client.lrem("backup_queue", 1, order_id)  # 确认处理成功
    except Exception as e:
        print(f"Error: {e}")
        # 任务仍在 backup_queue,可重试

失败重试:如果邮件发送失败(如网络问题),消费者可将任务重新推回主队列或记录到错误队列,稍后重试

def retry_task(order_id):
    redis_client.lpush("email_queue", order_id)  # 重新推入主队列

使用 ⁠RPOP 时的主要问题是 一旦任务被弹出,它就从队列中消失了, 使用 RPOPLPUSH 的场景:

初始状态: 
待处理队列 = [任务1, 任务2, 任务3]
处理中队列 = []

1. 应用程序执行 RPOPLPUSH → 将"任务3"从待处理队列移到处理中队列
   待处理队列 = [任务1, 任务2]
   处理中队列 = [任务3]
   
2. 应用程序开始处理"任务3"
   
3. 在处理过程中,应用程序崩溃
   
结果: "任务3"仍然存在于处理中队列,可以在系统重启后恢复

RPOPLPUSH 是一个原子操作,它确保元素从源列表移除并添加到目标列表的过程不会被中断