Skip to content

RocketMQ

RocketMQ 是阿里巴巴开源的分布式消息中间件,具有低延迟、高并发、高可用、高可靠的特点。它支持多种消息类型,包括普通消息、顺序消息、事务消息、延时消息等,广泛应用于电商、金融、物流等场景。

核心特性

  • 高吞吐量:单机支持万级消息吞吐
  • 低延迟:毫秒级消息投递
  • 高可用:支持主从复制、多副本机制
  • 顺序消息:支持全局顺序和分区顺序
  • 事务消息:支持分布式事务
  • 延时消息:支持消息延迟投递
  • 消息过滤:支持 Tag 和 SQL 过滤
  • 消息轨迹:支持消息轨迹追踪

架构组件

  • NameServer:路由注册中心,管理 Broker 的路由信息
  • Broker:消息存储和转发服务器
  • Producer:消息生产者
  • Consumer:消息消费者

Docker 安装

安装 NameServer

bash
docker pull apache/rocketmq:5.1.4

启动 NameServer:

bash
docker run -d \
  --name rmqnamesrv \
  --restart=always \
  -p 9876:9876 \
  -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \
  apache/rocketmq:5.1.4 \
  sh mqnamesrv

安装 Broker

启动 Broker(需要先启动 NameServer):

bash
docker run -d \
  --name rmqbroker \
  --restart=always \
  --link rmqnamesrv:namesrv \
  -p 10909:10909 \
  -p 10911:10911 \
  -e "NAMESRV_ADDR=namesrv:9876" \
  -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \
  apache/rocketmq:5.1.4 \
  sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf

安装 RocketMQ Console

RocketMQ Console 是 RocketMQ 的管理控制台:

bash
docker pull styletang/rocketmq-console-ng:latest

启动控制台:

bash
docker run -d \
  --name rmqconsole \
  --restart=always \
  -p 8080:8080 \
  -e "JAVA_OPTS=-Drocketmq.namesrv.addr=localhost:9876" \
  styletang/rocketmq-console-ng:latest

访问控制台:http://localhost:8080

Spring Boot 集成

添加依赖

pom.xml 中添加 RocketMQ 依赖:

xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>

配置连接

application.yml 中配置 RocketMQ:

yaml
rocketmq:
  name-server: localhost:9876
  producer:
    group: my-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
  consumer:
    group: my-consumer-group

消息生产者

创建消息发送服务:

java
@Service
public class RocketMQProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送普通消息
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }
    
    /**
     * 发送带 Tag 的消息
     */
    public void sendMessageWithTag(String topic, String tag, String message) {
        rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
    }
    
    /**
     * 发送同步消息
     */
    public SendResult sendSyncMessage(String topic, String message) {
        return rocketMQTemplate.syncSend(topic, message);
    }
    
    /**
     * 发送异步消息
     */
    public void sendAsyncMessage(String topic, String message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功: " + sendResult);
            }
            
            @Override
            public void onException(Throwable e) {
                System.err.println("消息发送失败: " + e.getMessage());
            }
        });
    }
    
    /**
     * 发送单向消息(不关心发送结果)
     */
    public void sendOneWayMessage(String topic, String message) {
        rocketMQTemplate.sendOneWay(topic, message);
    }
}

消息消费者

创建消息接收服务:

java
@Component
@RocketMQMessageListener(
    topic = "test-topic",
    consumerGroup = "my-consumer-group",
    selectorExpression = "*" // Tag 过滤,* 表示所有
)
public class RocketMQConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        System.out.println("收到消息: " + message);
        // 处理业务逻辑
        processMessage(message);
    }
    
    private void processMessage(String message) {
        // 业务处理
    }
}

顺序消息

生产者

发送顺序消息需要指定 MessageQueueSelector:

java
@Service
public class OrderMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendOrderMessage(String orderId, String message) {
        // 根据订单ID选择队列,确保同一订单的消息在同一队列
        rocketMQTemplate.syncSendOrderly(
            "order-topic",
            message,
            orderId
        );
    }
}

消费者

顺序消息的消费者需要实现 RocketMQListener 并设置 consumeMode:

java
@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderMessageConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
        // 顺序处理消息
        processOrderMessage(message);
    }
}

事务消息

生产者

发送事务消息:

java
@Service
public class TransactionMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendTransactionMessage(String topic, String message) {
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            topic,
            MessageBuilder.withPayload(message).build(),
            null // 业务参数
        );
    }
}

事务监听器

实现事务监听器处理事务消息:

java
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQTransactionListener {
    
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            executeLocalBusiness(msg);
            // 提交事务
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 回滚事务
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态
        if (isLocalTransactionSuccess(msg)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

延时消息

发送延时消息:

java
@Service
public class DelayMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendDelayMessage(String topic, String message, int delayLevel) {
        // delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        rocketMQTemplate.syncSend(
            topic,
            MessageBuilder
                .withPayload(message)
                .setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
                .build()
        );
    }
}

消息过滤

Tag 过滤

java
// 生产者发送带 Tag 的消息
rocketMQTemplate.convertAndSend("topic:tag1", message);

// 消费者只接收特定 Tag 的消息
@RocketMQMessageListener(
    topic = "topic",
    consumerGroup = "consumer-group",
    selectorExpression = "tag1 || tag2" // 只接收 tag1 或 tag2
)

SQL 过滤

java
@RocketMQMessageListener(
    topic = "topic",
    consumerGroup = "consumer-group",
    messageModel = MessageModel.CLUSTERING,
    selectorType = SelectorType.SQL92,
    selectorExpression = "age > 18 AND city = 'Beijing'"
)

批量消息

发送批量消息:

java
@Service
public class BatchMessageProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendBatchMessage(String topic, List<String> messages) {
        List<Message> messageList = messages.stream()
            .map(msg -> MessageBuilder.withPayload(msg).build())
            .collect(Collectors.toList());
        
        rocketMQTemplate.syncSend(topic, messageList);
    }
}

集群部署

双主双从架构

NameServer1          NameServer2
    |                    |
Broker-A(Master)    Broker-B(Master)
    |                    |
Broker-A(Slave)     Broker-B(Slave)

Broker 配置

broker.conf 中配置:

properties
# Broker 名称
brokerName=broker-a
# Broker ID,0 表示 Master,非 0 表示 Slave
brokerId=0
# NameServer 地址
namesrvAddr=localhost:9876
# 数据存储路径
storePathRootDir=/home/rocketmq/store
# 提交日志存储路径
storePathCommitLog=/home/rocketmq/store/commitlog

监控与管理

RocketMQ Console

通过 Web 控制台可以:

  • 查看 Topic 和 Consumer Group
  • 查看消息轨迹
  • 查看消费进度
  • 发送测试消息
  • 查看集群状态

命令行工具

bash
# 查看 Topic 列表
mqadmin topicList -n localhost:9876

# 创建 Topic
mqadmin updateTopic -n localhost:9876 -t test-topic -c DefaultCluster

# 查看消费进度
mqadmin consumerProgress -n localhost:9876 -g consumer-group

# 查看消息
mqadmin queryMsgById -n localhost:9876 -i msgId

最佳实践

  1. 合理设置 Topic 和 Tag:使用 Tag 进行消息分类和过滤
  2. 消息幂等性:确保消息处理的幂等性
  3. 消费失败处理:合理设置重试次数和死信队列
  4. 顺序消息:确保业务真正需要顺序消费
  5. 批量消费:提高消费性能
  6. 监控告警:及时发现问题
  7. 消息大小控制:单条消息建议不超过 4MB

常见问题

消息丢失

  • 使用同步发送确保消息发送成功
  • 开启消息持久化
  • 使用事务消息保证一致性

消息重复

  • 实现消息去重机制
  • 使用消息 ID 或业务唯一标识

消费堆积

  • 增加消费者实例
  • 优化消费逻辑
  • 使用批量消费

RocketMQ 作为高性能的分布式消息中间件,在大型分布式系统中发挥着重要作用,通过合理使用其特性,可以构建稳定可靠的消息系统。