主题
RocketMQ
RocketMQ 是阿里巴巴开源的分布式消息中间件,具有低延迟、高并发、高可用、高可靠的特点。它支持多种消息类型,包括普通消息、顺序消息、事务消息、延时消息等,广泛应用于电商、金融、物流等场景。
核心特性
- 高吞吐量:单机支持万级消息吞吐
- 低延迟:毫秒级消息投递
- 高可用:支持主从复制、多副本机制
- 顺序消息:支持全局顺序和分区顺序
- 事务消息:支持分布式事务
- 延时消息:支持消息延迟投递
- 消息过滤:支持 Tag 和 SQL 过滤
- 消息轨迹:支持消息轨迹追踪
架构组件
- NameServer:路由注册中心,管理 Broker 的路由信息
- Broker:消息存储和转发服务器
- Producer:消息生产者
- Consumer:消息消费者
Docker 安装
安装 NameServer
bash
docker pull apache/rocketmq:5.1.4启动 NameServer:
bash
docker run -d \
--name rmqnamesrv \
--restart=always \
-p 9876:9876 \
-e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \
apache/rocketmq:5.1.4 \
sh mqnamesrv安装 Broker
启动 Broker(需要先启动 NameServer):
bash
docker run -d \
--name rmqbroker \
--restart=always \
--link rmqnamesrv:namesrv \
-p 10909:10909 \
-p 10911:10911 \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \
apache/rocketmq:5.1.4 \
sh mqbroker -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf安装 RocketMQ Console
RocketMQ Console 是 RocketMQ 的管理控制台:
bash
docker pull styletang/rocketmq-console-ng:latest启动控制台:
bash
docker run -d \
--name rmqconsole \
--restart=always \
-p 8080:8080 \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=localhost:9876" \
styletang/rocketmq-console-ng:latest访问控制台:http://localhost:8080
Spring Boot 集成
添加依赖
在 pom.xml 中添加 RocketMQ 依赖:
xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>配置连接
在 application.yml 中配置 RocketMQ:
yaml
rocketmq:
name-server: localhost:9876
producer:
group: my-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
consumer:
group: my-consumer-group消息生产者
创建消息发送服务:
java
@Service
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
/**
* 发送带 Tag 的消息
*/
public void sendMessageWithTag(String topic, String tag, String message) {
rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
}
/**
* 发送同步消息
*/
public SendResult sendSyncMessage(String topic, String message) {
return rocketMQTemplate.syncSend(topic, message);
}
/**
* 发送异步消息
*/
public void sendAsyncMessage(String topic, String message) {
rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.err.println("消息发送失败: " + e.getMessage());
}
});
}
/**
* 发送单向消息(不关心发送结果)
*/
public void sendOneWayMessage(String topic, String message) {
rocketMQTemplate.sendOneWay(topic, message);
}
}消息消费者
创建消息接收服务:
java
@Component
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "my-consumer-group",
selectorExpression = "*" // Tag 过滤,* 表示所有
)
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("收到消息: " + message);
// 处理业务逻辑
processMessage(message);
}
private void processMessage(String message) {
// 业务处理
}
}顺序消息
生产者
发送顺序消息需要指定 MessageQueueSelector:
java
@Service
public class OrderMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderMessage(String orderId, String message) {
// 根据订单ID选择队列,确保同一订单的消息在同一队列
rocketMQTemplate.syncSendOrderly(
"order-topic",
message,
orderId
);
}
}消费者
顺序消息的消费者需要实现 RocketMQListener 并设置 consumeMode:
java
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 顺序处理消息
processOrderMessage(message);
}
}事务消息
生产者
发送事务消息:
java
@Service
public class TransactionMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransactionMessage(String topic, String message) {
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
topic,
MessageBuilder.withPayload(message).build(),
null // 业务参数
);
}
}事务监听器
实现事务监听器处理事务消息:
java
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQTransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
executeLocalBusiness(msg);
// 提交事务
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 回滚事务
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
if (isLocalTransactionSuccess(msg)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}延时消息
发送延时消息:
java
@Service
public class DelayMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendDelayMessage(String topic, String message, int delayLevel) {
// delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
rocketMQTemplate.syncSend(
topic,
MessageBuilder
.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, delayLevel)
.build()
);
}
}消息过滤
Tag 过滤
java
// 生产者发送带 Tag 的消息
rocketMQTemplate.convertAndSend("topic:tag1", message);
// 消费者只接收特定 Tag 的消息
@RocketMQMessageListener(
topic = "topic",
consumerGroup = "consumer-group",
selectorExpression = "tag1 || tag2" // 只接收 tag1 或 tag2
)SQL 过滤
java
@RocketMQMessageListener(
topic = "topic",
consumerGroup = "consumer-group",
messageModel = MessageModel.CLUSTERING,
selectorType = SelectorType.SQL92,
selectorExpression = "age > 18 AND city = 'Beijing'"
)批量消息
发送批量消息:
java
@Service
public class BatchMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendBatchMessage(String topic, List<String> messages) {
List<Message> messageList = messages.stream()
.map(msg -> MessageBuilder.withPayload(msg).build())
.collect(Collectors.toList());
rocketMQTemplate.syncSend(topic, messageList);
}
}集群部署
双主双从架构
NameServer1 NameServer2
| |
Broker-A(Master) Broker-B(Master)
| |
Broker-A(Slave) Broker-B(Slave)Broker 配置
在 broker.conf 中配置:
properties
# Broker 名称
brokerName=broker-a
# Broker ID,0 表示 Master,非 0 表示 Slave
brokerId=0
# NameServer 地址
namesrvAddr=localhost:9876
# 数据存储路径
storePathRootDir=/home/rocketmq/store
# 提交日志存储路径
storePathCommitLog=/home/rocketmq/store/commitlog监控与管理
RocketMQ Console
通过 Web 控制台可以:
- 查看 Topic 和 Consumer Group
- 查看消息轨迹
- 查看消费进度
- 发送测试消息
- 查看集群状态
命令行工具
bash
# 查看 Topic 列表
mqadmin topicList -n localhost:9876
# 创建 Topic
mqadmin updateTopic -n localhost:9876 -t test-topic -c DefaultCluster
# 查看消费进度
mqadmin consumerProgress -n localhost:9876 -g consumer-group
# 查看消息
mqadmin queryMsgById -n localhost:9876 -i msgId最佳实践
- 合理设置 Topic 和 Tag:使用 Tag 进行消息分类和过滤
- 消息幂等性:确保消息处理的幂等性
- 消费失败处理:合理设置重试次数和死信队列
- 顺序消息:确保业务真正需要顺序消费
- 批量消费:提高消费性能
- 监控告警:及时发现问题
- 消息大小控制:单条消息建议不超过 4MB
常见问题
消息丢失
- 使用同步发送确保消息发送成功
- 开启消息持久化
- 使用事务消息保证一致性
消息重复
- 实现消息去重机制
- 使用消息 ID 或业务唯一标识
消费堆积
- 增加消费者实例
- 优化消费逻辑
- 使用批量消费
RocketMQ 作为高性能的分布式消息中间件,在大型分布式系统中发挥着重要作用,通过合理使用其特性,可以构建稳定可靠的消息系统。