Spring 事务源码学习笔记
万维钢 -《精英日课》
为了学习而学习,大约是这么一个过程 —— 首先是撤退。你撤退到密室里,不受功名利禄影响,思考一些最根本的问题。 然后你会产生渴望。你会觉得以前的自己太幼稚了,你想要变聪明、变成好人,你想追求真理,想去谦卑地做成一些事情。 这时候因为学习让你有了“严肃”这个美德,你知道什么重要什么不重要,你就允许自己被学习所改变。你才算是一个能可靠地改变世界的人。
# 01 - Spring 事务的实现原理
所谓数据库事务的实现无非就是在 SQL 执行前自动配置一下数据库连接对象(conn
),比如关闭自动提交(conn.autocommit=false
);在 SQL 执行后自动添加提交(commit
)或者回滚(rollback
)操作,这显然就是标准的 AOP 操作。
Spring 事务的实现本质上就是向 Spring 容器中注册了一个 Advisor, 具体来说是注入了一个 BeanFactoryTransactionAttributeSourceAdvisor
Bean。
然后在 Bean 初始化之后会去寻找跟当前 Bean 相匹配的 Advisor,如果找了就使用动态代理生成一个代理对象。在 Spring 中是使用 @EnableTransactionManagement
注解来启用事务的,具体的实现流程如下:
通过在
@EnableTransactionManagement
注解中引入了TransactionManagementConfigurationSelector
类,该类向 Spring 容器中添加了 2 个 Bean:AutoProxyRegistrar
ProxyTransactionManagementConfiguration
AutoProxyRegistrar
的主要作用是向 Spring 容器中注册了一个 InfrastructureAdvisorAutoProxyCreator 的 BeanPostProcessor。 这个类会在初始化后步骤中去寻找 Advisor 类型的 Bean,并判断当前某个 Bean 是否有匹配的 Advisor,是否需要利用动态代理产生一个代理对象。ProxyTransactionManagementConfiguration
是一个配置类,它又定义了另外三个 Bean:- BeanFactoryTransactionAttributeSourceAdvisor:一个 Advisor
- AnnotationTransactionAttributeSource:相当上述 Advisor 中 的 Pointcut
- TransactionInterceptor:相当于上述 Advisor 中的 Advice
AnnotationTransactionAttributeSource 是一个工具类,它的主要作用就是获取 @Transactional 注解中的属性值,封装成 TransactionAttribute 对象。
TransactionInterceptor 就是代理逻辑,当某个类中存在 @Transactional 注解时,到时就产生一个代理对象作为 Bean,代理对象在执行某个方法时, 最终就会进入到 TransactionInterceptor 的 invoke() 方法。
# 02 - Spring 事务执行的具体流程
Spring 事务实现的主流程在 TransactionAspectSupport::invokeWithinTransaction()
方法中实现。
- 利用所配置的 PlatformTransactionManager 事务管理器新建一个数据库连接。
- 修改数据库连接的
autocommit
为 false。 - 执行
MethodInvocation.proceed()
方法,简单理解就是执行业务方法,其中就会执行 SQL。 - 如果没有抛出异常,则直接提交事务。
- 如果有异常抛出,则回滚事务。
# 03 - Spring 嵌套事务具体执行流程
假设在 a() 方法中调用了 b() 方法, 且调用 b() 方法时需要新开启一个事务,则执行过程如下:
- 首先代理对象在执行 a() 方法之前,先利用事务管理器创建一个数据库连接 connA
connA.setAutoCommit = false
- 绑定 connA 到 ThreadLocal 中
- 执行 a() 方法中的 SQL
- 执行 a() 方法的过程中调用了 b() 方法:
- 首先判断当前线程中是否已经存在一个事务了(判断当前线程的 ThreadLocal 中是否存在一个数据库连接对象,如果存在则表示已经存在一个事务了)。
- 如果当前线程已经存在一个事务了,则将该事务挂起(其实就是将数据库连接信息备份到 SuspendedResourcesHolder 对象中)。
- 使用事务管理器创建一个新的数据库连接 connB
connB.setAutoCommit = false
- 绑定 connB 到 ThreadLocal 中(并将 connA 从 ThreadLocal 中删除)
- 执行 b() 方法中的 SQL
- b() 方法逻辑执行完毕,从 ThreadLocal 中拿出数据库连接 connB 提交事务。
- 恢复挂起的数据库连接 connA, 从 SuspendedResourcesHolder 对象中把数据库连接信息拿出来重新绑定到 ThreadLocal 中。
- a() 逻辑方法执行完毕,从 ThreadLocal 中拿出数据库连接 connA 提交事务。
# 04 - 事务的挂起逻辑
# 4.1-首先需要搞清楚那些情况下需要执行事务挂起的操作:
- 当前线程中没有事务,且当前事务的传播机制是
PROPAGATION_REQUIRED
,PROPAGATION_REQUIRES_NEW
或者PROPAGATION_NESTED
,此时虽然没有事务要挂起,但是有可能需要挂起 TransactionSynchronization:if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 没有事务需要挂起,不过 TransactionSynchronization 有可能需要挂起 // suspendedResources 表示当前线程被挂起的资源持有对象(数据库连接、TransactionSynchronization) SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 开启事务后,transaction 中就会有数据库连接了,并且isTransactionActive为true // 并返回 TransactionStatus对象,该对象保存了很多信息,包括被挂起的资源 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } }
- 当前线程中已经存在事务,且当前事务的传播机制是
PROPAGATION_NOT_SUPPORTED
(不支持事务),则挂起当前事务,将数据库连接对象从 ThreadLocal 中移除,这样后面的 JdbcTemplate 对象就会创建一个新的数据库连接,从而使得当前事务失效。if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { // 把当前事务挂起,其中就会把数据库连接对象从ThreadLocal中移除 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); }
- 当前线程中已经存在事务,且当前事务的传播机制是
PROPAGATION_REQUIRES_NEW
(新开启事务),则挂起当前事务,并新建一个事务,执行新的事务,等新事务执行完成之后再恢复之前的事务继续执行。if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } }
# 4.2-事务挂起的具体流程
- 清空并返回当前线程中所有的 TransactionSynchronization 对象(挂起 TransactionSynchronization)。
- 挂起事务,调用
doSuspend()
方法, 把 DataSourceTransactionObject 中的 Connection 清空,解绑 ThreadLocal 中的数据库连接,并返回数据库连接 Connection 对象。protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
- 清空当前线程中关于 TransactionSynchronizationManager 的设置,并将原有配置信息连同数据库连接对象一起保存到挂起对象 SuspendedResourcesHolder 中并返回该对象。
上述流程的具体实现逻辑代码在 AbstractPlatformTransactionManager::suspend() 方法中。
# 05 - 事务的提交逻辑
- 如果当前事务的状态是已经完成,则抛出异常。
- 如果程序员在本地代码设置了强制回滚,则直接执行回滚逻辑。
- 如果 TransactionManager 设置了 globalRollbackOnParticipationFailure 为 false, 且此事务在之前是否设置了需要回滚,则执行回滚逻辑。
- 执行事务同步回调方法(TransactionSynchronization 中的方法),如果当前事务之前设置了 Save Point 则先释放。
- 执行
doCommit()
方法,提交事务。
主要流程代码在 AbstractPlatformTransactionManager::commit() 方法中。
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 可以通过 TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();来设置
// 事务本来是可以要提交的,但是可以强制回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// 判断此事务在之前是否设置了需要回滚,跟globalRollbackOnParticipationFailure有关
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// 提交
processCommit(defStatus);
}
# 06 - 事务的回滚逻辑
- 首先触发 TransactionSynchronization 中的
beforeCompletion()
方法。 - 检查当前执行的方法有设置 Save Point,如果有则回滚到上一个 SavePoint 位置。
- 如果当前执行的方法是新开了一个事务,那么就直接回滚。
- 如果当前执行的方法,是公用了一个已存在的事务,而当前执行的方法抛了异常,则要根据配置判断是部分回滚还是回滚整个事务。
# 07 - 几个有用的 API
设置允许部分提交。在创建 PlatformTransactionManger 对象的时候设置(通常这个功能都不会使用):
@Bean public PlatformTransactionManager transactionManager() { DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(); transactionManager.setDataSource(dataSource()); // 允许部分回滚: // 1. 如果是同一个事务则只要有一条 SQL 执行成功了就全部提交。 // 2. 如果是不同的事务,则抛出异常的那个事务回滚,没有抛出异常的事务正常提交。 transactionManager.setGlobalRollbackOnParticipationFailure(false); return transactionManager; }
在执行事务方法的过程中有异常抛出,但是业务层对异常进行了捕获处理,然后封装成 ResultVo 对象返回,但是此时该事务还是需要回滚的,这个时候可以使用强制回滚功能。
@Transactional() public ResultVo test() { jdbcTemplate.execute("insert user values(1, 'aaaaa')"); try { // 其他业务逻辑代码... } catch (Exception e) { // 强制回滚 TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); return new ResultVo(1, null, e.getMessage()); } return null; }
Spring 提供一个事务同步回调接口 TransactionSynchronization,允许我们在事务提交前后做一些自定义的操作。
@Transactional() public void test() { jdbcTemplate.execute("insert user values(1, 'aaaaa')"); // 注册事务同步回调对象 TransactionSynchronizationManager.registerSynchronization( new TestTransactionSynchronization(TransactionSynchronizationManager.getCurrentTransactionName()) ); try { userService.a(); } catch(Exception e) {} } @Transactional(propagation = Propagation.REQUIRES_NEW) public void a() { // 注册事务同步回调对象 TransactionSynchronizationManager.registerSynchronization( new TestTransactionSynchronization(TransactionSynchronizationManager.getCurrentTransactionName()) ); jdbcTemplate.execute("insert user values(2, 'bbbbb')"); throw new NullPointerException(); } class TestTransactionSynchronization implements TransactionSynchronization { private String txName; public TestTransactionSynchronization(String txName) { this.txName = txName; } @Override public int getOrder() { System.out.println("Order: "+TransactionSynchronization.super.getOrder()); return TransactionSynchronization.super.getOrder(); } @Override public void suspend() { System.out.printf("事务 %s 挂起%n", txName); TransactionSynchronization.super.suspend(); } @Override public void resume() { System.out.printf("事务 %s 恢复%n", txName); TransactionSynchronization.super.resume(); } @Override public void flush() { System.out.printf("事务 %s flush%n", txName); TransactionSynchronization.super.flush(); } @Override public void beforeCommit(boolean readOnly) { System.out.printf("事务 %s 提交前%n", txName); TransactionSynchronization.super.beforeCommit(readOnly); } @Override public void beforeCompletion() { System.out.printf("事务 %s 完成前%n", txName); TransactionSynchronization.super.beforeCompletion(); } @Override public void afterCommit() { System.out.printf("事务 %s 提交后%n", txName); TransactionSynchronization.super.afterCommit(); } @Override public void afterCompletion(int status) { System.out.printf("事务 %s 完成后%n", txName); TransactionSynchronization.super.afterCompletion(status); } }
你会得到类似输出,清晰地展示了 2 个事务的提交过程:
事务 com.zhouyu.service.UserService.test 挂起 事务 com.zhouyu.service.UserService.a 完成前 事务 com.zhouyu.service.UserService.a 完成后 事务 com.zhouyu.service.UserService.test 恢复 事务 com.zhouyu.service.UserService.test 提交前 事务 com.zhouyu.service.UserService.test 完成前 事务 com.zhouyu.service.UserService.test 提交后 事务 com.zhouyu.service.UserService.test 完成后
如果您觉得本文对您有用,可以请作者喝杯咖啡。 如需商务合作请加微信(点击右边链接扫码): RockYang
版权申明 : 本站博文如非注明转载则均属作者原创文章,引用或转载请注明出处,如要商用请联系作者,谢谢。