RabbitMQ的消息确认机制

1. Confirm模式确认(生产者到路由的确认机制)

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
需要3步进行Confirm确认:

  1. 配置文件开启确认模式(correlated 是异步回调模式,生产环境首选);
  2. 实现 RabbitTemplate.ConfirmCallback 接口,自定义确认成功 / 失败的处理逻辑;
  3. 将回调类绑定到 RabbitTemplate,让 Spring 容器接管确认回调。

完整代码实现

  1. 配置文件(application.yml)
    重点:除了开启生产者确认,建议同时开启 “消息返回机制”(处理消息路由失败的场景):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 1. 开启生产者确认模式(correlated=异步回调,simple=同步等待,none=关闭)
publisher-confirm-type: correlated
# 2. 开启消息返回机制(当消息无法路由到队列时,返回给生产者,而非直接丢弃)
publisher-returns: true
# 3. 消费者手动ACK(可选,配合生产者确认实现全链路可靠)
listener:
simple:
acknowledge-mode: manual
  1. 实现 ConfirmCallback 回调类
    自定义确认逻辑,处理 “消息到达 Broker 成功 / 失败” 的场景:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {

/**
* 生产者消息确认回调方法
* @param correlationData 消息关联数据(可传递消息唯一ID,用于定位失败消息)
* @param ack 是否确认成功(true=消息到达Broker;false=消息未到达Broker)
* @param cause 失败原因(ack=false时非空)
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// 获取消息唯一ID(发送时传入,用于定位具体消息)
String msgId = correlationData != null ? correlationData.getId() : "未知ID";

if (ack) {
// 确认成功:消息已到达Broker(交换机/队列已持久化的前提下,消息不会丢失)
System.out.println("消息确认成功,消息ID:" + msgId);
} else {
// 确认失败:消息未到达Broker(网络问题、Broker宕机等)
System.out.println("消息确认失败,消息ID:" + msgId + ",失败原因:" + cause);

// 失败处理逻辑:可重试发送、记录日志、入库待补偿等
retrySendMessage(msgId); // 示例:重试发送方法
}
}

/**
* 示例:消息确认失败后的重试逻辑(可结合定时任务/重试框架实现)
*/
private void retrySendMessage(String msgId) {
// 1. 根据msgId从数据库/缓存中查询原始消息内容
// 2. 限制重试次数(避免无限重试)
// 3. 重试发送(可增加延迟,如使用延迟队列)
System.out.println("开始重试发送消息,消息ID:" + msgId);
}
}
  1. 配置 RabbitTemplate,绑定回调
    通过配置类将回调类绑定到 RabbitTemplate,并开启消息返回回调(处理路由失败):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitMQConfig {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private MessageConfirmCallback messageConfirmCallBack;

/**
* 初始化RabbitTemplate,绑定回调(PostConstruct确保容器启动后执行)
*/
@PostConstruct
public void initRabbitTemplate() {
// 3. 绑定生产者确认回调(对应你提到的第三步)
rabbitTemplate.setConfirmCallback(messageConfirmCallBack);

// 补充:绑定消息返回回调(处理“消息到达Broker,但路由不到队列”的场景)
rabbitTemplate.setReturnsCallback(returned -> {
// returned包含:消息内容、返回码、返回原因、交换机、路由键
System.out.println("消息路由失败,交换机:" + returned.getExchange()
+ ",路由键:" + returned.getRoutingKey()
+ ",原因:" + returned.getReplyText());
// 路由失败处理:重新发送、记录日志等
});

// 关键:设置为true,强制将路由失败的消息返回给生产者(否则Broker会直接丢弃)
rabbitTemplate.setMandatory(true);
}
}
  1. 生产者发送消息(传入唯一 ID)
    发送消息时传入 CorrelationData(携带消息唯一 ID),方便回调时定位消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class MessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送消息(带唯一ID,支持确认回调)
*/
public void sendMessage(String exchange, String routingKey, String message) {
// 生成消息唯一ID(用于回调时定位)
String msgId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(msgId);

// 发送消息
rabbitTemplate.convertAndSend(
exchange,
routingKey,
message,
correlationData // 传入关联数据
);
System.out.println("消息发送完成,消息ID:" + msgId + ",内容:" + message);
}
}

关键补充说明

publisher-confirm-type 三种取值

取值 含义 适用场景
correlated 异步回调模式(推荐),Broker 确认后触发 ConfirmCallback 生产环境、高并发场景
simple 同步等待模式,可通过 rabbitTemplate.waitForConfirms() 阻塞等待确认 低并发、简单场景
none 关闭确认模式,生产者不关心消息是否到达 Broker 非核心、允许丢失的消息

2. Return模式确认

生产者发送消息到 Broker 的交换机,但如果 交换机找不到匹配的队列(比如 RoutingKey 错误、绑定关系不存在),默认情况下 Broker 会直接丢弃这条消息,生产者完全感知不到。
而 Return 模式(也叫消息回退机制) 的核心就是:让 Broker 把 “路由失败的消息”主动返回给生产者,生产者可以在回调中处理这些失败消息(重试、记录日志、丢死信队列等)。

Return 模式的核心条件

要触发 Return 回调,必须同时满足 3 个条件:

  • 消息成功到达 Broker(Confirm 模式会返回 ack=true);
  • 消息无法路由到任何队列(无匹配的 BindingKey/RoutingKey);
  • 生产者开启 mandatory=true(关键!告诉 Broker“路由失败时不要丢弃,返回给我”)。

完整实现 Return 模式的步骤如下:

  1. 配置文件开启 Return 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 1. 开启生产者确认(基础)
publisher-confirm-type: correlated
# 2. 开启 Return 模式(允许 Broker 返回路由失败的消息)
publisher-returns: true
# 3. 强制开启 mandatory(核心!否则 Broker 仍会丢弃路由失败的消息)
template:
mandatory: true
  1. 实现 ReturnsCallback 回调(处理路由失败的消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageReturnCallback implements RabbitTemplate.ReturnsCallback {

/**
* Return 模式回调方法(仅当消息路由失败时触发)
* @param returnedMessage 回退的消息详情:包含消息内容、返回码、失败原因、交换机、路由键等
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 1. 解析回退消息的关键信息
String exchange = returnedMessage.getExchange(); // 消息发送到的交换机
String routingKey = returnedMessage.getRoutingKey(); // 发送时用的路由键
String message = new String(returnedMessage.getMessage().getBody()); // 消息内容
int replyCode = returnedMessage.getReplyCode(); // 返回码(AMQP 协议定义,如 312=找不到队列)
String replyText = returnedMessage.getReplyText(); // 失败原因

// 2. 打印日志(便于排查问题)
System.out.println("===== 消息路由失败 =====");
System.out.println("交换机:" + exchange);
System.out.println("路由键:" + routingKey);
System.out.println("消息内容:" + message);
System.out.println("返回码:" + replyCode);
System.out.println("失败原因:" + replyText);

// 3. 失败处理逻辑(根据业务选择)
// 方案1:重试发送(注意限制重试次数,避免死循环)
retrySendMessage(exchange, routingKey, message);
// 方案2:记录到数据库,后续人工补偿
// 方案3:发送到死信交换机,兜底处理
}

/**
* 示例:路由失败后的重试逻辑
*/
private void retrySendMessage(String exchange, String routingKey, String message) {
// 这里可以结合定时任务/重试框架(如 Spring Retry)实现有限次数重试
System.out.println("开始重试发送路由失败的消息:" + message);
}
}
  1. 绑定 Return 回调到 RabbitTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitMQConfig {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private MessageConfirmCallback messageConfirmCallBack; // 之前的 Confirm 回调
@Autowired
private MessageReturnCallback messageReturnCallback; // 新增的 Return 回调

@PostConstruct
public void initRabbitTemplate() {
// 1. 绑定 Confirm 回调(确认消息是否到达 Broker)
rabbitTemplate.setConfirmCallback(messageConfirmCallBack);

// 2. 绑定 Return 回调(处理路由失败的消息)
rabbitTemplate.setReturnsCallback(messageReturnCallback);

// 3. 强制开启 mandatory(和配置文件的 template.mandatory=true 作用一致,双重保障)
rabbitTemplate.setMandatory(true);
}
}
  1. 测试 Return 模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class TestController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendErrorMsg")
public String sendErrorMsg() {
String exchange = "test.exchange"; // 假设该交换机存在
String routingKey = "error.key"; // 假设该路由键无匹配的队列
String message = "测试路由失败的消息";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

// 发送消息(路由键错误,触发 Return 回调)
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
return "消息发送请求已提交";
}
}

Return 模式 vs Confirm 模式(核心区别)

模式 触发时机 核心作用 结果判断
Confirm 模式 消息到达 / 未到达 Broker 时触发 确认消息是否到达 Broker ack=true → 到达;false → 未到达
Return 模式 消息到达 Broker 但路由失败时触发 确认消息是否路由到队列 触发回调 → 路由失败;不触发 → 路由成功

3. 消费者手动确认

手动 ACK 的核心是:

  • 配置文件开启手动确认模式;
  • 利用 @RabbitListener 注解监听队列,方法参数注入 Channel 和 Message;
  • 封装通用的 ACK/NACK 工具方法,简化业务代码;
  • 内置重试次数控制,避免死循环。

完整封装实现

  1. 基础配置(application.yml)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 消费端核心配置:开启手动ACK
listener:
simple:
acknowledge-mode: manual # 手动确认(核心)
prefetch: 10 # 预取数:限制消费者同时处理的未确认消息数
concurrency: 1 # 消费者最小线程数
max-concurrency: 5 # 消费者最大线程数
# 自定义配置:重试次数
rabbitmq:
consumer:
max-retry-count: 3 # 全局默认最大重试次数
  1. 通用 ACK 工具类(封装确认逻辑)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* Spring Boot 手动ACK通用工具类(封装核心确认逻辑)
*/
@Component
public class RabbitAckUtil {

/**
* 手动确认消息消费成功
* @param channel Channel对象(Spring自动注入)
* @param message 消息对象
*/
public void ack(Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// basicAck参数说明:
// 1. deliveryTag:消息唯一标识
// 2. multiple:false=仅确认当前消息;true=确认所有小于等于该tag的消息
channel.basicAck(deliveryTag, false);
System.out.println("消息确认成功,deliveryTag:" + deliveryTag);
}

/**
* 手动确认消息消费失败
* @param channel Channel对象
* @param message 消息对象
* @param requeue 是否重新入队(true=重试;false=丢死信队列)
*/
public void nack(Channel channel, Message message, boolean requeue) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// basicNack参数说明:
// 1. deliveryTag:消息唯一标识
// 2. multiple:false=仅拒绝当前消息;true=拒绝所有小于等于该tag的消息
// 3. requeue:是否重新入队
channel.basicNack(deliveryTag, false, requeue);
String tip = requeue ? "消息重新入队(重试)" : "消息丢弃(入死信队列)";
System.out.println(tip + ",deliveryTag:" + deliveryTag);
}

/**
* 获取当前消息的重试次数
*/
public int getRetryCount(Message message) {
Object retryCount = message.getMessageProperties().getHeaders().get("retryCount");
return retryCount == null ? 0 : (Integer) retryCount;
}

/**
* 累加重试次数(存入消息头)
*/
public void addRetryCount(Message message) {
int currentCount = getRetryCount(message);
message.getMessageProperties().getHeaders().put("retryCount", currentCount + 1);
}
}
  1. 业务消费者(基于 Spring 注解封装)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* Spring Boot 手动ACK消费者(业务层封装)
* 特点:只关注业务逻辑,ACK/NACK由工具类封装,重试次数自动控制
*/
@Component
public class SpringBootManualAckConsumer {

@Autowired
private RabbitAckUtil rabbitAckUtil;

// 注入全局最大重试次数
@Value("${rabbitmq.consumer.max-retry-count:3}")
private int maxRetryCount;

/**
* 监听指定队列,Spring自动注入Channel和Message
* 核心:@RabbitListener + 手动调用ACK工具类
*/
@RabbitListener(queues = "spring.boot.manual.ack.queue")
public void consumeMessage(String msg, Channel channel, Message message) throws IOException {
// 1. 获取基础信息
String msgId = message.getMessageProperties().getMessageId();
if (msgId == null) {
msgId = message.getMessageProperties().getHeaders().get("spring_returned_message_correlation").toString();
}

try {
// ===================== 核心业务逻辑 =====================
System.out.println("处理业务消息:" + msg + ",msgId:" + msgId);
// 模拟业务异常(测试重试逻辑,可注释)
// if (msg.contains("error")) {
// throw new RuntimeException("业务处理失败:模拟异常");
// }
// ======================================================

// 2. 业务处理成功:手动ACK
rabbitAckUtil.ack(channel, message);

} catch (Exception e) {
System.err.println("消息处理失败,msgId:" + msgId + ",异常:" + e.getMessage());
// 3. 业务处理失败:控制重试次数
int currentRetryCount = rabbitAckUtil.getRetryCount(message);
if (currentRetryCount < maxRetryCount) {
// 3.1 未耗尽重试次数:累加重试次数 + 重新入队
rabbitAckUtil.addRetryCount(message);
rabbitAckUtil.nack(channel, message, true);
} else {
// 3.2 重试次数耗尽:拒绝重新入队(丢死信队列)
rabbitAckUtil.nack(channel, message, false);
System.err.println("重试次数耗尽,msgId:" + msgId + ",已丢死信队列");
}
}
}
}
  1. 生产者(用于测试)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
* 测试生产者:发送带唯一ID的消息
*/
@Component
public class TestProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送消息(Spring Boot封装方式)
*/
public void sendMessage(String msg) {
// 1. 生成消息唯一ID(用于幂等/追踪)
String msgId = UUID.randomUUID().toString();
// 2. 发送消息,指定消息ID
rabbitTemplate.convertAndSend(
"spring.boot.manual.ack.exchange", // 交换机
"spring.boot.manual.ack.key", // 路由键
msg,
message -> {
message.getMessageProperties().setMessageId(msgId); // 设置消息ID
return message;
}
);
System.out.println("消息发送成功,msgId:" + msgId + ",内容:" + msg);
}
}