大数据之rabbitmq 消息幂等性 去重表 / 唯一 ID 设计

大数据阿木 发布于 2025-07-12 13 次阅读


摘要:

在分布式系统中,消息队列是保证系统解耦、异步处理的重要组件。RabbitMQ作为一款流行的消息队列中间件,在处理大数据量时,如何保证消息的幂等性成为了一个关键问题。本文将围绕RabbitMQ消息幂等性设计,探讨去重表和唯一ID两种策略,并给出相应的代码实现。

一、

幂等性是指对于同一操作,多次执行与一次执行的结果相同。在消息队列中,幂等性确保了消息即使被重复消费,也不会对系统产生重复处理。本文将结合RabbitMQ,探讨如何设计幂等性策略。

二、去重表策略

去重表策略通过在数据库中创建一个去重表,记录已经消费的消息ID,从而避免重复消费。

1. 去重表设计

去重表可以设计为以下结构:

sql

CREATE TABLE message_unique (


id BIGINT AUTO_INCREMENT PRIMARY KEY,


message_id VARCHAR(255) NOT NULL,


status TINYINT NOT NULL DEFAULT 0,


create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP


);


其中,`message_id`字段存储消息的唯一标识,`status`字段表示消息的消费状态,`create_time`字段记录消息创建时间。

2. 代码实现

以下是一个使用去重表策略的RabbitMQ消费者示例:

python

import pika


import time

连接RabbitMQ


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))


channel = connection.channel()

创建去重表


channel.queue_declare(queue='unique_queue')

def callback(ch, method, properties, body):


message_id = properties.headers.get('message_id')


检查消息是否已存在


cursor = connection.cursor()


cursor.execute("SELECT COUNT() FROM message_unique WHERE message_id = %s", (message_id,))


result = cursor.fetchone()


if result[0] == 0:


消费消息


print("Processing message:", body)


插入去重表


cursor.execute("INSERT INTO message_unique (message_id) VALUES (%s)", (message_id,))


connection.commit()


else:


print("Duplicate message:", body)

消费消息


channel.basic_consume(queue='unique_queue', on_message_callback=callback, auto_ack=False)

print('Waiting for messages. To exit press CTRL+C')


channel.start_consuming()


三、唯一ID策略

唯一ID策略通过为每条消息生成一个全局唯一的ID,确保消息的唯一性。

1. 唯一ID生成

唯一ID可以使用雪花算法(Snowflake Algorithm)生成,该算法可以生成一个64位的唯一ID,包含时间戳、数据中心ID、机器ID和序列号。

2. 代码实现

以下是一个使用唯一ID策略的RabbitMQ消费者示例:

python

import pika


import time


import snowflake

连接RabbitMQ


connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))


channel = connection.channel()

创建队列


channel.queue_declare(queue='unique_queue')

def generate_unique_id():


return str(snowflake.SnowflakeWorker(1, 1).get_id())

def callback(ch, method, properties, body):


message_id = generate_unique_id()


消费消息


print("Processing message:", body)


消息ID存储在消息头部


properties.headers['message_id'] = message_id

消费消息


channel.basic_consume(queue='unique_queue', on_message_callback=callback, auto_ack=False)

print('Waiting for messages. To exit press CTRL+C')


channel.start_consuming()


四、总结

本文介绍了RabbitMQ消息幂等性设计中的两种策略:去重表和唯一ID。去重表策略通过在数据库中记录消息ID,避免重复消费;唯一ID策略通过生成全局唯一的ID,确保消息的唯一性。在实际应用中,可以根据具体需求选择合适的策略,以保证系统的稳定性和可靠性。

注意:以上代码仅为示例,实际应用中需要根据具体情况进行调整。