Spring Boot 接入 RabbitMQ 完整教程:发送、消费、确认与死信队列
- 发布时间:2026-06-16 10:06:50
- 本文热度:浏览 14 赞 0 评论 0
- 文章标签: Spring Boot rabbitmq Spring AMQP
- 全文共1字,阅读约需1分钟
Spring Boot 接入 RabbitMQ 完整教程:发送、消费、确认与死信队列
RabbitMQ 接入 Spring Boot 并不难,难的是很多教程只写到“消息能发、消费者能收到”就结束了。真正放到业务里,至少还要把连接配置、消息序列化、交换机和队列声明、生产者确认、消费者确认、失败重试、死信队列这些问题一次性想清楚。
下面以“订单支付成功后发送消息”为例,完整走一遍 Spring Boot 接入 RabbitMQ 的常用写法。示例基于 Spring Boot 3.x、Spring AMQP 3.x 和 JDK 17+。
一、环境准备
本地可以用 Docker 快速启动 RabbitMQ:
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
rabbitmq:3-management
启动后访问 http://localhost:15672,账号密码都是 admin。5672 是应用连接 RabbitMQ 的端口,15672 是管理后台端口。开发和排查问题时不要只盯代码,RabbitMQ 控制台可以直接看到 exchange、queue、message rate、ready、unacked 等状态。
二、引入依赖
pom.xml 加入 AMQP 依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
Spring Boot 会根据 spring-boot-starter-amqp 自动装配 RabbitTemplate、连接工厂和监听容器。普通项目不用自己手写大量 ConnectionFactory 配置,除非要做 SSL、多 RabbitMQ 集群、连接名等定制。
三、配置 RabbitMQ 连接
application.yml:
server:
port: 8080
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual
prefetch: 10
concurrency: 2
max-concurrency: 5
几个配置要重点理解:
publisher-confirm-type: correlated 用来确认消息是否成功到达 exchange,解决的是“生产者发出去没有”的问题。
publisher-returns: true 和 mandatory: true 用来处理消息无法路由到队列的情况。消息到达 exchange 不代表一定进入 queue,routing key 写错或 binding 没配好都会导致消息被退回。
acknowledge-mode: manual 是消费者手动确认。业务处理成功后主动 ack,失败后 nack 或 reject。它比自动确认麻烦,但更适合真实业务。
四、声明交换机、队列和绑定关系
这次使用 Direct Exchange。routing key 完全匹配时才会路由到对应队列。
准备四个对象:
- 正常交换机:
demo.order.exchange - 正常队列:
demo.order.queue - 死信交换机:
demo.order.dlx - 死信队列:
demo.order.dlq
package com.example.rabbitmqdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
public static final String ORDER_EXCHANGE = "demo.order.exchange";
public static final String ORDER_QUEUE = "demo.order.queue";
public static final String ORDER_ROUTING_KEY = "order.pay.success";
public static final String ORDER_DLX = "demo.order.dlx";
public static final String ORDER_DLQ = "demo.order.dlq";
public static final String ORDER_DLQ_ROUTING_KEY = "order.pay.dead";
@Bean
public DirectExchange orderExchange() {
return ExchangeBuilder.directExchange(ORDER_EXCHANGE).durable(true).build();
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.deadLetterExchange(ORDER_DLX)
.deadLetterRoutingKey(ORDER_DLQ_ROUTING_KEY)
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}
@Bean
public DirectExchange orderDlx() {
return ExchangeBuilder.directExchange(ORDER_DLX).durable(true).build();
}
@Bean
public Queue orderDlq() {
return QueueBuilder.durable(ORDER_DLQ).build();
}
@Bean
public Binding orderDlqBinding() {
return BindingBuilder.bind(orderDlq()).to(orderDlx()).with(ORDER_DLQ_ROUTING_KEY);
}
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Jackson2JsonMessageConverter 会把 Java 对象转成 JSON。实际项目建议单独定义 MQ DTO,不要直接把 JPA Entity 或复杂领域对象扔进 MQ,因为消息是一种对外契约,不能跟着数据库模型随意变化。
五、定义消息对象
package com.example.rabbitmqdemo.message;
import java.math.BigDecimal;
import java.time.LocalDateTime;
public record OrderPaidMessage(
Long orderId,
Long userId,
BigDecimal amount,
LocalDateTime paidAt
) {}
如果项目还在 Java 8,可以改成普通 POJO。
六、编写生产者
package com.example.rabbitmqdemo.producer;
import com.example.rabbitmqdemo.config.RabbitMqConfig;
import com.example.rabbitmqdemo.message.OrderPaidMessage;
import jakarta.annotation.PostConstruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class OrderProducer {
private final RabbitTemplate rabbitTemplate;
public OrderProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@PostConstruct
public void initCallback() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息已到达 exchange, correlationId="
+ (correlationData == null ? null : correlationData.getId()));
} else {
System.err.println("消息未到达 exchange, cause=" + cause);
}
});
rabbitTemplate.setReturnsCallback(this::handleReturnedMessage);
}
public void sendOrderPaidMessage(OrderPaidMessage message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
RabbitMqConfig.ORDER_EXCHANGE,
RabbitMqConfig.ORDER_ROUTING_KEY,
message,
this::postProcessMessage,
correlationData
);
}
private Message postProcessMessage(Message message) {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
private void handleReturnedMessage(ReturnedMessage returned) {
System.err.println("消息无法路由到队列");
System.err.println("replyCode=" + returned.getReplyCode());
System.err.println("replyText=" + returned.getReplyText());
System.err.println("exchange=" + returned.getExchange());
System.err.println("routingKey=" + returned.getRoutingKey());
}
}
ConfirmCallback 关注消息是否到达 exchange,ReturnsCallback 关注消息到达 exchange 后是否路由到 queue。confirm 成功不代表消费者一定能收到,它只能说明 exchange 收到了消息。
七、写一个接口触发发送
@RestController
public class OrderController {
private final OrderProducer orderProducer;
public OrderController(OrderProducer orderProducer) {
this.orderProducer = orderProducer;
}
@PostMapping("/orders/{orderId}/pay")
public String pay(@PathVariable Long orderId) {
OrderPaidMessage message = new OrderPaidMessage(
orderId, 10001L, new BigDecimal("99.90"), LocalDateTime.now()
);
orderProducer.sendOrderPaidMessage(message);
return "ok";
}
}
测试:
curl -X POST http://localhost:8080/orders/1001/pay
八、编写消费者
package com.example.rabbitmqdemo.consumer;
import com.example.rabbitmqdemo.config.RabbitMqConfig;
import com.example.rabbitmqdemo.message.OrderPaidMessage;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class OrderConsumer {
@RabbitListener(queues = RabbitMqConfig.ORDER_QUEUE)
public void consume(OrderPaidMessage payload, Message rawMessage, Channel channel) throws IOException {
long deliveryTag = rawMessage.getMessageProperties().getDeliveryTag();
try {
System.out.println("收到订单支付消息: " + payload);
handleBusiness(payload);
channel.basicAck(deliveryTag, false);
} catch (Exception ex) {
System.err.println("订单消息处理失败,进入死信队列: " + payload);
ex.printStackTrace();
channel.basicNack(deliveryTag, false, false);
}
}
private void handleBusiness(OrderPaidMessage payload) {
if (payload.orderId() == null) {
throw new IllegalArgumentException("orderId 不能为空");
}
System.out.println("处理订单支付业务,orderId=" + payload.orderId());
}
}
手动 ack 的关键点是:业务成功后再 basicAck。如果业务还没执行完就 ack,后面数据库写入失败、调用下游失败,RabbitMQ 已经认为这条消息处理完成了,不会再投递。
失败时使用:
channel.basicNack(deliveryTag, false, false);
最后一个参数 requeue=false 表示不要重新回到原队列。如果队列配置了死信交换机,消息会进入死信队列。不要随手写 requeue=true,否则数据异常会导致消息无限重试。
九、验证死信队列
临时让业务逻辑抛异常:
private void handleBusiness(OrderPaidMessage payload) {
throw new RuntimeException("模拟业务异常");
}
再次请求:
curl -X POST http://localhost:8080/orders/1002/pay
然后去 RabbitMQ 控制台查看 demo.order.dlq,失败消息应该已经进入死信队列。
死信队列不是垃圾桶,它更像异常消息暂存区。生产环境一般还要做失败原因记录、死信队列长度监控、人工补偿或定时补偿、不可恢复消息归档。
十、交换机怎么选
| 类型 | 路由方式 | 适合场景 |
|---|---|---|
| Direct Exchange | routing key 完全匹配 | 明确的一对一或一对多业务事件 |
| Fanout Exchange | 广播给所有绑定队列 | 多个系统都要收到同一事件 |
| Topic Exchange | routing key 模式匹配 | 按业务域、事件类型灵活订阅 |
| Headers Exchange | 根据 header 匹配 | 使用较少,配置复杂 |
订单支付成功这种事件,如果只是固定几个消费者,Direct Exchange 就够了。如果后续需要 order.pay.success、order.pay.failed、order.refund.success 这类模式订阅,可以考虑 Topic Exchange。
十一、消费者幂等必须考虑
不要假设消息只会被消费一次。网络抖动、消费者重启、ack 丢失、业务超时都可能导致重复投递。RabbitMQ 能帮你做可靠投递,但不能替你保证业务幂等。
private void handleBusiness(OrderPaidMessage payload) {
Long orderId = payload.orderId();
if (alreadyProcessed(orderId)) {
return;
}
grantUserBenefit(orderId);
markProcessed(orderId);
}
更稳的做法是用数据库唯一索引兜底,例如建立消息消费记录表:
CREATE TABLE mq_consume_record (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL,
business_key VARCHAR(128) NOT NULL,
status VARCHAR(32) NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
UNIQUE KEY uk_message_id (message_id),
UNIQUE KEY uk_business_key (business_key)
);
只要消息会影响钱、库存、权益、积分,就不要只靠内存判断幂等。
十二、生产者发送失败怎么补偿
前面的 ConfirmCallback 只是打印日志,真实项目里发送失败通常要落库。常见做法是本地消息表:
CREATE TABLE local_message (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
message_id VARCHAR(64) NOT NULL,
exchange_name VARCHAR(128) NOT NULL,
routing_key VARCHAR(128) NOT NULL,
payload TEXT NOT NULL,
status VARCHAR(32) NOT NULL,
retry_count INT NOT NULL DEFAULT 0,
next_retry_time DATETIME NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
UNIQUE KEY uk_message_id (message_id)
);
流程是:本地事务内写业务数据和 local_message,事务提交后发送 MQ,confirm 成功后更新消息状态为 SENT,confirm 失败或超时则由定时任务重发。这就是本地消息表方案,在订单、支付、库存场景里比“失败了打印日志”靠谱得多。
十三、发送重试配置
可以给 RabbitTemplate 开启短暂重试:
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: 2s
max-attempts: 3
multiplier: 2
max-interval: 10s
这个重试只适合连接抖动一类短暂异常,不能替代本地消息表。服务重启、进程崩溃、线程被打断时,内存里的重试兜不住。
十四、开发中最容易踩的坑
- 队列参数改了但不生效。RabbitMQ 队列已存在时,很多参数不能靠重启应用自动覆盖。
- routing key 写错。confirm 成功只代表到了 exchange,消费者没收到时要检查 exchange、queue、binding、routing key 和 return callback。
- 自动 ack 导致消息“看起来消费了”。业务失败后 RabbitMQ 不再投递,状态就会对不上。
- 消费失败无限重试。
basicNack(tag, false, true)对数据异常很危险。 - 消息对象和业务实体耦合太深。MQ 消息应该是稳定 DTO,不要直接发 Entity。
十五、完整链路建议
比较稳的订单支付消息链路:用户支付成功 -> 订单服务更新订单状态 -> 本地事务写入 local_message -> 事务提交后发送 RabbitMQ -> RabbitMQ confirm 成功 -> 更新 local_message 状态 -> 消费者收到消息 -> 幂等校验 -> 执行业务 -> 手动 ack。
异常链路也要提前设计:发送失败时 local_message 保持待发送,由定时任务重发;消费异常时 nack(requeue=false),进入死信队列,再告警、补偿或人工处理。
总结
Spring Boot 接入 RabbitMQ,代码层面主要是三块:RabbitTemplate 负责发送消息,@RabbitListener 负责消费消息,Queue、Exchange、Binding 负责声明 RabbitMQ 拓扑。
真正决定系统稳定性的不是几个注解,而是生产者确认、return callback、消费者手动 ack、死信队列、幂等处理、失败补偿和清晰的命名规范。RabbitMQ 很适合做业务解耦、异步削峰和事件通知。前提是不要把它当成一个“能存消息的 List”。消息系统一旦进入核心链路,就必须按可靠性组件来设计。