RabbitMQ——发布确认模式
发布确认模式(Publish-Confirm)是一种确保消息可靠投递的重要机制,用于确保消息已经被正确地发送到 RabbitMQ 服务器。
RabbitMQ 发布确认模式的三种形式:单条确认、批量确认和异步确认。
单条发布确认
单条发布确认(Single Publisher Confirm)是最基本的确认模式,在发布一条消息后,等待服务器确认该消息是否成功接收。
缺点:每条消息都需要等待服务器的确认,可能导致性能开销较大,特别是在高并发的场景下。
Channel channel = connection.createChannel()
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 发布消息,等待服务器确认
channel.basicPublish("", QUEUE_NAME, null, "Hello, RabbitMQ!".getBytes());
// 等待服务器确认
if (channel.waitForConfirms()) {
System.out.println("Message sent successfully.");
} else {
System.out.println("Failed to send message.");
}
批量发布确认
批量发布确认模式允许在一次性确认多个消息是否成功被服务器接收,这在大量消息的场景中可以提高效率。
缺点:当一批消息中有一条消息发送失败时,整个批量确认失败,可能需要重新发送整批消息且不知道是哪条消息失败。
public class BatchPublisherConfirmExample {
private static final String QUEUE_NAME = "my_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 发布多条消息
for (int i = 0; i < 100; i++) {
String message = "Message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
// 批量确认
if (confirmMessages(channel)) {
System.out.println("All messages sent successfully.");
} else {
System.out.println("Failed to send one or more messages.");
}
}
private static boolean confirmMessages(Channel channel) throws InterruptedException, TimeoutException {
// 批量确认消息的序号
final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<>());
// 添加确认监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
// 批量确认时,将小于等于 deliveryTag 的所有消息标记为已确认
unconfirmedSet.headSet(deliveryTag + 1).clear();
} else {
// 单个确认时,直接移除该消息标记
unconfirmedSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 处理未确认的消息,可选择重发或进行其他处理
System.out.println("Message with delivery tag " + deliveryTag + " not confirmed.");
}
});
// 等待服务器确认,设置超时时间为5000毫秒
if (channel.waitForConfirms(5000)) {
return true;
} else {
// 处理未确认的消息,可选择重发或进行其他处理
System.out.println("Timeout: Some messages not confirmed.");
return false;
}
}
}
异步发布确认
异步确认是一种更灵活的确认方式,通过回调函数处理消息的确认和未确认事件。这种方式可以在异步场景中更好地处理消息的状态。
public class AsyncPublisherConfirmExample {
private static final String QUEUE_NAME = "my_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 开启异步发布确认模式
channel.confirmSelect();
// 发布消息
for (int i = 0; i < 100; i++) {
String message = "Message " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
// 设置异步确认的回调函数
setAsyncConfirmCallback(channel);
}
private static void setAsyncConfirmCallback(Channel channel) {
// 使用线程安全的跳表存储未确认消息的序号
final ConcurrentNavigableMap<Long, Boolean> unconfirmedSet = new ConcurrentSkipListMap<>();
// 异步发布确认的回调函数
ConfirmCallback ackconfirmCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 批量确认时,删除已确认的消息
unconfirmedSet.headMap(deliveryTag + 1).clear();
} else {
// 单个确认时,直接移除该消息标记
unconfirmedSet.remove(deliveryTag);
}
System.out.println("Message with delivery tag " + deliveryTag + " confirmed.");
};
// 异步发布未确认的回调函数
ConfirmCallback nackconfirmCallback = (deliveryTag, multiple) -> {
// 将消息的序号标记为未确认
unconfirmedSet.put(deliveryTag, false);
System.out.println("Message with delivery tag " + deliveryTag + " sent.");
};
channel.addConfirmListener(ackconfirmCallback, nackconfirmCallback);
}
}
文章评论