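How @Transactional Works in Spring Boot (Java)
Author: 贤子磊
I. Using @Transactional
- On a method: every operation inside the method runs in a single transaction
- On a class
  - Every public method of the class shares the class-level transaction attributes
- If both the class and a method carry the annotation, the method-level annotation takes precedence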
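The precedence rule above can be illustrated with plain reflection. This is a minimal sketch, not Spring's implementation: `@Tx`, `AccountService` and `TxAttributeLookupDemo` are hypothetical names, and the "public methods only" check simply mirrors the rule stated in the list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical stand-in for @Transactional; only one attribute is modelled.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
@interface Tx {
    boolean readOnly() default false;
}

@Tx(readOnly = true)
class AccountService {
    public void findAll() { }      // no annotation: falls back to the class level

    @Tx(readOnly = false)
    public void transfer() { }     // annotated: the method level wins

    void internalHelper() { }      // not public: ignored by this sketch
}

final class TxAttributeLookupDemo {

    /** Returns the effective annotation for a no-arg method, or null if none applies. */
    static Tx resolve(Class<?> type, String methodName) {
        Method method;
        try {
            method = type.getDeclaredMethod(methodName);
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No such method: " + methodName, ex);
        }
        if (!Modifier.isPublic(method.getModifiers())) {
            return null;
        }
        // Method-level annotation first, class-level annotation as the fallback.
        Tx onMethod = method.getAnnotation(Tx.class);
        return (onMethod != null ? onMethod : type.getAnnotation(Tx.class));
    }

    public static void main(String[] args) {
        System.out.println(resolve(AccountService.class, "findAll").readOnly());
        System.out.println(resolve(AccountService.class, "transfer").readOnly());
        System.out.println(resolve(AccountService.class, "internalHelper"));
    }
}
```

Running `main` shows the class-level value for `findAll`, the method-level value for `transfer`, and no attributes at all for the non-public helper.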
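II. How @Transactional works
Spring Boot is currently the most popular way to build Spring applications. Its convention-over-configuration approach is well liked, and annotation-driven (metadata-oriented) development has become the mainstream style. For that reason, this walkthrough starts from the @EnableTransactionManagement annotation, which Spring Boot applies for us through its auto-configuration.
1. @EnableTransactionManagement
Enables annotation-driven transaction management.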
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

    boolean proxyTargetClass() default false;

    AdviceMode mode() default AdviceMode.PROXY;

    int order() default Ordered.LOWEST_PRECEDENCE;
}
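The annotation uses @Import to pull in TransactionManagementConfigurationSelector. The default AdviceMode is AdviceMode.PROXY, which selects proxy-based AOP rather than AspectJ weaving. Because proxyTargetClass defaults to false, a bean that implements interfaces gets a JDK dynamic proxy, while other beans get a CGLIB subclass proxy. Note that Spring Boot's own auto-configuration uses proxyTargetClass = true unless spring.aop.proxy-target-class=false is set, so a typical Boot application ends up with CGLIB proxies.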
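To make "JDK dynamic proxy" concrete, here is a minimal sketch that uses only `java.lang.reflect.Proxy`. The `OrderService` interface, its implementation and the log messages are hypothetical. Spring's real proxies route calls through an advisor chain, which this sketch does not model.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface; a JDK proxy can only implement interfaces.
interface OrderService {
    String placeOrder(String item);
}

class DefaultOrderService implements OrderService {
    @Override
    public String placeOrder(String item) {
        return "ordered " + item;
    }
}

final class JdkProxyDemo {

    /** Wraps the target in a JDK dynamic proxy that records begin/commit/rollback around each call. */
    static OrderService wrap(OrderService target, List<String> log) {
        InvocationHandler handler = (proxy, method, args) -> {
            log.add("begin " + method.getName());
            try {
                Object result = method.invoke(target, args);
                log.add("commit " + method.getName());
                return result;
            } catch (InvocationTargetException ex) {
                // The target threw: record a rollback and rethrow the original exception.
                log.add("rollback " + method.getName());
                throw ex.getCause();
            }
        };
        return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                handler);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        OrderService service = wrap(new DefaultOrderService(), log);
        System.out.println(service.placeOrder("book"));
        System.out.println(log);
        System.out.println(Proxy.isProxyClass(service.getClass()));
    }
}
```

The caller only ever holds the proxy, so the extra behavior applies to calls that go through it. A call the target makes on itself via `this` bypasses the handler, which is why self-invocation does not start a transaction in proxy mode.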
2. TransactionManagementConfigurationSelector
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

    /**
     * Returns {@link ProxyTransactionManagementConfiguration} or
     * {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
     * and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
     * respectively.
     */
    @Override
    protected String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            // With the annotation defaults shown above, adviceMode is PROXY
            case PROXY:
                return new String[] {AutoProxyRegistrar.class.getName(),
                        ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {determineTransactionAspectClass()};
            default:
                return null;
        }
    }

    private String determineTransactionAspectClass() {
        return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
                TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
                TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
    }
}
TransactionManagementConfigurationSelector extends AdviceModeImportSelector. Its selectImports method returns the names of the classes that should be imported into the container. Because adviceMode defaults to PROXY, two classes are imported here: AutoProxyRegistrar and ProxyTransactionManagementConfiguration.
- AutoProxyRegistrar
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {

    private final Log logger = LogFactory.getLog(getClass());

    /**
     * Register, escalate, and configure the standard auto proxy creator (APC) against the
     * given registry. Works by finding the nearest annotation declared on the importing
     * {@code @Configuration} class that has both {@code mode} and {@code proxyTargetClass}
     * attributes. If {@code mode} is set to {@code PROXY}, the APC is registered; if
     * {@code proxyTargetClass} is set to {@code true}, then the APC is forced to use
     * subclass (CGLIB) proxying.
     * <p>Several {@code @Enable*} annotations expose both {@code mode} and
     * {@code proxyTargetClass} attributes. It is important to note that most of these
     * capabilities end up sharing a {@linkplain AopConfigUtils#AUTO_PROXY_CREATOR_BEAN_NAME
     * single APC}. For this reason, this implementation doesn't "care" exactly which
     * annotation it finds -- as long as it exposes the right {@code mode} and
     * {@code proxyTargetClass} attributes, the APC can be registered and configured all
     * the same.
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        boolean candidateFound = false;
        // Important: this iterates over ALL annotation types on the importing class,
        // not only @EnableAspectJAutoProxy.
        // Reason: attributes such as mode and proxyTargetClass directly affect how proxies are created,
        // and several annotations expose them, including
        // @EnableTransactionManagement, @EnableAsync and @EnableCaching.
        // @EnableAspectJAutoProxy can also set proxyTargetClass (through its own registrar),
        // so it influences the shared APC as well.
        Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
        for (String annType : annTypes) {
            AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
            if (candidate == null) {
                continue;
            }
            // Read the mode and proxyTargetClass attributes
            Object mode = candidate.get("mode");
            Object proxyTargetClass = candidate.get("proxyTargetClass");
            // Both attributes exist and have the expected types
            if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
                    Boolean.class == proxyTargetClass.getClass()) {
                candidateFound = true;
                if (mode == AdviceMode.PROXY) {
                    // Registers an InfrastructureAdvisorAutoProxyCreator, which looks up Advisor beans
                    // and creates proxies for the beans they apply to.
                    // If an APC is already registered it is not blindly overwritten: the existing
                    // definition is kept and only escalated to a higher-priority creator class when needed.
                    AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                    // Force CGLIB if requested (once any annotation asks for it, the flag is set on the shared APC)
                    if ((Boolean) proxyTargetClass) {
                        AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                        return;
                    }
                }
            }
        }
        // Nothing matched: log at info level.
        // This usually means the class was imported directly rather than through an @Enable* annotation.
        if (!candidateFound && logger.isInfoEnabled()) {
            String name = getClass().getSimpleName();
            logger.info(String.format("%s was imported but no annotations were found " +
                    "having both 'mode' and 'proxyTargetClass' attributes of type " +
                    "AdviceMode and boolean respectively. This means that auto proxy " +
                    "creator registration and configuration may not have occurred as " +
                    "intended, and components may not be proxied as expected. Check to " +
                    "ensure that %s has been @Import'ed on the same class where these " +
                    "annotations are declared; otherwise remove the import of %s " +
                    "altogether.", name, name, name));
        }
    }
}
Its main job is to register an auto-proxy creator in the Spring container under the bean name org.springframework.aop.config.internalAutoProxyCreator, and to decide whether CGLIB or JDK proxying is used.
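The "register or escalate" behavior of the shared auto-proxy creator can be sketched as follows. This is a simplified model under stated assumptions: the `Map` stands in for the bean definition registry, class names are shortened to simple names, and `registerOrEscalate` is a hypothetical helper rather than the actual `AopConfigUtils` code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ApcEscalationDemo {

    static final String APC_BEAN_NAME = "org.springframework.aop.config.internalAutoProxyCreator";

    // Lowest priority first; a later entry may replace an earlier one, never the reverse.
    static final List<String> PRIORITY = Arrays.asList(
            "InfrastructureAdvisorAutoProxyCreator",
            "AspectJAwareAdvisorAutoProxyCreator",
            "AnnotationAwareAspectJAutoProxyCreator");

    /** Registers the requested creator, or upgrades the existing one if the request ranks higher. */
    static void registerOrEscalate(Map<String, String> registry, String requested) {
        String current = registry.get(APC_BEAN_NAME);
        if (current == null || PRIORITY.indexOf(current) < PRIORITY.indexOf(requested)) {
            registry.put(APC_BEAN_NAME, requested);
        }
        // Otherwise the existing, equal-or-higher-priority creator is kept.
    }

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registerOrEscalate(registry, "InfrastructureAdvisorAutoProxyCreator");   // transactions
        registerOrEscalate(registry, "AnnotationAwareAspectJAutoProxyCreator");  // @EnableAspectJAutoProxy
        registerOrEscalate(registry, "InfrastructureAdvisorAutoProxyCreator");   // no downgrade
        System.out.println(registry.get(APC_BEAN_NAME));
    }
}
```

Only one creator is ever registered under the well-known bean name, which is why several `@Enable*` annotations can coexist without producing several layers of proxies.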
- ProxyTransactionManagementConfiguration
@Configuration(proxyBeanMethods = false)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

    @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
            TransactionAttributeSource transactionAttributeSource,
            TransactionInterceptor transactionInterceptor) {
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        advisor.setTransactionAttributeSource(transactionAttributeSource);
        advisor.setAdvice(transactionInterceptor);
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

    @Bean
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public TransactionInterceptor transactionInterceptor(
            TransactionAttributeSource transactionAttributeSource) {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource);
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }
}
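This configuration registers the transaction-related beans in the container:
- BeanFactoryTransactionAttributeSourceAdvisor: the core of transaction support, analyzed below
- TransactionAttributeSource: the source of transaction attributes
- TransactionInterceptor: the transaction interceptor, which is a MethodInterceptor. (We can replace the default interceptor by defining our own TransactionInterceptor under exactly the same bean name. Note that Spring Boot 2.1 and later reject duplicate bean names unless spring.main.allow-bean-definition-overriding=true is set.)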
Next, look at the parent class AbstractTransactionManagementConfiguration
@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {

    @Nullable
    protected AnnotationAttributes enableTx;

    /**
     * Default transaction manager, as configured through a {@link TransactionManagementConfigurer}.
     */
    @Nullable
    protected TransactionManager txManager;

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableTx = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
        if (this.enableTx == null) {
            throw new IllegalArgumentException(
                    "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
        }
    }

    // A default transaction manager can be supplied here through a TransactionManagementConfigurer bean
    @Autowired(required = false)
    void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        // At most one configurer is allowed
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
        }
        TransactionManagementConfigurer configurer = configurers.iterator().next();
        this.txManager = configurer.annotationDrivenTransactionManager();
    }

    // Registers a listener factory that supports methods annotated with @TransactionalEventListener,
    // so that they can react to transaction events
    @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
        return new TransactionalEventListenerFactory();
    }
}
- BeanFactoryTransactionAttributeSourceAdvisor
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

    @Nullable
    private TransactionAttributeSource transactionAttributeSource;

    // Pointcut: decides which classes and methods are advised, and therefore which beans get a proxy
    private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
        @Override
        @Nullable
        protected TransactionAttributeSource getTransactionAttributeSource() {
            return transactionAttributeSource;
        }
    };

    // A transaction attribute source can be set manually
    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
        this.transactionAttributeSource = transactionAttributeSource;
    }

    // A custom ClassFilter can be supplied; it replaces the TransactionAttributeSourceClassFilter
    // that the pointcut installs in its constructor
    public void setClassFilter(ClassFilter classFilter) {
        this.pointcut.setClassFilter(classFilter);
    }

    // The advisor uses its own pointcut instance
    @Override
    public Pointcut getPointcut() {
        return this.pointcut;
    }
}
Next, look at the TransactionAttributeSourcePointcut class
abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {

    protected TransactionAttributeSourcePointcut() {
        setClassFilter(new TransactionAttributeSourceClassFilter());
    }

    @Override
    public boolean matches(Method method, Class<?> targetClass) {
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    }

    @Override
    public boolean equals(@Nullable Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof TransactionAttributeSourcePointcut)) {
            return false;
        }
        TransactionAttributeSourcePointcut otherPc = (TransactionAttributeSourcePointcut) other;
        return ObjectUtils.nullSafeEquals(getTransactionAttributeSource(), otherPc.getTransactionAttributeSource());
    }

    @Override
    public int hashCode() {
        return TransactionAttributeSourcePointcut.class.hashCode();
    }

    @Override
    public String toString() {
        return getClass().getName() + ": " + getTransactionAttributeSource();
    }

    // Subclasses supply the transaction attribute source
    @Nullable
    protected abstract TransactionAttributeSource getTransactionAttributeSource();

    /**
     * {@link ClassFilter} that delegates to {@link TransactionAttributeSource#isCandidateClass}
     * for filtering classes whose methods are not worth searching to begin with.
     */
    private class TransactionAttributeSourceClassFilter implements ClassFilter {

        @Override
        public boolean matches(Class<?> clazz) {
            // Classes that implement any of the following three interfaces are skipped and never proxied here:
            // TransactionalProxy: a sub-interface of SpringProxy. Beans produced by TransactionProxyFactoryBean
            //   implement it automatically, so they are not proxied a second time.
            // PlatformTransactionManager: Spring's transaction manager abstraction.
            // PersistenceExceptionTranslator: translates persistence-layer RuntimeExceptions into DataAccessException.
            if (TransactionalProxy.class.isAssignableFrom(clazz) ||
                    PlatformTransactionManager.class.isAssignableFrom(clazz) ||
                    PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
                return false;
            }
            // Important: obtain the transaction attribute source.
            // tas == null means no attribute source is configured, so every class matches;
            // otherwise only classes that may carry annotations such as @Transactional are candidates.
            TransactionAttributeSource tas = getTransactionAttributeSource();
            return (tas == null || tas.isCandidateClass(clazz));
        }
    }
}
When is matches called? Every bean in the container passes through AbstractAutoProxyCreator#postProcessAfterInitialization, which calls wrapIfNecessary. As a result, while the container starts, every bean's class is run through the class filter, and the methods of each candidate class are checked with matches until one that applies is found.
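3. TransactionInterceptor
The core transaction-handling logic lives in this interceptor. First, a look at three central interfaces of Spring's transaction abstraction:
- TransactionStatus: represents the runtime state of a specific transaction, including its savepoints
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

    // Whether this transaction internally carries a savepoint
    boolean hasSavepoint();

    // Flush the underlying session to the datastore
    @Override
    void flush();
}

public interface TransactionExecution {

    // Whether the current transaction is a new one
    boolean isNewTransaction();

    // Marks the transaction so that its only possible outcome is a rollback.
    // If an inner call marks the transaction this way and the outer code catches the exception
    // and lets the transaction proceed to commit, you get the familiar error:
    // Transaction rolled back because it has been marked as rollback-only
    void setRollbackOnly();

    // Whether the transaction has been marked as rollback-only
    boolean isRollbackOnly();

    // Whether the transaction has completed (committed or rolled back)
    boolean isCompleted();
}
- TransactionDefinition: describes the isolation level, timeout, read-only flag and propagation behavior
- PlatformTransactionManager: the transaction manager, which declares three methods: commit, rollback and getTransaction
From the analysis above we know that TransactionInterceptor is essentially a MethodInterceptor: every call to a method matched by the transaction pointcut ends up in this advice. A MethodInterceptor is around advice, which is exactly what is needed to begin, commit and roll back a transaction around the target call.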
3.1 The invoke method
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

    /**
     * Create a new TransactionInterceptor.
     * <p>Transaction manager and transaction attributes still need to be set.
     * @see #setTransactionManager
     * @see #setTransactionAttributes(java.util.Properties)
     * @see #setTransactionAttributeSource(TransactionAttributeSource)
     */
    public TransactionInterceptor() {
    }

    /**
     * Create a new TransactionInterceptor.
     * @param ptm the default transaction manager to perform the actual transaction management
     * @param attributes the transaction attributes in properties format
     * @see #setTransactionManager
     * @see #setTransactionAttributes(java.util.Properties)
     */
    public TransactionInterceptor(PlatformTransactionManager ptm, Properties attributes) {
        setTransactionManager(ptm);
        setTransactionAttributes(attributes);
    }

    /**
     * Create a new TransactionInterceptor.
     * @param ptm the default transaction manager to perform the actual transaction management
     * @param tas the attribute source to be used to find transaction attributes
     * @see #setTransactionManager
     * @see #setTransactionAttributeSource(TransactionAttributeSource)
     */
    public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
        setTransactionManager(ptm);
        setTransactionAttributeSource(tas);
    }

    @Override
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Work out the target class: may be {@code null}.
        // The TransactionAttributeSource should be passed the target class
        // as well as the method, which may be from an interface.
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // Adapt to TransactionAspectSupport's invokeWithinTransaction...
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }

    //---------------------------------------------------------------------
    // Serialization support
    //---------------------------------------------------------------------

    private void writeObject(ObjectOutputStream oos) throws IOException {
        // Rely on default serialization, although this class itself doesn't carry state anyway...
        oos.defaultWriteObject();

        // Deserialize superclass fields.
        oos.writeObject(getTransactionManagerBeanName());
        oos.writeObject(getTransactionManager());
        oos.writeObject(getTransactionAttributeSource());
        oos.writeObject(getBeanFactory());
    }

    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
        // Rely on default serialization, although this class itself doesn't carry state anyway...
        ois.defaultReadObject();

        // Serialize all relevant superclass fields.
        // Superclass can't implement Serializable because it also serves as base class
        // for AspectJ aspects (which are not allowed to implement Serializable)!
        setTransactionManagerBeanName((String) ois.readObject());
        setTransactionManager((PlatformTransactionManager) ois.readObject());
        setTransactionAttributeSource((TransactionAttributeSource) ois.readObject());
        setBeanFactory((BeanFactory) ois.readObject());
    }
}
invoke delegates to invokeWithinTransaction, which is defined in the parent class TransactionAspectSupport.
3.2 The invokeWithinTransaction method
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {

    /**
     * General delegate for around-advice-based subclasses, delegating to several other template
     * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
     * as well as regular {@link PlatformTransactionManager} implementations.
     * @param method the Method being invoked
     * @param targetClass the target class that we're invoking the method on
     * @param invocation the callback to use for proceeding with the target invocation
     * @return the return value of the method, if any
     * @throws Throwable propagated from the target invocation
     */
    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        if (this.reactiveAdapterRegistry != null) {
            if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
                throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: "
                        + method + ". Use TransactionalOperator.transactional extensions instead.");
            }
            ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
            if (adapter != null) {
                return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
            }
        }

        // Obtain the transaction attribute source
        TransactionAttributeSource tas = getTransactionAttributeSource();
        // Resolve the transaction attribute for this method
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // Determine the transaction manager to use
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        // Build the join point identification for the method
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        // Standard path: txAttr is null or tm is not a CallbackPreferringPlatformTransactionManager.
        // (CallbackPreferringPlatformTransactionManager extends PlatformTransactionManager and exposes
        // an execute method that runs a callback within a transaction.)
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Create a transaction if necessary, depending on the propagation behavior
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // Proceed with the invocation: runs the target method (the original business logic)
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // An exception was thrown: roll back (note that not every exception triggers a rollback);
                // if the exception does not require a rollback, the transaction is committed instead
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                // Restore the previous TransactionInfo on the thread
                cleanupTransactionInfo(txInfo);
            }

            if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                // Set rollback-only in case of Vavr failure matching our rollback rules...
                TransactionStatus status = txInfo.getTransactionStatus();
                if (status != null && txAttr != null) {
                    retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                }
            }

            // The target method returned normally: commit the transaction
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        // Callback-style transaction managers (CallbackPreferringPlatformTransactionManager) take this branch
        else {
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                    try {
                        Object retVal = invocation.proceedWithInvocation();
                        if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                            // Set rollback-only in case of Vavr failure matching our rollback rules...
                            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                        }
                        return retVal;
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException: will lead to a rollback.
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // A normal return value: will lead to a commit.
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });

                // Check result state: It might indicate a Throwable to rethrow.
                if (throwableHolder.throwable != null) {
                    throw throwableHolder.throwable;
                }
                return result;
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }
        }
    }
}
The logic is straightforward: at its core it is a try...catch around the target call that either commits or rolls back the transaction. Next, look at determineTransactionManager, the method that resolves the transaction manager.
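Stripped of the Spring types, the standard branch boils down to the template below. This is a minimal sketch: `RecordingTxManager` and `trace` are hypothetical helpers, and `rollbackOn` hard-codes the default rule (roll back on unchecked exceptions and errors, commit on checked exceptions) instead of evaluating configurable rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class TxTemplateDemo {

    /** Hypothetical transaction manager that only records what it was asked to do. */
    static final class RecordingTxManager {
        final List<String> events = new ArrayList<>();
        void begin() { events.add("begin"); }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** Default rule assumed here: unchecked exceptions and Errors roll back, checked exceptions commit. */
    static boolean rollbackOn(Throwable ex) {
        return (ex instanceof RuntimeException || ex instanceof Error);
    }

    static <T> T invokeWithinTransaction(RecordingTxManager tm, Callable<T> target) throws Exception {
        tm.begin();
        T result;
        try {
            result = target.call();              // the business method
        } catch (Throwable ex) {
            if (rollbackOn(ex)) {
                tm.rollback();
            } else {
                tm.commit();                     // the exception still propagates to the caller
            }
            throw ex;
        }
        tm.commit();                             // normal return
        return result;
    }

    /** Runs one call and returns the recorded events; the exception itself is swallowed for the demo. */
    static List<String> trace(Callable<?> target) {
        RecordingTxManager tm = new RecordingTxManager();
        try {
            invokeWithinTransaction(tm, target);
        } catch (Exception ex) {
            // Only the recorded events matter in this demo.
        }
        return tm.events;
    }

    public static void main(String[] args) {
        System.out.println(trace(() -> "ok"));
        System.out.println(trace(() -> { throw new IllegalStateException("boom"); }));
        System.out.println(trace(() -> { throw new java.io.IOException("io"); }));
    }
}
```

The third call is the case that surprises people most often: a checked exception leaves the method, yet the transaction is committed unless a rollback rule says otherwise.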
3.3 The determineTransactionManager method
@Nullable
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
    // No transaction attribute or no BeanFactory: simply return the explicitly configured
    // transaction manager (which may be null)
    if (txAttr == null || this.beanFactory == null) {
        return asPlatformTransactionManager(getTransactionManager());
    }

    // The qualifier acts like a bean name
    String qualifier = txAttr.getQualifier();
    if (StringUtils.hasText(qualifier)) {
        // Look up a PlatformTransactionManager bean in the container by this qualifier
        return determineQualifiedTransactionManager(this.beanFactory, qualifier);
    }
    // No qualifier: check whether a transactionManagerBeanName was configured
    else if (StringUtils.hasText(this.transactionManagerBeanName)) {
        return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
    }
    // Nothing specified: use the configured default, or look one up by type via getBean(Class).
    // Note: the by-type lookup fails if the container holds several PlatformTransactionManager
    // beans and none of them is marked as primary
    else {
        PlatformTransactionManager defaultTransactionManager = asPlatformTransactionManager(getTransactionManager());
        if (defaultTransactionManager == null) {
            defaultTransactionManager = asPlatformTransactionManager(
                    this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY));
            if (defaultTransactionManager == null) {
                defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
                this.transactionManagerCache.putIfAbsent(
                        DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
            }
        }
        return defaultTransactionManager;
    }
}
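The lookup order can be condensed into a small model. In this sketch the container is a plain `Map` from bean name to a string describing the manager, caching is left out, and every name (`resolve`, `sampleManagers`, `ordersTx`, `billingTx`) is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TxManagerLookupDemo {

    static boolean hasText(String s) {
        return s != null && !s.trim().isEmpty();
    }

    /**
     * Resolution order: qualifier from the transaction attribute, then the configured bean name,
     * then an explicitly set default, then a lookup by type (which needs a unique candidate).
     */
    static String resolve(String qualifier, String configuredBeanName,
            String explicitDefault, Map<String, String> managersByBeanName) {
        if (hasText(qualifier)) {
            return lookup(managersByBeanName, qualifier);
        }
        if (hasText(configuredBeanName)) {
            return lookup(managersByBeanName, configuredBeanName);
        }
        if (explicitDefault != null) {
            return explicitDefault;
        }
        if (managersByBeanName.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one transaction manager but found " + managersByBeanName.size());
        }
        return managersByBeanName.values().iterator().next();
    }

    private static String lookup(Map<String, String> managersByBeanName, String name) {
        String manager = managersByBeanName.get(name);
        if (manager == null) {
            throw new IllegalStateException("No transaction manager named '" + name + "'");
        }
        return manager;
    }

    /** Two hypothetical managers, as in an application with two data sources. */
    static Map<String, String> sampleManagers() {
        Map<String, String> managers = new LinkedHashMap<>();
        managers.put("ordersTx", "orders manager");
        managers.put("billingTx", "billing manager");
        return managers;
    }

    public static void main(String[] args) {
        Map<String, String> managers = sampleManagers();
        System.out.println(resolve("billingTx", "ordersTx", null, managers));
        System.out.println(resolve(null, "ordersTx", null, managers));
        System.out.println(resolve(null, null, "explicit manager", managers));
    }
}
```

With two managers and nothing else specified, the last step fails, which is the situation the note in the source comment warns about. Naming the manager in the annotation's qualifier avoids it.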
3.4 createTransactionIfNecessary
Next comes createTransactionIfNecessary, the method that creates the transaction. Before reading it, look at the TransactionInfo class that it returns.
protected static final class TransactionInfo {

    // Transaction manager of the current transaction
    @Nullable
    private final PlatformTransactionManager transactionManager;

    // Transaction attribute of the current transaction
    @Nullable
    private final TransactionAttribute transactionAttribute;

    // Join point identification
    private final String joinpointIdentification;

    // TransactionStatus of the current transaction
    @Nullable
    private TransactionStatus transactionStatus;

    // Reference to the enclosing (previous) TransactionInfo; the references form a chain
    // that is used like a stack
    @Nullable
    private TransactionInfo oldTransactionInfo;

    public TransactionInfo(@Nullable PlatformTransactionManager transactionManager,
            @Nullable TransactionAttribute transactionAttribute, String joinpointIdentification) {
        this.transactionManager = transactionManager;
        this.transactionAttribute = transactionAttribute;
        this.joinpointIdentification = joinpointIdentification;
    }

    public PlatformTransactionManager getTransactionManager() {
        Assert.state(this.transactionManager != null, "No PlatformTransactionManager set");
        return this.transactionManager;
    }

    @Nullable
    public TransactionAttribute getTransactionAttribute() {
        return this.transactionAttribute;
    }

    /**
     * Return a String representation of this joinpoint (usually a Method call)
     * for use in logging.
     */
    public String getJoinpointIdentification() {
        return this.joinpointIdentification;
    }

    // Stores the TransactionStatus obtained for this invocation
    public void newTransactionStatus(@Nullable TransactionStatus status) {
        this.transactionStatus = status;
    }

    @Nullable
    public TransactionStatus getTransactionStatus() {
        return this.transactionStatus;
    }

    /**
     * Return whether a transaction was created by this aspect,
     * or whether we just have a placeholder to keep ThreadLocal stack integrity.
     */
    public boolean hasTransaction() {
        return (this.transactionStatus != null);
    }

    // Binds this TransactionInfo to the current thread through a ThreadLocal
    private void bindToThread() {
        // Expose current TransactionStatus, preserving any existing TransactionStatus
        // for restoration after this transaction is complete.
        // Remember the old one from the thread first, then bind the new (current) one
        this.oldTransactionInfo = transactionInfoHolder.get();
        transactionInfoHolder.set(this);
    }

    // After the current transaction completes, restore the enclosing TransactionInfo
    private void restoreThreadLocalStatus() {
        // Use stack to restore old transaction TransactionInfo.
        // Will be null if none was set.
        transactionInfoHolder.set(this.oldTransactionInfo);
    }

    @Override
    public String toString() {
        return (this.transactionAttribute != null ? this.transactionAttribute.toString() : "No transaction");
    }
}
Now the createTransactionIfNecessary method itself
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // Ask the transaction manager for a TransactionStatus based on txAttr
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                        "] because no transaction manager has been configured");
            }
        }
    }
    // Wrap the TransactionStatus and the related data in a TransactionInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
Next, the prepareTransactionInfo method
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, String joinpointIdentification,
        @Nullable TransactionStatus status) {

    // Create the TransactionInfo
    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // We need a transaction for this method...
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // The transaction manager will flag an error if an incompatible tx already exists.
        txInfo.newTransactionStatus(status);
    }
    else {
        // The TransactionInfo.hasTransaction() method will return false. We created it only
        // to preserve the integrity of the ThreadLocal stack maintained in this class.
        if (logger.isTraceEnabled()) {
            logger.trace("No need to create transaction for [" + joinpointIdentification +
                    "]: This method is not transactional.");
        }
    }

    // We always bind the TransactionInfo to the thread, even if we didn't create
    // a new transaction here. This guarantees that the TransactionInfo stack
    // will be managed correctly even if no transaction was created by this aspect.
    // Bind the new TransactionInfo to the current thread's ThreadLocal
    txInfo.bindToThread();
    return txInfo;
}
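The bind/restore pair keeps a per-thread stack of TransactionInfo objects without using an explicit stack type. A minimal sketch of that technique, with the hypothetical class `Info` standing in for TransactionInfo and `HOLDER` for transactionInfoHolder:

```java
final class TxInfoStackDemo {

    // One slot per thread; the "stack" lives in the previous-links of the Info objects.
    private static final ThreadLocal<Info> HOLDER = new ThreadLocal<>();

    static final class Info {
        final String name;
        private Info previous;

        Info(String name) {
            this.name = name;
        }

        /** Push: remember whatever was bound before, then bind this one. */
        void bindToThread() {
            this.previous = HOLDER.get();
            HOLDER.set(this);
        }

        /** Pop: put the previously bound Info (possibly null) back. */
        void restoreThreadLocalStatus() {
            HOLDER.set(this.previous);
        }
    }

    /** Name of the Info currently bound to this thread, or null if none is bound. */
    static String current() {
        Info info = HOLDER.get();
        return (info != null ? info.name : null);
    }

    public static void main(String[] args) {
        Info outer = new Info("outer");
        Info inner = new Info("inner");

        outer.bindToThread();
        inner.bindToThread();                 // a nested transactional call
        System.out.println(current());

        inner.restoreThreadLocalStatus();     // the nested call finished
        System.out.println(current());

        outer.restoreThreadLocalStatus();     // the outer call finished
        System.out.println(current());
    }
}
```

Because restore always runs in a finally block in the real code, the slot is back to its previous value even when the target method throws.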
3.5 commitTransactionAfterReturning
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // Commit directly through the transaction manager
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}
3.6 completeTransactionAfterThrowing
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                    "] after exception: " + ex);
        }
        // If a transaction attribute is present, ask rollbackOn whether this exception requires a rollback
        if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
            try {
                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
        // Otherwise commit
        else {
            // We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            try {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
            catch (TransactionSystemException ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            }
            catch (RuntimeException | Error ex2) {
                logger.error("Application exception overridden by commit exception", ex);
                throw ex2;
            }
        }
    }
}
3.7 cleanupTransactionInfo
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
    if (txInfo != null) {
        // Unbind: restore the previous TransactionInfo on the thread
        txInfo.restoreThreadLocalStatus();
    }
}
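4. PlatformTransactionManager
The transaction manager interface.
public interface PlatformTransactionManager extends TransactionManager {

    // Return the currently active transaction or create a new one, according to the propagation behavior
    TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;

    // Commit the transaction
    void commit(TransactionStatus status) throws TransactionException;

    // Roll back the transaction
    void rollback(TransactionStatus status) throws TransactionException;
}
4.1 AbstractPlatformTransactionManager
An abstract base implementation of PlatformTransactionManager. It handles the following parts of the workflow:
- determines whether there is an existing transaction;
- applies the appropriate propagation behavior;
- suspends and resumes transactions if necessary;
- checks the rollback-only flag on commit;
- applies the appropriate modification on rollback (an actual rollback or setting rollback-only);
- triggers registered synchronization callbacks (if transaction synchronization is active).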
(1) The getTransaction method
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Use the default transaction attributes if none were given
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // Obtain the transaction object; the concrete transaction manager provides the implementation
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // Check whether the current thread already has a transaction; if so, handle it according to
    // the propagation behavior. isExistingTransaction is overridden by subclasses
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Validate the timeout
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction: apply the propagation behavior configured in the definition.
    // PROPAGATION_MANDATORY supports a current transaction and throws if there is none
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // Propagation is REQUIRED, REQUIRES_NEW or NESTED
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // Suspend; doSuspend() is implemented by subclasses.
        // In general, suspend triggers the suspend callbacks of the registered synchronizations,
        // captures the thread's current transaction state in a SuspendedResourcesHolder
        // and clears it from the thread. With a null argument, as here, there is no transaction
        // to suspend, so only active synchronizations (if any) are affected
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        // The transaction is created here
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Create a new transaction status, i.e. a DefaultTransactionStatus with all properties set
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction; abstract method implemented by subclasses
            doBegin(transaction, def);
            // Initialize transaction synchronization as appropriate
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            // Resume what was suspended; doResume is implemented by subclasses
            resume(null, suspendedResources);
            throw ex;
        }
    }
    // Reaching this point means the propagation behavior needs no actual transaction
    // (SUPPORTS, NOT_SUPPORTED or NEVER), so an "empty" one is created
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}
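The branch taken when no transaction exists yet depends only on the propagation behavior. The sketch below models that decision with two hypothetical enums. It covers only the "no existing transaction" path shown above, not handleExistingTransaction.

```java
final class PropagationDecisionDemo {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Outcome { NEW_TRANSACTION, EMPTY_TRANSACTION, ERROR }

    /** What getTransaction does when the thread has no transaction yet. */
    static Outcome whenNoTransactionExists(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                // Requires an existing transaction, so this is an error.
                return Outcome.ERROR;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                // All three start a fresh transaction when none is active.
                return Outcome.NEW_TRANSACTION;
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER: run without an actual transaction.
                return Outcome.EMPTY_TRANSACTION;
        }
    }

    public static void main(String[] args) {
        for (Propagation propagation : Propagation.values()) {
            System.out.println(propagation + " -> " + whenNoTransactionExists(propagation));
        }
    }
}
```

REQUIRED, REQUIRES_NEW and NESTED only differ once a transaction already exists, which is why they share a branch here.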
(2) The commit method
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    // A completed transaction must not be committed again
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // The transactional code marked the transaction as rollback-only: roll back instead
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    // shouldCommitOnGlobalRollbackOnly defaults to false; currently only the JTA transaction manager
    // overrides it to return true.
    // isGlobalRollbackOnly: whether the transaction has been globally marked as rollback-only
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    // Commit the transaction, taking savepoints, new transactions, rollback-only state and so on into account
    processCommit(defStatus);
}
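The checks at the top of commit can be summarized as a small decision function. This is a simplified sketch: `Status` is a hypothetical stand-in for DefaultTransactionStatus, the result is reported as a string instead of performing any work, and the shouldCommitOnGlobalRollbackOnly switch is assumed to be at its default of false.

```java
final class CommitDecisionDemo {

    /** Hypothetical, minimal transaction status. */
    static final class Status {
        boolean completed;
        boolean localRollbackOnly;    // set by the transactional code itself
        boolean globalRollbackOnly;   // set on the shared transaction, e.g. by an inner participant
    }

    /** Decides what a commit request actually turns into. */
    static String commit(Status status) {
        if (status.completed) {
            throw new IllegalStateException("Transaction is already completed");
        }
        status.completed = true;
        if (status.localRollbackOnly) {
            return "rollback (requested by transactional code)";
        }
        if (status.globalRollbackOnly) {
            return "rollback (unexpected: marked rollback-only elsewhere)";
        }
        return "commit";
    }

    public static void main(String[] args) {
        System.out.println(commit(new Status()));

        Status local = new Status();
        local.localRollbackOnly = true;
        System.out.println(commit(local));

        Status global = new Status();
        global.globalRollbackOnly = true;
        System.out.println(commit(global));
    }
}
```

The second rollback case corresponds to the "marked as rollback-only" error mentioned with setRollbackOnly: the caller asked for a commit, but the transaction had already been condemned.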
(3) The rollback method
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    // A transaction that has already completed cannot be rolled back again.
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // processRollback() drives the rollback; the resource-level work is done by
    // subclass hooks such as doRollback().
    processRollback(defStatus, false);
}
4.2、DataSourceTransactionManager
Let's take DataSourceTransactionManager, the most commonly used implementation, and look at how it implements these hooks internally.
(1)doGetTransaction
@Override
protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // Look up the ConnectionHolder bound to the current thread for this DataSource (null if none).
    ConnectionHolder conHolder =
            (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    // false: the holder was not newly created by this transaction object.
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}
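doGetTransaction() relies on resources being bound per thread: the lookup by DataSource only finds a ConnectionHolder that the same thread bound earlier. The idea can be sketched with a ThreadLocal map. This is a simplified illustration in plain Java, not the TransactionSynchronizationManager source; the ThreadBoundResources class and its bind/get/unbind methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, JDK-only sketch of a per-thread resource registry.
final class ThreadBoundResources {

    // Each thread sees its own map, so a resource bound on one thread
    // is invisible to every other thread.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key " + key);
        }
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbind(Object key) {
        return RESOURCES.get().remove(key);
    }

    public static void main(String[] args) throws InterruptedException {
        Object dataSourceKey = new Object(); // stands in for the DataSource
        bind(dataSourceKey, "connection-holder-A");
        System.out.println("same thread:  " + get(dataSourceKey));

        Object[] seenByOtherThread = new Object[1];
        Thread other = new Thread(() -> seenByOtherThread[0] = get(dataSourceKey));
        other.start();
        other.join();
        System.out.println("other thread: " + seenByOtherThread[0]);

        unbind(dataSourceKey);
    }
}
```

This is also why work handed off to another thread inside a transactional method does not take part in the caller's transaction: the new thread finds nothing bound for the DataSource.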
(2)doBegin
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() ||
                txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // Get a connection from the DataSource (usually backed by a connection pool).
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            // Wrap the connection in a ConnectionHolder and mark the holder as new.
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        // Apply the read-only flag and the isolation level; remember the previous isolation level.
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // This is the key step: if the Connection is in auto-commit mode, switch it to manual
        // commit with con.setAutoCommit(false). Otherwise every SQL statement would be committed
        // as its own transaction and could not be managed as a unit.
        // From here on, statements executed through this Connection are not committed
        // until commit() is called.
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            con.setAutoCommit(false);
        }

        // Only when enforceReadOnly is enabled on the transaction manager (it is off by default)
        // and the definition is read-only does this method create a Statement and execute
        // "SET TRANSACTION READ ONLY". On databases that understand this statement the
        // transaction can then only read. Without it, readOnly=true is merely passed to the
        // driver as a hint via Connection.setReadOnly(true) in prepareConnectionForTransaction.
        prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the current thread, keyed by the DataSource.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }
    catch (Throwable ex) {
        // If the holder was newly created here, release the connection and clear the holder.
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}
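The auto-commit handling in doBegin is the heart of a JDBC-based transaction: switch to manual commit, run the work, then commit or roll back, and finally put the connection back the way it was found. The sketch below replays that life cycle against a fake connection so that it runs without a database. Everything here is an assumption made for illustration: FakeConnection is a hypothetical stand-in that mimics only a handful of java.sql.Connection methods, and ManualCommitSketch.runInTransaction is not a Spring API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for java.sql.Connection; it only models auto-commit.
final class FakeConnection {
    private boolean autoCommit = true;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    boolean getAutoCommit() { return autoCommit; }
    void setAutoCommit(boolean autoCommit) { this.autoCommit = autoCommit; }

    // In auto-commit mode each statement is committed on its own.
    void execute(String sql) {
        if (autoCommit) { committed.add(sql); } else { pending.add(sql); }
    }

    void commit() { committed.addAll(pending); pending.clear(); }
    void rollback() { pending.clear(); }
    List<String> committed() { return committed; }
}

// Hypothetical helper showing the begin / commit-or-rollback / cleanup steps.
final class ManualCommitSketch {

    static void runInTransaction(FakeConnection con, Runnable work) {
        boolean mustRestoreAutoCommit = false;
        if (con.getAutoCommit()) {          // same check as in doBegin
            mustRestoreAutoCommit = true;
            con.setAutoCommit(false);
        }
        try {
            work.run();
            con.commit();                   // all statements become visible together
        } catch (RuntimeException ex) {
            con.rollback();                 // none of the statements survive
            throw ex;
        } finally {
            if (mustRestoreAutoCommit) {
                con.setAutoCommit(true);    // hand the connection back unchanged
            }
        }
    }

    public static void main(String[] args) {
        FakeConnection con = new FakeConnection();
        runInTransaction(con, () -> {
            con.execute("INSERT A");
            con.execute("INSERT B");
        });
        System.out.println("committed: " + con.committed());
    }
}
```

In the sketch the flag is a local variable; in the code above it is recorded on the transaction object with setMustRestoreAutoCommit(true) so that it is still available once the transaction completes.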
(3)doCommit
@Override
protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    // The connection that doBegin stored in the transaction object's ConnectionHolder.
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        // The actual commit is a plain JDBC commit on that connection.
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}