文章目录
分布式事务系列文章 |
---|
初探分布式事务:扫盲分布式事务的基础概念和理论知识点 |
图解分布式事务中的2PC与Seata方案 |
案例驱动学习:轻松理解TCC分布式事务 |
消息队列与分布式事务:探讨不同MQ如何实现分布式事务的可靠消息传递 |
MQ最大努力通知VS可靠性消息一致性:分布式事务中的区别与应用比较 |
分布式事务方案比较:选择适合你的分布式事务策略! |
文章导图
什么是可靠消息最终一致性
可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。
此方案是利用消息中间件完成,如下图:
事务发起方(消息生产方)将消息发给消息中间件,事务参与方从消息中间件接收消息,事务发起方和消息中间件之间,事务参与方(消息消费方)和消息中间件之间都是通过网络通信,由于网络通信的不确定性会导致分布式事务问题。
需要解决的三个问题
可靠消息最终一致性方案要解决以下几个问题:
1.本地事务与消息发送的原子性问题
本地事务与消息发送的原子性问题即:事务发起方在本地事务执行成功后消息必须发出去,否则就丢弃消息。即实现本地事务和消息发送的原子性,要么都成功,要么都失败。本地事务与消息发送的原子性问题是实现可靠消息最终一致性方案的关键问题。
先来尝试下这种操作,先发送消息,再操作数据库:
begin transaction;
//1.发送MQ
//2.数据库操作
commit transation;
这种情况下无法保证数据库操作与发送消息的一致性,因为可能发送消息成功,数据库操作失败。
你立马想到第二种方案,先进行数据库操作,再发送消息:
begin transaction;
//1.数据库操作
//2.发送MQ
commit transation;
这种情况下貌似没有问题,如果发送MQ消息失败,就会抛出异常,导致数据库事务回滚。但如果是超时异常,数据库回滚,但MQ其实已经正常发送了,同样会导致不一致。
2、事务参与方接收消息的可靠性
事务参与方必须能够从消息队列接收到消息,如果接收消息失败可以重复接收消息。
3、消息重复消费的问题
由于网络的存在,若某一个消费节点超时但是消费成功,此时消息中间件会重复投递此消息,就导致了消息的重复消费。
要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
解决方案
上面讨论了可靠消息最终一致性事务方案需要解决的问题,本节讨论具体的解决方案。
本地消息表方案
本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。
下面以注册送积分为例来说明:
下例共有两个微服务交互,用户服务和积分服务,用户服务负责添加用户,积分服务负责增加积分。
交互流程如下:
1、用户注册
用户服务在本地事务新增用户和增加 ”积分消息日志“。(用户表和消息表通过本地事务保证一致)
下边是伪代码
begin transaction;
//1.新增用户
//2.存储积分消息日志
commit transation;
这种情况下,本地数据库操作与存储积分消息日志处于同一个事务中,本地数据库操作与记录消息日志操作具备原子性。
2、定时任务扫描日志
如何保证将消息发送给消息队列呢?
经过第一步消息已经写到消息日志表中,可以启动独立的线程,定时对消息日志表中的消息进行扫描并发送至消息中间件,在消息中间件反馈发送成功后删除该消息日志,否则等待定时任务下一周期重试。
3、消费消息
如何保证消费者一定能消费到消息呢?
这里可以使用MQ的ack(即消息确认)机制,消费者监听MQ,具体可以分为两种情况(以springboot工程为例):
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动ack-manual
# acknowledge-mode: auto #自动ack-auto
retry:
enabled: true
max-attempts: 3 # 一定要设置最大重试次数,不然发生异常会一直重试
-
手动ack: 如果消费者接收到消息并且业务处理完成后向MQ发送ack(即消息确认),此时说明消费者正常消费消息完成,MQ将不再向消费者推送消息;如果发生异常,则进行
channel.basicNack()
或channel.basicReject()
方法拒绝消息, 同时要注意void basicNack(long deliveryTag, boolean multiple, boolean requeue)
最后一个requeue=true
,才会重新入队进行消费@RabbitListener(queues = RabbitMqConfig.QUEUE_NAME1) public void handleMessage(Channel channel, Message message){ // 处理消息 try { System.out.println("FirstConsumer {} handleMessage :"+message); //业务都处理成功了,手动进行channel.basicAck代表消费成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { //使用 `channel.basicNack()` 或 `channel.basicReject()` 方法拒绝消息,且`requeue=true`让 RabbitMQ 将消息重新放回队列,供之后重试 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } }
在手动确认模式下,如果消费者没有发送 ack,消息会保留在队列中(在控制台界面可以看到一条unack的标识),等待消费者再次请求。如果消费者希望重试失败的处理,它需要实现自己的重试逻辑。例如,可以在捕获异常后,使用
channel.basicNack()
或channel.basicReject()
方法拒绝消息,且requeue=true
让 RabbitMQ 将消息重新放回队列,供之后重试
- 自动ack: 在这种模式下,如果消费者在处理消息时没有任何异常,会自动ack代表消费成功;如果抛出异常,RabbitMQ 会将消息重新放回队列进行重试。
由于消息会重复投递,积分服务的”增加积分“功能需要实现幂等性。
RocketMQ事务消息方案
RocketMQ 是一个来自阿里巴巴的分布式消息中间件,于 2012 年开源,并在 2017 年正式成为 Apache 顶级项目。据了解,包括阿里云上的消息产品以及收购的子公司在内,阿里集团的消息产品全线都运行在 RocketMQ 之上,并且最近几年的双十一大促中,RocketMQ 都有抢眼表现。Apache RocketMQ 4.3之后的版本正式支持事务消息,为分布式事务实现提供了便利性支持。
RocketMQ 事务消息设计则主要是为了解决 Producer 端的消息发送与本地事务执行的原子性问题,RocketMQ 的设计中 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者存在;而 RocketMQ 本身提供的存储机制为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。
在RocketMQ 4.3后实现了完整的事务消息,实际上其实是对本地消息表的一个封装,将本地消息表移动到了MQ内部,解决 Producer 端的消息发送与本地事务执行的原子性问题。
事务消息交互流程
- 生产者将消息发送至Apache RocketMQ服务端。
- Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
事务消息生命周期
- 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
- 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
- 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
- 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
- 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
- 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
实际流程理解
RocketMQ
为方便理解我们还以注册送积分的例子来描述 整个流程。
Producer 即MQ发送方,本例中是用户服务,负责新增用户。MQ订阅方即消息消费方,本例中是积分服务,负责新增积分。
1、Producer 发送事务消息
Producer (MQ发送方)发送事务消息至MQ Server,MQ Server将消息状态标记为Prepared(预备状态),注意此时这条消息消费者(MQ订阅方)是无法消费到的。
本例中,Producer 发送 ”增加积分消息“ 到MQ Server。
2、MQ Server回应消息发送成功
MQ Server接收到Producer 发送给的消息则回应发送成功表示MQ已接收到消息。
3、Producer 执行本地事务
Producer 端执行业务代码逻辑,通过本地数据库事务控制。
本例中,Producer 执行添加用户操作。
4、消息投递
若Producer 本地事务执行成功则自动向MQServer发送commit消息,MQ Server接收到commit消息后将”增加积分消息“ 状态标记为可消费,此时MQ订阅方(积分服务)即正常消费消息;
若Producer 本地事务执行失败则自动向MQServer发送rollback消息,MQ Server接收到rollback消息后 将删除”增加积分消息“ 。
MQ订阅方(积分服务)消费消息,消费成功则向MQ回应ack,否则将重复接收消息。这里ack默认自动回应,即程序执行正常则自动回应ack。
5、事务回查
如果执行Producer端本地事务过程中,执行端挂掉,或者超时,MQ Server将会不停的询问同组的其他 Producer 来获取事务执行状态,这个过程叫事务回查。MQ Server会根据事务回查结果来决定是否投递消息。
以上主干流程已由RocketMQ实现,对用户侧来说,用户需要分别实现本地事务执行以及本地事务回查方法,因此只需关注本地事务的执行状态即可。
- RoacketMQ提供RocketMQLocalTransactionListener接口:
public interface RocketMQLocalTransactionListener {
/** * - 发送prepare消息成功此方法被回调,该方法用于执行本地事务 * - @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id * - @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到 * - @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */
RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
/** * - @param msg 通过获取transactionId来判断这条消息的本地事务执行状态 * - @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调 */
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
- 发送事务消息:以下是RocketMQ提供用于发送事务消息的API:
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//设置TransactionListener实现
producer.setTransactionListener(transactionListener);
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
其他MQ通用方案
本地库创建一个消息表(t_msg)
create table if not exists t_msg
(
id varchar(32) not null primary key comment '消息id',
body_json text not null comment '消息体,json格式',
status smallint not null default 0 comment '消息状态,0:待投递到mq,1:投递成功,2:投递失败',
fail_msg text comment 'status=2 时,记录消息投递失败的原因',
fail_count int not null default 0 comment '已投递失败次数',
send_retry smallint not null default 1 comment '投递MQ失败了,是否还需要重试?1:是,0:否',
next_retry_time datetime comment '投递失败后,下次重试时间',
create_time datetime comment '创建时间',
update_time datetime comment '最近更新时间',
key idx_status (status)
) comment '本地消息表'
事务消息投递的过程
- step1:开启本地事务
- step2:执行本地业务
- step3:消息表t_msg写入记录,status为0(待投递到MQ)
- step4:提交本地事务
- step5:若事务提交成功,则投递消息到MQ,然后将t_msg中的status置为1(投递成功);本地事务失败的情况不用考虑,此时消息记录也没有写到db中
知识点拓展:Spring事务同步器
知识点拓展: 如何判断事务是否提交成功呢?这就涉及Spring的事务同步器了
TransactionSynchronizationManager.registerSynchronization
是 Java Spring 框架中的一个方法,它用于注册事务同步处理器(TransactionSynchronization
)。事务同步处理器是 Spring 事务管理的一个特性,允许你在事务的边界内执行一些操作,无论是事务提交还是回滚。
具体来说,TransactionSynchronizationManager
负责管理事务同步操作的注册和执行。当你调用 registerSynchronization
方法时,你可以传入一个实现了 TransactionSynchronization
接口的实例。这个实例定义了在事务的不同阶段(如开始、提交、回滚)应该执行哪些操作。
以下是 TransactionSynchronization
接口中定义的一些方法,这些方法可以在事务的不同生命周期点被调用:
beforeCommit(boolean readOnly)
: 在事务提交之前调用,如果事务是只读的,则readOnly
参数为true
。beforeCompletion()
: 在事务实际提交或回滚之前调用,用于执行清理操作。afterCommit()
: 如果事务提交成功,则调用此方法。afterCompletion(int status)
: 在事务完成后调用,无论事务是提交还是回滚。status
参数指示事务的状态:STATUS_COMMITTED
表示提交成功,STATUS_ROLLED_BACK
表示已回滚。
/** * 若有事务,则在事务执行完毕之后,进行投递 * * spring事务扩展点,通过TransactionSynchronizationManager.registerSynchronization添加一个事务同步器TransactionSynchronization, * 事务执行完成之后,不管事务成功还是失败,都会调用TransactionSynchronization#afterCompletion 方法 */
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
/** * 代码走到这里时,事务已经完成了(可能是回滚了、或者是提交了) * 看下本地消息记录是否存在?如果存在,说明事务是成功的,业务是执行成功的,则投递消息 & 并将消息状态置为成功 */
//为了提升性能:事务消息的投递消息这里异步去执行,即使失败了,会有补偿JOB进行重试
mqExecutor.execute(() -> deliverMsg(msgPOList));
}
});
异常情况
step5失败了,其他步骤都成功,此时业务执行成功,但是消息投递失败了,此时需要有个job来进行补偿,对于投递失败的消息进行重试。
消息投递补偿job
这个job负责从本地t_msg表中查询出状态为0记录或者失败需要重试的记录,然后进行重新投递到MQ。
对于投递失败的,采用衰减的方式进行重试,比如第1次失败了,则10秒后,继续重试,若还是失败,则再过20秒,再次重试,需要设置一个最大重试次数,最终还是投递失败,则需要告警+人工干预。
核心代码讲解
发送事务消息
这里按照上面的消息投递流程,在提交完本地事务以后,通过
TransactionSynchronizationManager.registerSynchronization
添加一个事务同步器TransactionSynchronization,这样事务执行完成之后,不管事务成功还是失败,都会调用TransactionSynchronization#afterCompletion
方法,然后我们在里面处理对应的逻辑即可:
- 看下本地消息记录是否存在?如果存在且状态还是未投递,说明事务是成功的,业务是执行成功的,则投递消息 & 并将消息状态置为成功
- 如果本地消息记录为空,说明本地事务回滚了,那么消息表中的记录也会自动事务回滚,不需要额外处理
@Transactional
public void sendMessage(String bodyJson) {
Message message = new Message();
message.setMessageId(UUID.randomUUID().toString());
message.setBodyJson(bodyJson);
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
// Step 1: 开启本地事务
// Step 2: 执行本地业务
// Step 3: 消息表写入记录,status为0
message.setStatus(0);
messageRepository.save(message);
// Step 4: 提交本地事务
/**
* 若有事务,则在事务执行完毕之后,进行投递
*
* spring事务扩展点,通过TransactionSynchronizationManager.registerSynchronization添加一个事务同步器TransactionSynchronization,
* 事务执行完成之后,不管事务成功还是失败,都会调用TransactionSynchronization#afterCompletion 方法
*/
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCompletion(int status) {
/**
* 代码走到这里时,事务已经完成了(可能是回滚了、或者是提交了)
* 看下本地消息记录是否存在?如果存在,说明事务是成功的,业务是执行成功的,则投递消息 & 并将消息状态置为成功
*/
Message msg=messageRepository.findById(message.getMessageId());
if (msg != null && msg.getStatus() == 0) {
// Step 5: 投递消息到MQ
rabbitTemplate.convertAndSend("exchangeName", "routingKey", bodyJson);
// 更新消息状态
msg.setStatus(1);
msg.setUpdateTime(LocalDateTime.now());
msg.save(message);
}
//如果msg==null,说明本地事务回滚了,那么消息表中的记录也会自动事务回滚,不需要额外处理
}
});
}
定时任务补偿处理失败消息
这里用定时任务扫描出状态为0或者status=2且retry=1,并且他们的重试时间在未来2分钟内(2分钟是为了避免一次性查出所有对数据库造成较大压力)要重试的消息:
- 如果发送成功,将状态设置为1代表发送成功了
- 如果发送异常,则将状态设置为2代表发送失败,同时根据已经重试次数是否小于5次(可以根据自己业务设定)设置是否需要继续重试
@Scheduled(cron = "*/60 * * * * ?") // 每1分钟执行一次
public void retryFailedMessages() {
// 查询状态为0或者status=2且retry=1且重试时间在未来2分钟内要重试的消息,sql是这样的
//select m from message m where (m.status = 0 and m.nextRetryTime<=当前时间 + 2分钟) or (m.status = 2 and m.sendRetry = true and m.nextRetryTime<=当前时间 + 2分钟)
List<Message> messages = messageRepository.findMessagesToSend(0);
for (Message message : messages) {
// 尝试重新发送
try {
rabbitTemplate.convertAndSend("exchangeName", "routingKey", message.getBodyJson());
message.setStatus(1); // 设置为已发送成功
messageRepository.save(message);
} catch (Exception e) {
// 处理失败的情况
message.setFailCount(message.getFailCount() + 1);
message.setFailMsg(e.getMessage());
// 设置为发送失败
message.setStatus(2);
message.setNextRetryTime(LocalDateTime.now().plusSeconds(10 * message.getFailCount()));
message.setSendRetry(message.getFailCount() < 5); // 最多重试5次
messageRepository.save(message);
}
}
}
参考文章:
黑马分布式事务
RocketMQ官网
文章评论