C#操作RabbitMQ的常用方法及关键点总结
一、常用库选择
1. EasyNetQ(推荐快速开发)
- 特点:封装了底层API,提供简洁的API和异步支持
- 安装:Install-Package EasyNetQ
- 示例:
using var bus = RabbitHutch.CreateBus("host=localhost");
bus.Publish(new MyMessage { Content = "Hello RabbitMQ" });
bus.Subscribe<MyMessage>("subscriber1", msg => Console.WriteLine(msg.Content));
2. RabbitMQ.Client(官方底层库)
- 特点:完全控制消息队列配置,适合复杂场景
- 安装:Install-Package RabbitMQ.Client
- 示例:
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false);
二、核心操作流程
1. 发送消息
string message = "Test message";
byte[] body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "hello", body: body);
2. 接收消息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
三、关键配置项
1. 消息确认机制
- 自动确认:autoAck: true(简单场景)
- 手动确认:channel.BasicAck()(保证消息处理可靠性)
2. 持久化配置
- 队列持久化:durable: true
- 消息持久化:
BasicProperties.Persistent = true
3. 发布确认
channel.ConfirmSelect();
channel.BasicPublish(...);
if (!channel.WaitForConfirms(timeout))
throw new Exception("Publish failed");
四、高级功能
1. 消息TTL
var args = new Dictionary<string, object>();
args["x-message-ttl"] = 60000; // 1分钟
channel.QueueDeclare(..., arguments: args);
2. 死信队列
args["x-dead-letter-exchange"] = "dlx_exchange";
args["x-message-ttl"] = 30000; // 超时后转到死信队列
3. 优先级队列
args["x-max-priority"] = 10; // 设置队列最大优先级
var props = new BasicProperties { Priority = 5 }; // 设置消息优先级
五、注意事项
1. 连接管理
- 使用using语句确保资源释放
- 配置心跳机制:
factory.RequestedHeartbeat = 30;
2. 错误处理
- 捕获
BrokerUnreachableException等异常
- 实现重试机制(推荐 Polly 库)
3. 性能优化
- 批量发送:channel.PublishBatch()
- 调整预取计数:consumer.PrefetchCount = 100