主题
RabbitMQ
RabbitMQ 是一个开源的消息代理和队列服务器,用于在分布式系统中实现异步消息传递。它实现了高级消息队列协议(AMQP),提供了可靠的消息传递机制,支持多种消息模式,包括点对点、发布/订阅等。
核心特性
- 可靠性:支持消息持久化、确认机制,确保消息不丢失
- 灵活的路由:支持多种交换器类型(direct、topic、fanout、headers)
- 集群支持:支持高可用集群部署
- 管理界面:提供 Web 管理界面,方便监控和管理
- 多协议支持:支持 AMQP、MQTT、STOMP 等多种协议
Docker 安装
使用 Docker 安装 RabbitMQ 是最简单快捷的方式:
bash
docker pull rabbitmq:3-management启动 RabbitMQ 容器(包含管理界面):
bash
docker run -d \
--name rabbitmq \
--restart=always \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3-management参数说明:
5672:AMQP 协议端口15672:Web 管理界面端口RABBITMQ_DEFAULT_USER:默认用户名RABBITMQ_DEFAULT_PASS:默认密码
访问管理界面
启动成功后,可以通过浏览器访问管理界面:
http://localhost:15672使用上面设置的用户名和密码登录(默认:admin/admin)。
Spring Boot 集成
添加依赖
在 pom.xml 中添加 RabbitMQ 依赖:
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置连接
在 application.yml 中配置 RabbitMQ 连接信息:
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
# 开启发送确认
publisher-confirm-type: correlated
# 开启发送失败退回
publisher-returns: true
# 开启消费者确认
listener:
simple:
acknowledge-mode: manual配置类
创建 RabbitMQ 配置类,定义队列、交换器和绑定关系:
java
@Configuration
public class RabbitMQConfig {
// 队列名称
public static final String QUEUE_NAME = "test.queue";
// 交换器名称
public static final String EXCHANGE_NAME = "test.exchange";
// 路由键
public static final String ROUTING_KEY = "test.routing.key";
// 声明队列
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
// 声明交换器(Direct 类型)
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
// 绑定队列和交换器
@Bean
public Binding binding() {
return BindingBuilder
.bind(queue())
.to(exchange())
.with(ROUTING_KEY);
}
}消息生产者
创建消息发送服务:
java
@Service
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.EXCHANGE_NAME,
RabbitMQConfig.ROUTING_KEY,
message
);
}
}消息消费者
创建消息接收服务:
java
@Component
public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void handleMessage(String message, Channel channel, Message msg) {
try {
// 处理消息
System.out.println("收到消息: " + message);
// 手动确认消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息并重新入队
channel.basicNack(
msg.getMessageProperties().getDeliveryTag(),
false,
true
);
}
}
}交换器类型
Direct Exchange
直接交换器,根据路由键精确匹配:
java
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}Topic Exchange
主题交换器,支持通配符匹配:
java
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange");
}
// 路由键示例:user.created、order.paidFanout Exchange
扇出交换器,广播消息到所有绑定的队列:
java
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}Headers Exchange
头部交换器,根据消息头匹配:
java
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.exchange");
}消息确认机制
生产者确认
确保消息成功发送到 RabbitMQ:
java
@Configuration
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败: " + cause);
}
}
}消费者确认
确保消息被正确处理:
java
@RabbitListener(queues = "test.queue")
public void handleMessage(String message, Channel channel, Message msg) {
try {
// 处理业务逻辑
processMessage(message);
// 确认消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,不重新入队
channel.basicNack(
msg.getMessageProperties().getDeliveryTag(),
false,
false
);
}
}死信队列
处理无法被正常消费的消息:
java
@Configuration
public class DeadLetterConfig {
// 死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead.letter.queue").build();
}
// 死信交换器
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dead.letter.exchange");
}
// 绑定死信队列
@Bean
public Binding deadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead.letter.routing.key");
}
// 普通队列,设置死信参数
@Bean
public Queue normalQueue() {
return QueueBuilder
.durable("normal.queue")
.withArgument("x-dead-letter-exchange", "dead.letter.exchange")
.withArgument("x-dead-letter-routing-key", "dead.letter.routing.key")
.build();
}
}延迟队列
实现消息延迟发送(使用 RabbitMQ 延迟插件):
java
@Bean
public Queue delayQueue() {
return QueueBuilder
.durable("delay.queue")
.withArgument("x-message-ttl", 60000) // 延迟 60 秒
.withArgument("x-dead-letter-exchange", "target.exchange")
.withArgument("x-dead-letter-routing-key", "target.routing.key")
.build();
}集群部署
单机多节点
在同一台机器上启动多个 RabbitMQ 节点:
bash
# 启动第一个节点
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit@node1 rabbitmq-server -detached
# 启动第二个节点
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit@node2 rabbitmq-server -detached
# 将节点2加入集群
rabbitmqctl -n rabbit@node2 stop_app
rabbitmqctl -n rabbit@node2 join_cluster rabbit@node1
rabbitmqctl -n rabbit@node2 start_app多机集群
在不同机器上部署 RabbitMQ 节点,配置相同的 Erlang Cookie,然后加入集群。
监控与管理
管理界面功能
- 概览:查看连接数、队列数、消息速率等
- 连接管理:查看和管理客户端连接
- 队列管理:查看队列状态、消息数量、消费者数量
- 交换器管理:查看和管理交换器
- 用户管理:管理用户和权限
命令行工具
bash
# 查看队列状态
rabbitmqctl list_queues
# 查看交换器
rabbitmqctl list_exchanges
# 查看绑定关系
rabbitmqctl list_bindings
# 查看连接
rabbitmqctl list_connections最佳实践
- 消息持久化:对于重要消息,设置持久化标志
- 合理设置队列长度:避免队列无限增长
- 使用确认机制:确保消息可靠传递
- 监控和告警:及时发现问题
- 合理使用交换器类型:根据业务场景选择合适的交换器
- 避免消息堆积:及时处理消息,防止队列阻塞
常见问题
消息丢失
- 确保消息和队列都设置了持久化
- 使用生产者确认机制
- 使用消费者确认机制
消息重复消费
- 实现幂等性处理
- 使用消息去重机制
性能优化
- 合理设置预取数量(prefetch)
- 使用批量确认
- 优化网络配置
RabbitMQ 作为成熟的消息中间件,在企业级应用中广泛使用,通过合理配置和使用,可以构建可靠、高性能的异步消息系统。