事务传播机制是指:方法嵌套调用过程中事务的传播方式。
- Transactional注解使用于类名、方法名,任何存在该注解的类都会被Cglib代理增强。
- Cglib代理增强的核心功能之一:必须修改数据库连接自动提交的特性。
- 传播机制存在的具体形式:
- Transactional注解的方法内部存在多个SQL。
- Transactional注解的方法内部存在多个Transactional注解的方法互相嵌套。
以上两种传播机制的显著区别:第二种方式存在目标类的代理对象互相调用。
通俗点讲:
- 事务核心是将连接自动提交动作转移至用户手中,其中本地线程提供了这种可能性。
- Mybatis真正获取连接执行目标SQL时,将借助DataSourceUtils类从事务同步管理的本地线程变量中尝试获取数据库连接。
- Transactional注解的一个核心作用就是在执行目标方法之前提前建立数据库连接,关闭自动提交特性,并与当前线程建立绑定关系。
事务悬挂:
- 首次执行存在事务的方法是不存在悬挂功能。
- 只有REQUIRES_NEW、NOT_SUPPORTED两种传播方式存在悬挂现象。
- 悬挂核心是接触数据库连接与线程的绑定关系。
1.常见事务传播机制
如果首个事务的传播机制为:SUPPORTS or NOT_SUPPORTED,则最终创建的是空事务,即DefaultTransactionStatus实例中不存在DataSourceTransactionObject类型的属性。
如果首个事务的传播机制为:REQUIRED or REQUIRES_NEW or NESTED,则生成有效的事务。
如果后续存在SUPPORTS类型事务,则会共用DataSourceTransactionObject对象,但是newTransaction属性为false,所以在当前事务下是不会提交或者回滚的。当然如果没有旧事务则当前事务没有任何意义,还是以无事务方式运行。
如果后续存在NOT_SUPPORTED类型事务,则会将旧事务悬挂处理,并且生成一个空事务,其结果就是重新生成一个自动提交特性的数据库连接。
如果后续存在REQUIRES_NEW类型事务,则会将旧事务悬挂处理,并且生成一个新的事务,当前新事务完全符合自动提交 or 回滚的特性。旧事务出现没法影响该新事务,但是新事务出现异常会触发旧事务回滚处理。
如果后续存在MANDATORY类型事务,则会共用DataSourceTransactionObject对象,但是newTransaction属性为false,所以在当前事务下是不会提交或者回滚的。
如果行数据被更新的事务【条件为任何字段--不管该字段是否存在索引】并没有提交,则其他事务【条件为任何字段--不管该字段是否存在索引】只能阻塞等待当前事务结束方可继续更新。
1.1.REQUIRED
Support a current transaction, create a new one if none exists。
1.2.SUPPORTS
Support a current transaction, execute non-transactionally if none exists。
如果起始事务传播类型选择SUPPORTS,则本质还是无事务运行。
如果下游方法存在MANDATORY事务类型,则会抛出异常。
1.3.MANDATORY
Support a current transaction, throw an exception if none exists。
mandatory:强制性的。
强制要求当前存在事务,并自动加入到当前事务控制范围内。
1.4.REQUIRES_NEW
Create a new transaction, and suspend the current transaction if one exists。
旧事务悬挂之后对当前SQL的操作建立新的事务,而且新旧事务之间是没有任何影响的。新旧事务只对自己目标方法发挥作用。
1.5.NOT_SUPPORTED
Execute non-transactionally, suspend the current transaction if one exists。
suspend:悬挂。
旧事务悬挂之后,当前SQL的操作不受任何事务控制,即数据库连接都是自动提交的。并且对旧事务没有任何影响,旧事务依然对自己关注的目标方法发挥作用。
1.6.NEVER
Execute non-transactionally, throw an exception if a transaction exists。
1.7.NESTED
Execute within a nested transaction if a current transaction exists。
2.源码分析
控制器调用目标事务方法主要流程:
- 将事务相关属性抽象为TransactionInfo实例。
- 执行目标方法,如果目标方法中存在其他事务方法则继续走代理流程。
- 根据目标方法执行结果选择全局提交或者回滚。
public abstract class TransactionAspectSupport{
protected Object invokeWithinTransaction(Method method,Class<?> targetClass,InvocationCallback invocation){
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务相关的实例
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 调用用户自定义的目标方法,当然如果目标方法被Cglib代理则再次由invokeWithinTransaction方法触发目标方法执行
retVal = invocation.proceedWithInvocation();
}catch (Throwable ex) {
// 回滚策略
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
// 最终提交策略
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,TransactionAttribute txAttr,String ji) {
...
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
}
}
return prepareTransactionInfo(tm, txAttr, ji, status);
}
}
2.1.抽象事务属性为TransactionInfo
抽象事务流程主要包括:
- 创建DataSourceTransactionObject实例,并首次通过事务同步管理器获取ConnectionHolder。
- 判断事务是否存在:ConnectionHolder存在,并且其属性TransactionActive为true。如果是首次执行事务方法则返回false。
- 事务传播机制之mandatory:前提条件是事务必须存在。即当前方法的事务机制如果是mandatory则其调用方必须存在事务中。
- 针对REQUIRED、REQUIRES_NEW、NESTED三种事务传播机制则选择首次创建数据库连接,并且与线程建立绑定关系,最后初始化事务同步器Synchronization。
- 创建空的transaction:没有实际的transaction,但是存在Synchronization。
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager{
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 创建 DataSourceTransactionObject
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 事务存在的条件
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 强制性事务之MANDATORY
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
//
}else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 首次为 null
SuspendedResourcesHolder suspendedResources = suspend(null);
// 默认情况为 true,即新生成同步器
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// DataSourceTransactionManager:初始化数据库连接:ConnectionHolder
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}else {
// 如果起始事务传播机制为SUPPORTS、NOT_SUPPORTED,则表明当前status是没有DataSourceTransactionObject类型的属性,即空事务
// 空事务意味着不存在ConnectionHolder
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
protected DefaultTransactionStatus prepareTransactionStatus(TransactionDefinition definition, Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, Object suspendedResources) {
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
prepareSynchronization(status, definition);
return status;
}
protected DefaultTransactionStatus newTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization = newSynchronization &&
!TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(
transaction, newTransaction, actualNewSynchronization,
definition.isReadOnly(), debug, suspendedResources);
}
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
...
TransactionSynchronizationManager.initSynchronization();
}
}
}
2.1.1.创建DataSourceTransactionObject实例
public class DataSourceTransactionManager{
protected Object doGetTransaction() {
// 事务同步管理器 利用 database 创建数据库连接
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 如果是首次执行事务则ConnectionHolder为null
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
}
同步事务管理器TransactionSynchronizationManager 尝试通过DataSource从当前线程获取ConnectionHolder,如果存在ConnectionHolder则表明当前线程是在事务环境中运行的。
2.1.2.判断事务存在与否
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager{
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction){
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 将当前事务悬挂处理
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}else {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// 如果起始事务发挥作用,此处表示 SUPPORTS 传播类型的事务,即共用当前事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 获取到需要悬挂处理的 Synchronization,例如SqlSessionSynchronization
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
...
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}else if (transaction != null) {
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}else {
return null;
}
}
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 将ConnectionHolder置为null
txObject.setConnectionHolder(null);
// 解除 连接与线程的绑定关系,并且返回被解绑的 ConnectionHolder
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
}
事务悬挂:
- 解除DataSourceTransactionObject 与 ConnectionHolder 的绑定关系。
- 解析事务与线程的绑定关系。
- 将被解除的ConnectionHolder、TransactionSynchronization等信息重新封装为SuspendedResourcesHolder。
传播机制之NOT_SUPPORTED:事务悬挂之后直接返回当前ConnectionHolder。
传播机制之PROPAGATION_REQUIRES_NEW:事务悬挂后重新建立新的连接,跟首次新建事务逻辑基本一致。
2.1.3.事务传播机制之mandatory
2.1.4.新建事务
将事务相关属性最终抽象为DefaultTransactionStatus。
public class DataSourceTransactionManager{
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
// 初始化 ConnectionHolder
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 设置属性SynchronizedWithTransaction
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
...
if (con.getAutoCommit()) {
// 关闭自动提交
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
if (txObject.isNewConnectionHolder()) {
// 将 ConnectionHolder & 当前线程 建立绑定关系
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
}
initSynchronization:
初始化事务同步管理器TransactionSynchronizationManager属性之synchronizations。
截止当前:新建的数据库连接与当前线程建立绑定关系,并且初始化synchronizations。注意此时并没有在事务同步管理器中添加ConnectionSynchronization。
2.1.5.新建空事务
2.2.执行目标方法
2.3.事务全局提交或者回滚
提交或者回滚之前都会判断当前DefaultTransactionStatus是否为新事务:
- DefaultTransactionStatus的属性newTransaction是否为true。
- DefaultTransactionStatus的属性transaction必须赋值为DataSourceTransactionObject。
2.3.1.全局提交
public abstract class TransactionAspectSupport{
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
}
public abstract class AbstractPlatformTransactionManager{
public final void commit(TransactionStatus status) throws TransactionException {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
processRollback(defStatus, true);
return;
}
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
boolean unexpectedRollback = false;
prepareForCommit(status);// 没有默认实现
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
if(status.isNewTransaction()){
//事务最终获取到连接Connection直接提交
doCommit(status);
}
try {
triggerAfterCommit(status);
}finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
}
}
2.3.2.全局回滚
public abstract class TransactionAspectSupport{
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 抛出的异常类型是否满足条件:
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}else {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
}
}
public abstract class AbstractPlatformTransactionManager{
public final void rollback(TransactionStatus status) {
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
triggerBeforeCompletion(status);
if (status.isNewTransaction()) {
doRollback(status);
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
}
3.DefaultTransactionStatus
事务属性抽象后的持有者。
public DefaultTransactionStatus(Object transaction, boolean newTransaction, boolean newSynchronization,boolean readOnly,Object suspendedResources) {
this.transaction = transaction;
this.newTransaction = newTransaction;
this.newSynchronization = newSynchronization;
this.readOnly = readOnly;
this.suspendedResources = suspendedResources;
}
transaction:DataSourceTransactionObject实例。
newTransaction:只要符合 #2.1.4章节 的条件就表明为新建的事务。
newSynchronization:同步器只分为两种SYNCHRONIZATION_ALWAYS、SYNCHRONIZATION_NEVER,默认情况下始终为SYNCHRONIZATION_ALWAYS。所以通常情况下该值为true。
suspendedResources:被悬挂的事务。
4.DataSourceTransactionObject
该对象有一个核心属性newConnectionHolder,即真正数据库连接Connection的持有者。
新建的连接被DataSourceTransactionObject持有,并借助事务同步管理器TransactionSynchronizationManager将ConnectionHolder绑定在当前线程上。
文章评论