java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring事务未提交读取不到数据

Spring TransactionalEventListener事务未提交读取不到数据的解决

作者:程序员牧码

这篇文章主要介绍了Spring TransactionalEventListener事务未提交读取不到数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、背景

业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
 
@Transactional
public Long test() {
  // ......
  // 插入记录
  Long studentId = studentService.insert(student);
  // 异步线程
  writeStatisticsData(studentId);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}
@Transactional
public Long test() {
  // ......
  // 插入记录
  Long studentId = studentService.insert(student);
   // 异步线程
  writeStatisticsData(studentId);
  // 插入学生地址记录
  Long addressId = addressService.insert(address);
  return studentId;
}
 
private void writeStatisticsData(Long studentId) {
  executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
  });
}

二、问题分析

这里使用了spring事务,显然需要考虑事务的隔离级别

2.1、mysql隔离级别

查看mysql隔离级别

SELECT @@tx_isolation;
READ-COMMITTED

读提交,即在事务A插入数据过程中,事务B在A提交之前读取A插入的数据读取不到,而B在A提交之后再去读就会读取到A插入的数据,也即Read Committed不能保证在一个事务中每次读都能读到相同的数据,因为在每次读数据之后其他并发事务可能会对刚才读到的数据进行修改。

2.2、问题原因分析

由于mysql事务的隔离级别是读提交,test方法在开启异步线程后,异步线程也开启了事务,同时以读者身份去读 test 方法中插入的 student 记录,但此时 test 方法已经提交了事务,所以可以读取到 student 记录(即在异步方法中可以读取到 student 记录),但此代码有风险,若事务提交的时间晚一点,异步线程也有可能读取不到 student 记录。

经过上面分析,很明显异步方法中不能读取到 student 记录,由于代码二在异步线程下面又执行了其他操作,延时了test方法中事务的提交,所以代码二不能正常运行。

三、解决问题方案

解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
  // 获取事务属性
  final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
  //加载配置中配置的TransactionManager
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
  
  // 声明式事务的处理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    //......
    retVal = invocation.proceedWithInvocation();
    //......
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // 编程式事务的处理......
  }
  //......
}

这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
  if (txInfo != null && txInfo.hasTransaction()) {
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:

public final void commit(TransactionStatus status) throws TransactionException {
    // 这里省略很多代码,如事务回滚......
		processCommit(defStatus);
}
 
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
		try {
			boolean beforeCompletionInvoked = false;
			try {
        prepareForCommit(status);
				triggerBeforeCommit(status);
				triggerBeforeCompletion(status);
				beforeCompletionInvoked = true;
				boolean globalRollbackOnly = false;
				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
					globalRollbackOnly = status.isGlobalRollbackOnly();
				}
				if (status.hasSavepoint()) {
					status.releaseHeldSavepoint();
				} else if (status.isNewTransaction()) {
          // 提交事务
					doCommit(status);
				}
				//......
			} catch (......) {
				// 事务异常处理......
			}
 
			
			try {
        // 事务提交成功后的处理-----这里是重点
				triggerAfterCommit(status);
			} finally {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
			}
		}
		finally {
			cleanupAfterCompletion(status);
    }
}
 
private void triggerAfterCommit(DefaultTransactionStatus status) {
  if (status.isNewSynchronization()) {
    TransactionSynchronizationUtils.triggerAfterCommit();
  }
}

最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中

public static void triggerAfterCommit() {
  invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}
 
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
  if (synchronizations != null) {
      for (TransactionSynchronization synchronization : synchronizations) {
    synchronization.afterCommit();
      }
  }
}

上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。

3.1、方式一

经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:

private void writeStatisticsData(Long studentId) {
  if(TransactionSynchronizationManager.isActualTransactionActive()) {
            // 当前存在事务
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
              @Override
              public void afterCommit() {
                executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
              }});
        } else {
            // 当前不存在事务
            executor.execute(() -> {Student student = studentService.findById(studentId);
                //........
                });
        }
}

3.2、方式二

使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:

// 事件
public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
        super(studentId);
    }
}
 
// 监听器
public class StudentEventListener{
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
      Student student = studentService.findById(studentEvent.getSource());
      //........
    });
  }
}
 
@Service
public class StudentService {
  // Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
  @Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  
  @Transactional
  public Long test() {
    // ......
    // 插入记录
    Long studentId = studentService.insert(student);
    // 发布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入学生地址记录
    Long addressId = addressService.insert(address);
    return studentId;
  }
}

原理分析

Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser 来解析 annotation-driven 标签,如下:

public class TxNamespaceHandler extends NamespaceHandlerSupport {
  //......
	@Override
	public void init() {
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}
}
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
  
  @Override
	public BeanDefinition parse(Element element, ParserContext parserContext) {
    // 重点——将TransactionalEventListenerFactory加入到容器中
		registerTransactionalEventListenerFactory(parserContext);
		String mode = element.getAttribute("mode");
		if ("aspectj".equals(mode)) {
			// mode="aspectj"
			registerTransactionAspect(element, parserContext);
		}
		else {
			// mode="proxy"
			AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
		}
		return null;
	}
  
  private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
		RootBeanDefinition def = new RootBeanDefinition();
		def.setBeanClass(TransactionalEventListenerFactory.class);
		parserContext.registerBeanComponent(new BeanComponentDefinition(def,
				TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
	}
}
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
  
  //省略部分代码......
  
	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
	}
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
  
   //省略部分代码......
  
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
			TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
			TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
		} else if (this.annotation.fallbackExecution()) {
			//.......
			}
			processEvent(event);
		} else {
			// 当前不存在事务什么也不做
		}
	}

上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。(使用 @TransactionalEventListener 结合 Spring事件监听机制,并使用到异步方式感觉有点别扭,这里是为了说明问题)。

四、使用案例

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5,
        new ThreadFactoryBuilder().setDaemon(false).setNamePrefix("execApiCache").build());
 
@Override
@Transactional(rollbackFor = Exception.class)
public ResultVO addApi(Api api, List<Header> headerList, List<Request> requestList, Response response, List<Script> scriptList, List<RespCodeMapping> respCodeMappingList) {
 
    // 数据库代码...
 
    // 异步代码
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            log.warn("afterCommit...");
            executorService.execute(() -> {
                // 异步业务
                execApiCache(api);
            });
    }});
 
    return ResultUtil.buildSucc();
}

Ps:setDaemon(false) 注意这里守护线程标记必须设置为 false,否则主线程执行完,异步线程没执行完的话,异步线程会马上被中断、关闭,所以这里不能设置成守护(用户)线程。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文