Skip to content

RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器,用于在分布式系统中实现异步消息传递。它实现了高级消息队列协议(AMQP),提供了可靠的消息传递机制,支持多种消息模式,包括点对点、发布/订阅等。

核心特性

  • 可靠性:支持消息持久化、确认机制,确保消息不丢失
  • 灵活的路由:支持多种交换器类型(direct、topic、fanout、headers)
  • 集群支持:支持高可用集群部署
  • 管理界面:提供 Web 管理界面,方便监控和管理
  • 多协议支持:支持 AMQP、MQTT、STOMP 等多种协议

Docker 安装

使用 Docker 安装 RabbitMQ 是最简单快捷的方式:

bash
docker pull rabbitmq:3-management

启动 RabbitMQ 容器(包含管理界面):

bash
docker run -d \
  --name rabbitmq \
  --restart=always \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3-management

参数说明:

  • 5672:AMQP 协议端口
  • 15672:Web 管理界面端口
  • RABBITMQ_DEFAULT_USER:默认用户名
  • RABBITMQ_DEFAULT_PASS:默认密码

访问管理界面

启动成功后,可以通过浏览器访问管理界面:

http://localhost:15672

使用上面设置的用户名和密码登录(默认:admin/admin)。

Spring Boot 集成

添加依赖

pom.xml 中添加 RabbitMQ 依赖:

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置连接

application.yml 中配置 RabbitMQ 连接信息:

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    # 开启发送确认
    publisher-confirm-type: correlated
    # 开启发送失败退回
    publisher-returns: true
    # 开启消费者确认
    listener:
      simple:
        acknowledge-mode: manual

配置类

创建 RabbitMQ 配置类,定义队列、交换器和绑定关系:

java
@Configuration
public class RabbitMQConfig {
    
    // 队列名称
    public static final String QUEUE_NAME = "test.queue";
    // 交换器名称
    public static final String EXCHANGE_NAME = "test.exchange";
    // 路由键
    public static final String ROUTING_KEY = "test.routing.key";
    
    // 声明队列
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }
    
    // 声明交换器(Direct 类型)
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }
    
    // 绑定队列和交换器
    @Bean
    public Binding binding() {
        return BindingBuilder
            .bind(queue())
            .to(exchange())
            .with(ROUTING_KEY);
    }
}

消息生产者

创建消息发送服务:

java
@Service
public class MessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String message) {
        rabbitTemplate.convertAndSend(
            RabbitMQConfig.EXCHANGE_NAME,
            RabbitMQConfig.ROUTING_KEY,
            message
        );
    }
}

消息消费者

创建消息接收服务:

java
@Component
public class MessageConsumer {
    
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void handleMessage(String message, Channel channel, Message msg) {
        try {
            // 处理消息
            System.out.println("收到消息: " + message);
            
            // 手动确认消息
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 拒绝消息并重新入队
            channel.basicNack(
                msg.getMessageProperties().getDeliveryTag(),
                false,
                true
            );
        }
    }
}

交换器类型

Direct Exchange

直接交换器,根据路由键精确匹配:

java
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct.exchange");
}

Topic Exchange

主题交换器,支持通配符匹配:

java
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topic.exchange");
}

// 路由键示例:user.created、order.paid

Fanout Exchange

扇出交换器,广播消息到所有绑定的队列:

java
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout.exchange");
}

Headers Exchange

头部交换器,根据消息头匹配:

java
@Bean
public HeadersExchange headersExchange() {
    return new HeadersExchange("headers.exchange");
}

消息确认机制

生产者确认

确保消息成功发送到 RabbitMQ:

java
@Configuration
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败: " + cause);
        }
    }
}

消费者确认

确保消息被正确处理:

java
@RabbitListener(queues = "test.queue")
public void handleMessage(String message, Channel channel, Message msg) {
    try {
        // 处理业务逻辑
        processMessage(message);
        
        // 确认消息
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 拒绝消息,不重新入队
        channel.basicNack(
            msg.getMessageProperties().getDeliveryTag(),
            false,
            false
        );
    }
}

死信队列

处理无法被正常消费的消息:

java
@Configuration
public class DeadLetterConfig {
    
    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dead.letter.queue").build();
    }
    
    // 死信交换器
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dead.letter.exchange");
    }
    
    // 绑定死信队列
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder
            .bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with("dead.letter.routing.key");
    }
    
    // 普通队列,设置死信参数
    @Bean
    public Queue normalQueue() {
        return QueueBuilder
            .durable("normal.queue")
            .withArgument("x-dead-letter-exchange", "dead.letter.exchange")
            .withArgument("x-dead-letter-routing-key", "dead.letter.routing.key")
            .build();
    }
}

延迟队列

实现消息延迟发送(使用 RabbitMQ 延迟插件):

java
@Bean
public Queue delayQueue() {
    return QueueBuilder
        .durable("delay.queue")
        .withArgument("x-message-ttl", 60000) // 延迟 60 秒
        .withArgument("x-dead-letter-exchange", "target.exchange")
        .withArgument("x-dead-letter-routing-key", "target.routing.key")
        .build();
}

集群部署

单机多节点

在同一台机器上启动多个 RabbitMQ 节点:

bash
# 启动第一个节点
RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit@node1 rabbitmq-server -detached

# 启动第二个节点
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit@node2 rabbitmq-server -detached

# 将节点2加入集群
rabbitmqctl -n rabbit@node2 stop_app
rabbitmqctl -n rabbit@node2 join_cluster rabbit@node1
rabbitmqctl -n rabbit@node2 start_app

多机集群

在不同机器上部署 RabbitMQ 节点,配置相同的 Erlang Cookie,然后加入集群。

监控与管理

管理界面功能

  • 概览:查看连接数、队列数、消息速率等
  • 连接管理:查看和管理客户端连接
  • 队列管理:查看队列状态、消息数量、消费者数量
  • 交换器管理:查看和管理交换器
  • 用户管理:管理用户和权限

命令行工具

bash
# 查看队列状态
rabbitmqctl list_queues

# 查看交换器
rabbitmqctl list_exchanges

# 查看绑定关系
rabbitmqctl list_bindings

# 查看连接
rabbitmqctl list_connections

最佳实践

  1. 消息持久化:对于重要消息,设置持久化标志
  2. 合理设置队列长度:避免队列无限增长
  3. 使用确认机制:确保消息可靠传递
  4. 监控和告警:及时发现问题
  5. 合理使用交换器类型:根据业务场景选择合适的交换器
  6. 避免消息堆积:及时处理消息,防止队列阻塞

常见问题

消息丢失

  • 确保消息和队列都设置了持久化
  • 使用生产者确认机制
  • 使用消费者确认机制

消息重复消费

  • 实现幂等性处理
  • 使用消息去重机制

性能优化

  • 合理设置预取数量(prefetch)
  • 使用批量确认
  • 优化网络配置

RabbitMQ 作为成熟的消息中间件,在企业级应用中广泛使用,通过合理配置和使用,可以构建可靠、高性能的异步消息系统。