原创

Spring Boot 接入 RabbitMQ 完整教程:发送、消费、确认与死信队列

Spring Boot 接入 RabbitMQ 完整教程:发送、消费、确认与死信队列

RabbitMQ 接入 Spring Boot 并不难,难的是很多教程只写到“消息能发、消费者能收到”就结束了。真正放到业务里,至少还要把连接配置、消息序列化、交换机和队列声明、生产者确认、消费者确认、失败重试、死信队列这些问题一次性想清楚。

下面以“订单支付成功后发送消息”为例,完整走一遍 Spring Boot 接入 RabbitMQ 的常用写法。示例基于 Spring Boot 3.x、Spring AMQP 3.x 和 JDK 17+。

一、环境准备

本地可以用 Docker 快速启动 RabbitMQ:

docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3-management

启动后访问 http://localhost:15672,账号密码都是 admin。5672 是应用连接 RabbitMQ 的端口,15672 是管理后台端口。开发和排查问题时不要只盯代码,RabbitMQ 控制台可以直接看到 exchange、queue、message rate、ready、unacked 等状态。

二、引入依赖

pom.xml 加入 AMQP 依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

Spring Boot 会根据 spring-boot-starter-amqp 自动装配 RabbitTemplate、连接工厂和监听容器。普通项目不用自己手写大量 ConnectionFactory 配置,除非要做 SSL、多 RabbitMQ 集群、连接名等定制。

三、配置 RabbitMQ 连接

application.yml

server:
  port: 8080

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 10
        concurrency: 2
        max-concurrency: 5

几个配置要重点理解:

publisher-confirm-type: correlated 用来确认消息是否成功到达 exchange,解决的是“生产者发出去没有”的问题。

publisher-returns: truemandatory: true 用来处理消息无法路由到队列的情况。消息到达 exchange 不代表一定进入 queue,routing key 写错或 binding 没配好都会导致消息被退回。

acknowledge-mode: manual 是消费者手动确认。业务处理成功后主动 ack,失败后 nack 或 reject。它比自动确认麻烦,但更适合真实业务。

四、声明交换机、队列和绑定关系

这次使用 Direct Exchange。routing key 完全匹配时才会路由到对应队列。

准备四个对象:

  • 正常交换机:demo.order.exchange
  • 正常队列:demo.order.queue
  • 死信交换机:demo.order.dlx
  • 死信队列:demo.order.dlq
package com.example.rabbitmqdemo.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    public static final String ORDER_EXCHANGE = "demo.order.exchange";
    public static final String ORDER_QUEUE = "demo.order.queue";
    public static final String ORDER_ROUTING_KEY = "order.pay.success";
    public static final String ORDER_DLX = "demo.order.dlx";
    public static final String ORDER_DLQ = "demo.order.dlq";
    public static final String ORDER_DLQ_ROUTING_KEY = "order.pay.dead";

    @Bean
    public DirectExchange orderExchange() {
        return ExchangeBuilder.directExchange(ORDER_EXCHANGE).durable(true).build();
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
                .deadLetterExchange(ORDER_DLX)
                .deadLetterRoutingKey(ORDER_DLQ_ROUTING_KEY)
                .build();
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
    }

    @Bean
    public DirectExchange orderDlx() {
        return ExchangeBuilder.directExchange(ORDER_DLX).durable(true).build();
    }

    @Bean
    public Queue orderDlq() {
        return QueueBuilder.durable(ORDER_DLQ).build();
    }

    @Bean
    public Binding orderDlqBinding() {
        return BindingBuilder.bind(orderDlq()).to(orderDlx()).with(ORDER_DLQ_ROUTING_KEY);
    }

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Jackson2JsonMessageConverter 会把 Java 对象转成 JSON。实际项目建议单独定义 MQ DTO,不要直接把 JPA Entity 或复杂领域对象扔进 MQ,因为消息是一种对外契约,不能跟着数据库模型随意变化。

五、定义消息对象

package com.example.rabbitmqdemo.message;

import java.math.BigDecimal;
import java.time.LocalDateTime;

public record OrderPaidMessage(
        Long orderId,
        Long userId,
        BigDecimal amount,
        LocalDateTime paidAt
) {}

如果项目还在 Java 8,可以改成普通 POJO。

六、编写生产者

package com.example.rabbitmqdemo.producer;

import com.example.rabbitmqdemo.config.RabbitMqConfig;
import com.example.rabbitmqdemo.message.OrderPaidMessage;
import jakarta.annotation.PostConstruct;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Component
public class OrderProducer {
    private final RabbitTemplate rabbitTemplate;

    public OrderProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @PostConstruct
    public void initCallback() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息已到达 exchange, correlationId="
                        + (correlationData == null ? null : correlationData.getId()));
            } else {
                System.err.println("消息未到达 exchange, cause=" + cause);
            }
        });
        rabbitTemplate.setReturnsCallback(this::handleReturnedMessage);
    }

    public void sendOrderPaidMessage(OrderPaidMessage message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(
                RabbitMqConfig.ORDER_EXCHANGE,
                RabbitMqConfig.ORDER_ROUTING_KEY,
                message,
                this::postProcessMessage,
                correlationData
        );
    }

    private Message postProcessMessage(Message message) {
        message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return message;
    }

    private void handleReturnedMessage(ReturnedMessage returned) {
        System.err.println("消息无法路由到队列");
        System.err.println("replyCode=" + returned.getReplyCode());
        System.err.println("replyText=" + returned.getReplyText());
        System.err.println("exchange=" + returned.getExchange());
        System.err.println("routingKey=" + returned.getRoutingKey());
    }
}

ConfirmCallback 关注消息是否到达 exchange,ReturnsCallback 关注消息到达 exchange 后是否路由到 queue。confirm 成功不代表消费者一定能收到,它只能说明 exchange 收到了消息。

七、写一个接口触发发送

@RestController
public class OrderController {
    private final OrderProducer orderProducer;

    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @PostMapping("/orders/{orderId}/pay")
    public String pay(@PathVariable Long orderId) {
        OrderPaidMessage message = new OrderPaidMessage(
                orderId, 10001L, new BigDecimal("99.90"), LocalDateTime.now()
        );
        orderProducer.sendOrderPaidMessage(message);
        return "ok";
    }
}

测试:

curl -X POST http://localhost:8080/orders/1001/pay

八、编写消费者

package com.example.rabbitmqdemo.consumer;

import com.example.rabbitmqdemo.config.RabbitMqConfig;
import com.example.rabbitmqdemo.message.OrderPaidMessage;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class OrderConsumer {
    @RabbitListener(queues = RabbitMqConfig.ORDER_QUEUE)
    public void consume(OrderPaidMessage payload, Message rawMessage, Channel channel) throws IOException {
        long deliveryTag = rawMessage.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("收到订单支付消息: " + payload);
            handleBusiness(payload);
            channel.basicAck(deliveryTag, false);
        } catch (Exception ex) {
            System.err.println("订单消息处理失败,进入死信队列: " + payload);
            ex.printStackTrace();
            channel.basicNack(deliveryTag, false, false);
        }
    }

    private void handleBusiness(OrderPaidMessage payload) {
        if (payload.orderId() == null) {
            throw new IllegalArgumentException("orderId 不能为空");
        }
        System.out.println("处理订单支付业务,orderId=" + payload.orderId());
    }
}

手动 ack 的关键点是:业务成功后再 basicAck。如果业务还没执行完就 ack,后面数据库写入失败、调用下游失败,RabbitMQ 已经认为这条消息处理完成了,不会再投递。

失败时使用:

channel.basicNack(deliveryTag, false, false);

最后一个参数 requeue=false 表示不要重新回到原队列。如果队列配置了死信交换机,消息会进入死信队列。不要随手写 requeue=true,否则数据异常会导致消息无限重试。

九、验证死信队列

临时让业务逻辑抛异常:

private void handleBusiness(OrderPaidMessage payload) {
    throw new RuntimeException("模拟业务异常");
}

再次请求:

curl -X POST http://localhost:8080/orders/1002/pay

然后去 RabbitMQ 控制台查看 demo.order.dlq,失败消息应该已经进入死信队列。

死信队列不是垃圾桶,它更像异常消息暂存区。生产环境一般还要做失败原因记录、死信队列长度监控、人工补偿或定时补偿、不可恢复消息归档。

十、交换机怎么选

类型 路由方式 适合场景
Direct Exchange routing key 完全匹配 明确的一对一或一对多业务事件
Fanout Exchange 广播给所有绑定队列 多个系统都要收到同一事件
Topic Exchange routing key 模式匹配 按业务域、事件类型灵活订阅
Headers Exchange 根据 header 匹配 使用较少,配置复杂

订单支付成功这种事件,如果只是固定几个消费者,Direct Exchange 就够了。如果后续需要 order.pay.successorder.pay.failedorder.refund.success 这类模式订阅,可以考虑 Topic Exchange。

十一、消费者幂等必须考虑

不要假设消息只会被消费一次。网络抖动、消费者重启、ack 丢失、业务超时都可能导致重复投递。RabbitMQ 能帮你做可靠投递,但不能替你保证业务幂等。

private void handleBusiness(OrderPaidMessage payload) {
    Long orderId = payload.orderId();
    if (alreadyProcessed(orderId)) {
        return;
    }
    grantUserBenefit(orderId);
    markProcessed(orderId);
}

更稳的做法是用数据库唯一索引兜底,例如建立消息消费记录表:

CREATE TABLE mq_consume_record (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL,
    business_key VARCHAR(128) NOT NULL,
    status VARCHAR(32) NOT NULL,
    created_at DATETIME NOT NULL,
    updated_at DATETIME NOT NULL,
    UNIQUE KEY uk_message_id (message_id),
    UNIQUE KEY uk_business_key (business_key)
);

只要消息会影响钱、库存、权益、积分,就不要只靠内存判断幂等。

十二、生产者发送失败怎么补偿

前面的 ConfirmCallback 只是打印日志,真实项目里发送失败通常要落库。常见做法是本地消息表:

CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    message_id VARCHAR(64) NOT NULL,
    exchange_name VARCHAR(128) NOT NULL,
    routing_key VARCHAR(128) NOT NULL,
    payload TEXT NOT NULL,
    status VARCHAR(32) NOT NULL,
    retry_count INT NOT NULL DEFAULT 0,
    next_retry_time DATETIME NULL,
    created_at DATETIME NOT NULL,
    updated_at DATETIME NOT NULL,
    UNIQUE KEY uk_message_id (message_id)
);

流程是:本地事务内写业务数据和 local_message,事务提交后发送 MQ,confirm 成功后更新消息状态为 SENT,confirm 失败或超时则由定时任务重发。这就是本地消息表方案,在订单、支付、库存场景里比“失败了打印日志”靠谱得多。

十三、发送重试配置

可以给 RabbitTemplate 开启短暂重试:

spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: 2s
        max-attempts: 3
        multiplier: 2
        max-interval: 10s

这个重试只适合连接抖动一类短暂异常,不能替代本地消息表。服务重启、进程崩溃、线程被打断时,内存里的重试兜不住。

十四、开发中最容易踩的坑

  1. 队列参数改了但不生效。RabbitMQ 队列已存在时,很多参数不能靠重启应用自动覆盖。
  2. routing key 写错。confirm 成功只代表到了 exchange,消费者没收到时要检查 exchange、queue、binding、routing key 和 return callback。
  3. 自动 ack 导致消息“看起来消费了”。业务失败后 RabbitMQ 不再投递,状态就会对不上。
  4. 消费失败无限重试。basicNack(tag, false, true) 对数据异常很危险。
  5. 消息对象和业务实体耦合太深。MQ 消息应该是稳定 DTO,不要直接发 Entity。

十五、完整链路建议

比较稳的订单支付消息链路:用户支付成功 -> 订单服务更新订单状态 -> 本地事务写入 local_message -> 事务提交后发送 RabbitMQ -> RabbitMQ confirm 成功 -> 更新 local_message 状态 -> 消费者收到消息 -> 幂等校验 -> 执行业务 -> 手动 ack。

异常链路也要提前设计:发送失败时 local_message 保持待发送,由定时任务重发;消费异常时 nack(requeue=false),进入死信队列,再告警、补偿或人工处理。

总结

Spring Boot 接入 RabbitMQ,代码层面主要是三块:RabbitTemplate 负责发送消息,@RabbitListener 负责消费消息,QueueExchangeBinding 负责声明 RabbitMQ 拓扑。

真正决定系统稳定性的不是几个注解,而是生产者确认、return callback、消费者手动 ack、死信队列、幂等处理、失败补偿和清晰的命名规范。RabbitMQ 很适合做业务解耦、异步削峰和事件通知。前提是不要把它当成一个“能存消息的 List”。消息系统一旦进入核心链路,就必须按可靠性组件来设计。

正文到此结束
评论插件初始化中...
Loading...