RabbitMQ的消息确认机制
1. Confirm模式确认(生产者到路由的确认机制)
消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;
需要3步进行Confirm确认:
- 配置文件开启确认模式(correlated 是异步回调模式,生产环境首选);
- 实现 RabbitTemplate.ConfirmCallback 接口,自定义确认成功 / 失败的处理逻辑;
- 将回调类绑定到 RabbitTemplate,让 Spring 容器接管确认回调。
完整代码实现
- 配置文件(application.yml)
重点:除了开启生产者确认,建议同时开启 “消息返回机制”(处理消息路由失败的场景):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated publisher-returns: true listener: simple: acknowledge-mode: manual
|
- 实现 ConfirmCallback 回调类
自定义确认逻辑,处理 “消息到达 Broker 成功 / 失败” 的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Component public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : "未知ID";
if (ack) { System.out.println("消息确认成功,消息ID:" + msgId); } else { System.out.println("消息确认失败,消息ID:" + msgId + ",失败原因:" + cause); retrySendMessage(msgId); } }
private void retrySendMessage(String msgId) { System.out.println("开始重试发送消息,消息ID:" + msgId); } }
|
- 配置 RabbitTemplate,绑定回调
通过配置类将回调类绑定到 RabbitTemplate,并开启消息返回回调(处理路由失败):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration public class RabbitMQConfig {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private MessageConfirmCallback messageConfirmCallBack;
@PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
rabbitTemplate.setReturnsCallback(returned -> { System.out.println("消息路由失败,交换机:" + returned.getExchange() + ",路由键:" + returned.getRoutingKey() + ",原因:" + returned.getReplyText()); });
rabbitTemplate.setMandatory(true); } }
|
- 生产者发送消息(传入唯一 ID)
发送消息时传入 CorrelationData(携带消息唯一 ID),方便回调时定位消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.UUID;
@Component public class MessageProducer {
@Autowired private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) { String msgId = UUID.randomUUID().toString(); CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend( exchange, routingKey, message, correlationData ); System.out.println("消息发送完成,消息ID:" + msgId + ",内容:" + message); } }
|
关键补充说明
publisher-confirm-type 三种取值
| 取值 |
含义 |
适用场景 |
| correlated |
异步回调模式(推荐),Broker 确认后触发 ConfirmCallback |
生产环境、高并发场景 |
| simple |
同步等待模式,可通过 rabbitTemplate.waitForConfirms() 阻塞等待确认 |
低并发、简单场景 |
| none |
关闭确认模式,生产者不关心消息是否到达 Broker |
非核心、允许丢失的消息 |
2. Return模式确认
生产者发送消息到 Broker 的交换机,但如果 交换机找不到匹配的队列(比如 RoutingKey 错误、绑定关系不存在),默认情况下 Broker 会直接丢弃这条消息,生产者完全感知不到。
而 Return 模式(也叫消息回退机制) 的核心就是:让 Broker 把 “路由失败的消息”主动返回给生产者,生产者可以在回调中处理这些失败消息(重试、记录日志、丢死信队列等)。
Return 模式的核心条件
要触发 Return 回调,必须同时满足 3 个条件:
- 消息成功到达 Broker(Confirm 模式会返回 ack=true);
- 消息无法路由到任何队列(无匹配的 BindingKey/RoutingKey);
- 生产者开启 mandatory=true(关键!告诉 Broker“路由失败时不要丢弃,返回给我”)。
完整实现 Return 模式的步骤如下:
- 配置文件开启 Return 模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
|
- 实现 ReturnsCallback 回调(处理路由失败的消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Component public class MessageReturnCallback implements RabbitTemplate.ReturnsCallback {
@Override public void returnedMessage(ReturnedMessage returnedMessage) { String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); String message = new String(returnedMessage.getMessage().getBody()); int replyCode = returnedMessage.getReplyCode(); String replyText = returnedMessage.getReplyText();
System.out.println("===== 消息路由失败 ====="); System.out.println("交换机:" + exchange); System.out.println("路由键:" + routingKey); System.out.println("消息内容:" + message); System.out.println("返回码:" + replyCode); System.out.println("失败原因:" + replyText);
retrySendMessage(exchange, routingKey, message); }
private void retrySendMessage(String exchange, String routingKey, String message) { System.out.println("开始重试发送路由失败的消息:" + message); } }
|
- 绑定 Return 回调到 RabbitTemplate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration public class RabbitMQConfig {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private MessageConfirmCallback messageConfirmCallBack; @Autowired private MessageReturnCallback messageReturnCallback;
@PostConstruct public void initRabbitTemplate() { rabbitTemplate.setConfirmCallback(messageConfirmCallBack);
rabbitTemplate.setReturnsCallback(messageReturnCallback);
rabbitTemplate.setMandatory(true); } }
|
- 测试 Return 模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController public class TestController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendErrorMsg") public String sendErrorMsg() { String exchange = "test.exchange"; String routingKey = "error.key"; String message = "测试路由失败的消息"; CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData); return "消息发送请求已提交"; } }
|
Return 模式 vs Confirm 模式(核心区别)
| 模式 |
触发时机 |
核心作用 结果判断 |
| Confirm 模式 |
消息到达 / 未到达 Broker 时触发 确认消息是否到达 Broker |
ack=true → 到达;false → 未到达 |
| Return 模式 |
消息到达 Broker 但路由失败时触发 确认消息是否路由到队列 |
触发回调 → 路由失败;不触发 → 路由成功 |
3. 消费者手动确认
手动 ACK 的核心是:
- 配置文件开启手动确认模式;
- 利用 @RabbitListener 注解监听队列,方法参数注入 Channel 和 Message;
- 封装通用的 ACK/NACK 工具方法,简化业务代码;
- 内置重试次数控制,避免死循环。
完整封装实现
- 基础配置(application.yml)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual prefetch: 10 concurrency: 1 max-concurrency: 5
rabbitmq: consumer: max-retry-count: 3
|
- 通用 ACK 工具类(封装确认逻辑)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component public class RabbitAckUtil {
public void ack(Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); System.out.println("消息确认成功,deliveryTag:" + deliveryTag); }
public void nack(Channel channel, Message message, boolean requeue) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicNack(deliveryTag, false, requeue); String tip = requeue ? "消息重新入队(重试)" : "消息丢弃(入死信队列)"; System.out.println(tip + ",deliveryTag:" + deliveryTag); }
public int getRetryCount(Message message) { Object retryCount = message.getMessageProperties().getHeaders().get("retryCount"); return retryCount == null ? 0 : (Integer) retryCount; }
public void addRetryCount(Message message) { int currentCount = getRetryCount(message); message.getMessageProperties().getHeaders().put("retryCount", currentCount + 1); } }
|
- 业务消费者(基于 Spring 注解封装)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component public class SpringBootManualAckConsumer {
@Autowired private RabbitAckUtil rabbitAckUtil;
@Value("${rabbitmq.consumer.max-retry-count:3}") private int maxRetryCount;
@RabbitListener(queues = "spring.boot.manual.ack.queue") public void consumeMessage(String msg, Channel channel, Message message) throws IOException { String msgId = message.getMessageProperties().getMessageId(); if (msgId == null) { msgId = message.getMessageProperties().getHeaders().get("spring_returned_message_correlation").toString(); }
try { System.out.println("处理业务消息:" + msg + ",msgId:" + msgId);
rabbitAckUtil.ack(channel, message);
} catch (Exception e) { System.err.println("消息处理失败,msgId:" + msgId + ",异常:" + e.getMessage()); int currentRetryCount = rabbitAckUtil.getRetryCount(message); if (currentRetryCount < maxRetryCount) { rabbitAckUtil.addRetryCount(message); rabbitAckUtil.nack(channel, message, true); } else { rabbitAckUtil.nack(channel, message, false); System.err.println("重试次数耗尽,msgId:" + msgId + ",已丢死信队列"); } } } }
|
- 生产者(用于测试)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.UUID;
@Component public class TestProducer {
@Autowired private RabbitTemplate rabbitTemplate;
public void sendMessage(String msg) { String msgId = UUID.randomUUID().toString(); rabbitTemplate.convertAndSend( "spring.boot.manual.ack.exchange", "spring.boot.manual.ack.key", msg, message -> { message.getMessageProperties().setMessageId(msgId); return message; } ); System.out.println("消息发送成功,msgId:" + msgId + ",内容:" + msg); } }
|