Spring Data JPA开启批量更新时乐观锁失效问题的解决方法
作者:TechGenius
乐观锁机制
什么是乐观锁?
乐观锁的基本思想是,认为在大多数情况下,数据访问不会导致冲突。因此,乐观锁允许多个事务同时读取和修改相同的数据,而不进行显式的锁定。在提交事务
之前,会检查是否有其他事务对该数据进行了修改。如果没有冲突,则提交成功;如果发现冲突,就需要回滚并重新尝试。
乐观锁通常使用版本号或时间戳来实现。每个数据项都会包含一个表示当前版本的标识符。在读取数据时,会将版本标识符保存下来。在提交更新时,会检查数据的当前版本是否与保存的版本匹配。如果匹配,则更新成功;否则,表示数据已被其他事务修改,需要处理冲突。
乐观锁适用于读操作频率较高、写操作冲突较少的场景。它减少了锁的使用,提高了并发性能,但需要处理冲突和重试的情况。
乐观锁是一种广义的思想,不是某一框架或语言特有的。
乐观锁的优缺点
优点
- 增强吞吐量:由于在事务持续时间的大部分时间内没有持有锁,因此等待时间最少,吞吐量也是最⼤的。
- 最小化死锁:死锁是一种事务无限期地等待其他人锁定的资源的情况,这种情况的可能性要小得多,因为数据不会长时间锁定。
- 更好的可扩展性:随着分布式系统和微服务架构的兴起,乐观锁在确保系统能够有效扩展而无需管理复杂锁机制的开销方面发挥着关键作用。
缺点
- 冲突管理开销:在冲突频繁的场景中,管理和解决冲突可能会占用大量资源。
- 复杂性:实现乐观锁需要经过深思熟虑的设计,特别是在处理失败的事务时。
- 过时数据的可能性:由于数据在读取时未锁定,因此事务可能会使用过时或过时的数据,如果管理不正确,可能会导致逻辑错误或不一致。
JPA-乐观锁
概述
JPA(Java Persistence API)
协议对乐观锁的操作做了规定:通过指定@Version
字段对数据增加版本号控制,进⽽在更新的时候判断版本号是否有变化。如果版本没有变化则更新成功;如果版本有变化,就会更新失败并抛出“OptimisticLockException”
异常。我们⽤ SQL 表示⼀下乐观锁的做法,代码如下:
SELECT uid, name, version FROM user WHERE id = 1; UPDATE user SET name = 'jack', version = version + 1 WHERE id = 1 AND version = 1;
假设本次查询的version=1
,在更新操作时,只要version
与上一个版本相同,就会更新成功,并且不会出现互相覆盖的问题,保证了数据的原⼦性。
实现方法
JPA 协议规定,想要实现乐观锁,可以通过@Version
注解标注在某个字段上⾯,而此字段需要是可以持久化到DB的字段,并且只⽀持如下四种类型:
int
或Integer
short
或Short
long
或Long
java.sql.Timestamp
我比较推荐使用Integer类型的字段,语义比较清晰、简单。
@Version的作用
该@Version
注解用于启用实体上的乐观锁,确保数据库中的数据更新不会出现并发修改问题。当实体中的某个字段标记为@Version
时,JPA 将使用该字段来跟踪更改并确保一次只有一个事务可以更新特定行。
注意:Spring Data JPA ⾥⾯有两个@Version注解,请使⽤@javax.persistence.Version,⽽不是@org.springframework.data.annotation.Version。
它是如何工作的?
每个用注解标记的实体都@Version
将由 JPA 跟踪其版本。这是基本机制:
- 初始化:当实体第一次被持久化(保存到数据库)时,版本字段(通常是整数或时间戳)被设置为其初始值,通常为零。
- 读取:稍后获取实体时,JPA 会从数据库中检索当前版本。
- 更新:在尝试更新或删除实体时,JPA 会根据实体的版本检查数据库中的当前版本。如果版本匹配,则操作继续,并且数据库中的版本增加(用于更新)。
- 冲突:如果版本不匹配,则表明另一个事务同时更新了实体,导致 JPA 抛出
OptimisticLockException
。
项目示例
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <!-- 驱动 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- 数据库连接池 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-dbcp2</artifactId> </dependency>
项目配置
spring: datasource: url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&useSSL=false username: root password: root jpa: database: mysql database-platform: org.hibernate.dialect.MySQL5InnoDBDialect show-sql: true hibernate: ddl-auto: update # 一般使用update # create: 每次运行程序时,都会重新创建表,故而数据会丢失 # create-drop: 每次运行程序时会先创建表结构,然后待程序结束时清空表 # upadte: 每次运行程序,没有表时会创建表,如果对象发生改变会更新表结构,原有数据不会清空,只会更新(推荐使用) # validate: 运行程序会校验数据与数据库的字段类型是否相同,字段不同会报错 # none: 禁用DDL处理 open-in-view: false properties: hibernate: jdbc: # 开启批量更新/写入 batch_size: 50 batch_versioned_data: true order_inserts: true order_updates: true
实体添加@Version
User
实体增加字段version
,并添加注解@Version
。当然,数据库也要加上version
字段。
@Entity @Table(name = "TEST_USER") public class User { // ...... @Version private Integer version; // ...... }
创建UserInfoRepository
创建UserInfoRepository,⽅便进⾏DB操作
public interface UserInfoRepository extends JpaRepository<User, Long> {}
创建 UserInfoService
创建 UserInfoService,⽤来模拟Service的复杂业务逻辑。
public interface UserService { /** * 根据 UserId 产⽣的⼀些业务计算逻辑 */ User calculate(Long userId); } @Service public class UserServiceImpl implements UserService { @Autowired private UserRepository userRepository; @Override @Transactional public User calculate(Long userId) { User user = repository.getById(userId); // 模拟复杂的业务计算逻辑耗时操作; try { TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException ignored) { } user.setAge(user.getAge() + 1); return userRepository.saveAndFlush(user); } }
其中,我们通过 @Transactional 开启事务,并且在查询⽅法后⾯模拟复杂业务逻辑,⽤来呈现多线程的并发问题。
测试方法
@ExtendWith(SpringExtension.class) @DataJpaTest @ComponentScan(basePackageClasses = UserServiceImpl.class) class UserServiceTest { @Autowired private UserService userService; @Autowired private UserRepository userRepository; @Test void testVersion() { // 加⼀条数据 User user1 = userRepository.save(User.builder().age(20).name("zzn").build()); // 验证⼀下数据库⾥⾯的值 Assertions.assertEquals(0, user1.getVersion()); Assertions.assertEquals(20, user1.getAge()); userService.calculate(user1.getId()); // 验证⼀下更新成功的值 User user2 = userRepository.getById(user1.getId()); Assertions.assertEquals(1, user2.getVersion()); Assertions.assertEquals(21, user2.getAge()); } @SneakyThrows @Test @Rollback(false) @Transactional(propagation = Propagation.NEVER) void testVersionException() { // 加⼀条数据 userRepository.save(User.builder().age(20).name("zzn").build()); // 模拟多线程执⾏两次 new Thread(() -> userService.calculate(1L)).start(); TimeUnit.SECONDS.sleep(1L); // 如果两个线程同时执⾏会发⽣乐观锁异常; Exception exception = Assertions.assertThrows(ObjectOptimisticLockingFailureException.class, () -> userService.calculate(1L)); log.info("error info:", exception); } }
从上⾯的测试得到的结果中,我们执⾏testVersion()
,会发现在 save
的时候, Version
会⾃动 +1,第⼀次初始化为 0;update 的时候也会附带 Version
条件,我们通过下图的 SQL,也可以看到 Version 的变化。
⽽当⾯我们调⽤testVersionException()
测试⽅法的时候,利⽤多线程模拟两个并发情况,会发现两个线程同时取到了历史数据,并在稍后都对历史数据进⾏了更新。
由此你会发现,第⼆次测试的结果是乐观锁异常,更新不成功。
通过⽇志⼜会发现,两个SQL同时更新的时候,Version
是⼀样的,是它导致了乐观锁异常。
注意:乐观锁异常不仅仅是同⼀个⽅法多线程才会出现的问题,我们只是为了⽅便测试⽽采⽤同⼀个⽅法;不同的⽅法、不同的项⽬,都有可能导致乐观锁异常。乐观锁的本质是 SQL 层⾯发⽣的,和使⽤的框架、技术没有关系。
问题描述
一句废话:正常情况下,一切正常!
运行环境
Java:1.8.0 SpringBoot:2.3.12.RELEASE Spring Data JPA:2.3.9.RELEASE Hibernate:5.4.32.Final Database Driver:ojdbc6 11.2.0.3 Database Platform:Oracle 10g
问题现象
上述代码示例运行在MySQL数据库上,一切正常,但是切换到Oracle数据库时,不开启批量更新模式时,也符合预期,但是开启批量更新模式时,不符合预期:并发更新同一实体时,未抛出
ObjectOptimisticLockingFailureException
异常。
数据库类型 | 开启批量 | 不开启批量 |
---|---|---|
Oracle | 不生效 | 生效 |
MySQL | 生效 | 生效 |
批量模式下,乐观锁异常栈:
Caused by: org.hibernate.StaleStateException: Batch update returned unexpected row count from update [0]; actual row count: 0; expected: 1; statement executed: update test_user set update_time=?, version=?, remark=? where user_id=? and version=? at org.hibernate.jdbc.Expectations$BasicExpectation.checkBatched(Expectations.java:67) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.jdbc.Expectations$BasicExpectation.verifyOutcome(Expectations.java:54) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.checkRowCounts(BatchingBatch.java:151) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.performExecution(BatchingBatch.java:126) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.BatchingBatch.doExecuteBatch(BatchingBatch.java:106) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.batch.internal.AbstractBatchImpl.execute(AbstractBatchImpl.java:148) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.executeBatch(JdbcCoordinatorImpl.java:198) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:633) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
非批量模式下,乐观锁异常栈:
Caused by: org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect) : [com.esunny.option.domain.user.User#990] at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:2649) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3492) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:3355) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3769) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:201) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at java.util.LinkedHashMap.forEach(LinkedHashMap.java:676) ~[?:1.8.0_73] at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:99) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1362) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1349) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:314) ~[spring-orm-5.2.22.RELEASE.jar:5.2.22.RELEASE] at com.sun.proxy.$Proxy156.flush(Unknown Source) ~[?:?] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush(SimpleJpaRepository.java:601) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush(SimpleJpaRepository.java:570) ~[spring-data-jpa-2.3.9.RELEASE.jar:2.3.9.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_73] at org.springframework.data.repository.core.support.ImplementationInvocationMetadata.invoke(ImplementationInvocationMetadata.java:72) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:382) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:205) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:550) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:155) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:130) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80) ~[spring-data-commons-2.3.9.RELEASE.jar:2.3.9.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.22.RELEASE.jar:5.2.22.RELEASE] at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139) ~[spring-tx-5.2.22.RELEASE.jar:5.2.22.RELEASE] ... 109 more
代码分析
从以上两种模式下的异常栈分析代码路径:
org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush org.springframework.data.jpa.repository.support.SimpleJpaRepository.flush org.hibernate.internal.SessionImpl.flush org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener org.hibernate.event.internal.DefaultFlushEventListener.onFlush org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions org.hibernate.engine.spi.ActionQueue.executeActions
ActionQueue.executeActions逻辑如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/spi/ActionQueue.java
/** * Perform all currently queued actions. * * @throws HibernateException error executing queued actions. */ public void executeActions() throws HibernateException { if ( hasUnresolvedEntityInsertActions() ) { throw new IllegalStateException( "About to execute actions, but there are unresolved entity insert actions." ); } for ( ListProvider listProvider : EXECUTABLE_LISTS_MAP.values() ) { ExecutableList<?> l = listProvider.get( this ); if ( l != null && !l.isEmpty() ) { executeActions( l ); } } } /** * Perform {@link org.hibernate.action.spi.Executable#execute()} on each element of the list * * @param list The list of Executable elements to be performed * * @throws HibernateException */ private <E extends Executable & Comparable<?> & Serializable> void executeActions(ExecutableList<E> list) throws HibernateException { // todo : consider ways to improve the double iteration of Executables here: // 1) we explicitly iterate list here to perform Executable#execute() // 2) ExecutableList#getQuerySpaces also iterates the Executables to collect query spaces. try { for ( E e : list ) { try { e.execute(); } finally { if( e.getBeforeTransactionCompletionProcess() != null ) { if( beforeTransactionProcesses == null ) { beforeTransactionProcesses = new BeforeTransactionCompletionProcessQueue( session ); } beforeTransactionProcesses.register(e.getBeforeTransactionCompletionProcess()); } if( e.getAfterTransactionCompletionProcess() != null ) { if( afterTransactionProcesses == null ) { afterTransactionProcesses = new AfterTransactionCompletionProcessQueue( session ); } afterTransactionProcesses.register(e.getAfterTransactionCompletionProcess()); } } } } finally { if ( session.getFactory().getSessionFactoryOptions().isQueryCacheEnabled() ) { // Strictly speaking, only a subset of the list may have been processed if a RuntimeException occurs. // We still invalidate all spaces. I don't see this as a big deal - after all, RuntimeExceptions are // unexpected. Set<Serializable> propertySpaces = list.getQuerySpaces(); invalidateSpaces( propertySpaces.toArray( new Serializable[propertySpaces.size()] ) ); } } list.clear(); session.getJdbcCoordinator().executeBatch(); }
这里在for循环里头调用了e.execute()
,同时在循环之后,finally之后调用了session.getJdbcCoordinator().executeBatch()
其中,EXECUTABLE_LISTS_MAP
中的Executable
包括:EntityInsertAction
、EntityUpdateAction
、EntityDeleteAction
等。
Executable.execute逻辑如下:
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/action/internal/EntityUpdateAction.java
@Override public void execute() throws HibernateException { final Serializable id = getId(); final EntityPersister persister = getPersister(); final SharedSessionContractImplementor session = getSession(); final Object instance = getInstance(); final boolean veto = preUpdate(); final SessionFactoryImplementor factory = session.getFactory(); Object previousVersion = this.previousVersion; if ( persister.isVersionPropertyGenerated() ) { // we need to grab the version value from the entity, otherwise // we have issues with generated-version entities that may have // multiple actions queued during the same flush previousVersion = persister.getVersion( instance ); } final Object ck; if ( persister.canWriteToCache() ) { final EntityDataAccess cache = persister.getCacheAccessStrategy(); ck = cache.generateCacheKey( id, persister, factory, session.getTenantIdentifier() ); lock = cache.lockItem( session, ck, previousVersion ); } else { ck = null; } if ( !veto ) { persister.update( id, state, dirtyFields, hasDirtyCollection, previousState, previousVersion, instance, rowId, session ); } final EntityEntry entry = session.getPersistenceContextInternal().getEntry( instance ); if ( entry == null ) { throw new AssertionFailure( "possible nonthreadsafe access to session" ); } if ( entry.getStatus()==Status.MANAGED || persister.isVersionPropertyGenerated() ) { // get the updated snapshot of the entity state by cloning current state; // it is safe to copy in place, since by this time no-one else (should have) // has a reference to the array TypeHelper.deepCopy( state, persister.getPropertyTypes(), persister.getPropertyCheckability(), state, session ); if ( persister.hasUpdateGeneratedProperties() ) { // this entity defines property generation, so process those generated // values... persister.processUpdateGeneratedProperties( id, instance, state, session ); if ( persister.isVersionPropertyGenerated() ) { nextVersion = Versioning.getVersion( state, persister ); } } // have the entity entry doAfterTransactionCompletion post-update processing, passing it the // update state and the new version (if one). entry.postUpdate( instance, state, nextVersion ); } final StatisticsImplementor statistics = factory.getStatistics(); if ( persister.canWriteToCache() ) { if ( persister.isCacheInvalidationRequired() || entry.getStatus()!= Status.MANAGED ) { persister.getCacheAccessStrategy().remove( session, ck); } else if ( session.getCacheMode().isPutEnabled() ) { //TODO: inefficient if that cache is just going to ignore the updated state! final CacheEntry ce = persister.buildCacheEntry( instance,state, nextVersion, getSession() ); cacheEntry = persister.getCacheEntryStructure().structure( ce ); final boolean put = cacheUpdate( persister, previousVersion, ck ); if ( put && statistics.isStatisticsEnabled() ) { statistics.entityCachePut( StatsHelper.INSTANCE.getRootEntityRole( persister ), getPersister().getCacheAccessStrategy().getRegion().getName() ); } } } session.getPersistenceContextInternal().getNaturalIdHelper().manageSharedNaturalIdCrossReference( persister, id, state, previousNaturalIdValues, CachedNaturalIdValueSource.UPDATE ); postUpdate(); if ( statistics.isStatisticsEnabled() && !veto ) { statistics.updateEntity( getPersister().getEntityName() ); } }
调用了persister的update方法。
AbstractEntityPersister.update
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/persister/entity/AbstractEntityPersister.java
public boolean update( final Serializable id, final Object[] fields, final Object[] oldFields, final Object rowId, final boolean[] includeProperty, final int j, final Object oldVersion, final Object object, final String sql, final SharedSessionContractImplementor session) throws HibernateException { final Expectation expectation = Expectations.appropriateExpectation( updateResultCheckStyles[j] ); final int jdbcBatchSizeToUse = session.getConfiguredJdbcBatchSize(); // IMPLEMENTATION NOTE: If Session#saveOrUpdate or #update is used to update an entity, then // Hibernate does not have a database snapshot of the existing entity. // As a result, oldFields will be null. // Don't use a batch if oldFields == null and the jth table is optional (isNullableTable( j ), // because there is no way to know that there is actually a row to update. If the update // was batched in this case, the batch update would fail and there is no way to fallback to // an insert. final boolean useBatch = expectation.canBeBatched() && isBatchable() && jdbcBatchSizeToUse > 1 && ( oldFields != null || !isNullableTable( j ) ); if ( useBatch && updateBatchKey == null ) { updateBatchKey = new BasicBatchKey( getEntityName() + "#UPDATE", expectation ); } final boolean callable = isUpdateCallable( j ); final boolean useVersion = j == 0 && isVersioned(); if ( LOG.isTraceEnabled() ) { LOG.tracev( "Updating entity: {0}", MessageHelper.infoString( this, id, getFactory() ) ); if ( useVersion ) { LOG.tracev( "Existing version: {0} -> New version:{1}", oldVersion, fields[getVersionProperty()] ); } } try { int index = 1; // starting index final PreparedStatement update; if ( useBatch ) { update = session .getJdbcCoordinator() .getBatch( updateBatchKey ) .getBatchStatement( sql, callable ); } else { update = session .getJdbcCoordinator() .getStatementPreparer() .prepareStatement( sql, callable ); } try { index += expectation.prepare( update ); //Now write the values of fields onto the prepared statement index = dehydrate( id, fields, rowId, includeProperty, propertyColumnUpdateable, j, update, session, index, true ); // Write any appropriate versioning conditional parameters if ( useVersion && entityMetamodel.getOptimisticLockStyle().isVersion()) { if ( checkVersion( includeProperty ) ) { getVersionType().nullSafeSet( update, oldVersion, index, session ); } } else if ( isAllOrDirtyOptLocking() && oldFields != null ) { boolean[] versionability = getPropertyVersionability(); //TODO: is this really necessary???? boolean[] includeOldField = entityMetamodel.getOptimisticLockStyle().isAll() ? getPropertyUpdateability() : includeProperty; Type[] types = getPropertyTypes(); for ( int i = 0; i < entityMetamodel.getPropertySpan(); i++ ) { boolean include = includeOldField[i] && isPropertyOfTable( i, j ) && versionability[i]; //TODO: is this really necessary???? if ( include ) { boolean[] settable = types[i].toColumnNullness( oldFields[i], getFactory() ); types[i].nullSafeSet( update, oldFields[i], index, settable, session ); index += ArrayHelper.countTrue( settable ); } } } if ( useBatch ) { session.getJdbcCoordinator().getBatch( updateBatchKey ).addToBatch(); return true; } else { return check( session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update ), id, j, expectation, update, sql ); } } catch (SQLException e) { if ( useBatch ) { session.getJdbcCoordinator().abortBatch(); } throw e; } finally { if ( !useBatch ) { session.getJdbcCoordinator().getResourceRegistry().release( update ); session.getJdbcCoordinator().afterStatementExecution(); } } } catch (SQLException e) { throw getFactory().getSQLExceptionHelper().convert( e, "could not update: " + MessageHelper.infoString( this, id, getFactory() ), sql ); } }
关键之处:
useBatch的赋值逻辑
public boolean isBatchable() { return optimisticLockStyle().isNone() || !isVersioned() && optimisticLockStyle().isVersion() || getFactory().getSessionFactoryOptions().isJdbcBatchVersionedData(); } 1. 配置了`spring.jpa.properties.hibernate.jdbc.batch_versioned_data`为true; 2. jdbcBatchSizeToUse > 1, 即`spring.jpa.properties.hibernate.jdbc.batch_size`大于0
- 如果useBatch为true
调用session.getJdbcCoordinator().getBatch(updateBatchKey).addToBatch();
这里的updateBatchKey
为com.example.domain.User#UPDATE
;此处仅是将PreparedStatement
放入待执行队列。
之后便执行session.getJdbcCoordinator().executeBatch()
逻辑;请看BatchingBatch.performExecution
- 如果useBatch为false
调用session.getJdbcCoordinator().getResultSetReturn().executeUpdate( update )
,并调用check
方法执行检查。此处检查失败,则会抛出乐观锁异常!
BatchingBatch.performExecution
hibernate-core-5.4.32.Final-sources.jar!/org/hibernate/engine/jdbc/batch/internal/BatchingBatch.java
private void performExecution() { LOG.debugf( "Executing batch size: %s", batchPosition ); try { for ( Map.Entry<String,PreparedStatement> entry : getStatements().entrySet() ) { try { final PreparedStatement statement = entry.getValue(); final int[] rowCounts; try { getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchStart(); rowCounts = statement.executeBatch(); } finally { getJdbcCoordinator().getJdbcSessionOwner().getJdbcSessionContext().getObserver().jdbcExecuteBatchEnd(); } checkRowCounts( rowCounts, statement ); } catch ( SQLException e ) { abortBatch(); throw sqlExceptionHelper().convert( e, "could not execute batch", entry.getKey() ); } } } catch ( RuntimeException re ) { LOG.unableToExecuteBatch( re.getMessage() ); throw re; } finally { batchPosition = 0; } }
可以看到这里调用了statement.executeBatch(),并返回了int[] rowCounts;然后调用checkRowCounts( rowCounts, statement ); >Expectations#BasicExpectation.checkBatched 此处检查失败,则会抛出乐观锁异常!
问题原因
非批量模式下,检查执行结果是调用的checkNonBatched
方法,该方法仅检查更新条目数是否一致:
private void checkNonBatched(int rowCount, String statementSQL) { if ( expectedRowCount > rowCount ) { throw new StaleStateException( "Unexpected row count: " + rowCount + "; expected: " + expectedRowCount + "; statement executed: " + statementSQL ); } if ( expectedRowCount < rowCount ) { String msg = "Unexpected row count: " + rowCount + "; expected: " + expectedRowCount; throw new TooManyRowsAffectedException( msg, expectedRowCount, rowCount ); } }
批量模式下,检查执行结果是调用的checkBatched
方法,检查逻辑如下:
private void checkBatched(int rowCount, int batchPosition, String statementSQL) { if ( rowCount == -2 ) { LOG.debugf( "Success of batch update unknown: %s", batchPosition ); } else if ( rowCount == -3 ) { throw new BatchFailedException( "Batch update failed: " + batchPosition ); } else { if ( expectedRowCount > rowCount ) { throw new StaleStateException( "Batch update returned unexpected row count from update [" + batchPosition + "]; actual row count: " + rowCount + "; expected: " + expectedRowCount + "; statement executed: " + statementSQL ); } if ( expectedRowCount < rowCount ) { String msg = "Batch update returned unexpected row count from update [" + batchPosition + "]; actual row count: " + rowCount + "; expected: " + expectedRowCount; throw new BatchedTooManyRowsAffectedException( msg, expectedRowCount, rowCount, batchPosition ); } } }
问题便在于此!
int[] executeBatch() throws SQLException
返回值说明:
① 大于或等于零的数字,表示命令已成功处理,并且是更新计数,给出了数据库中受命令影响的行数执行;
② SUCCESS_NO_INFO ( -2)的值,表示命令处理成功,但受影响的行数未知;
③ 如果批量更新中的命令之一无法正确执行,此方法引发BatchUpdateException,JDBC Driver可能会也可能不会继续处理剩余的命令。但是Driver的行为是与特定的DBMS绑定的,要么总是继续处理命令,要么从不继续处理命令。如果驱动程序继续处理,方法将返回EXECUTE_FAILED(-3)。
在实际的测试过程中发现:
DB类型 | 是否可以返回实际影响行数 | 备注 |
---|---|---|
MySQL | 是 | |
Oracle | 否 | 每个数组位置值均为-2 |
在Oracle的驱动中没有实现该功能,即提交成功后不能返回影响行数,所以返回-2。
Oracle驱动源码如下:oracle.jdbc.driver.OraclePreparedStatement#executeBatch
public int[] executeBatch() throws SQLException { synchronized (this.connection) { int[] arrayOfInt = new int[this.currentRank]; /* 此处省略N行代码 */ if ((this.sqlKind != 1) && (this.sqlKind != 4)) { for (i = 0; i < arrayOfInt.length; i++) { arrayOfInt[i] = -2; // 关键看这行 } } this.connection.registerHeartbeat(); return arrayOfInt; } }
根据StackOverflow
上的说法,Oracle 11g
之前的版本,executeBatch
方法返回的均是-2,eg.
解决方案
Hibernate对于这个问题有自己的处理办法,就是设置一个jdbc和数据库的连接属性hibernate.jdbc.use_scrollable_resultset =true
。
如果你想让你的JDBC驱动从executeBatch()
返回正确的行计数 , 那么将此属性设为true(开启这个选项通常是安全的). 同时,Hibernate将为自动版本化的数据使用批量DML. 默认值为false. eg. true | false
以上就是Spring Data JPA开启批量更新时乐观锁失效问题的解决方法的详细内容,更多关于Spring Data JPA乐观锁失效的资料请关注脚本之家其它相关文章!