springboot动态数据源+分布式事务的实现
作者:wenzheng_du
1.引入jta-atomikos
这个springboot 是自带的。我的springboot版本为2.5.9,数据库为mysql。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>
2.配置数据源(application.yml)
spring:
  # 数据源配置
  jta:
    enabled: true
  datasource:
    master: # 主库数据源
      url:
      username:
      password:
      driver-class-name: com.mysql.cj.jdbc.Driver
    slave: # 从库数据源
      url:
      username:
      password:
      driver-class-name: com.mysql.cj.jdbc.Driver
新建文件 DataSourceConfig,配置数据源及连接池。你有几个数据源就建立几个 XXXDataSource 方法,最后放到 dynamicDataSource 方法中的 Map 中去。事务管理器一定要用 JtaTransactionManager。SqlSessionFactory 可以不用写;如果要写,并且你用了 mybatis-plus,一定要用 MybatisSqlSessionFactoryBean,不然在启动或者调用 Mybatis-plus 的方法时,会报找不到 bean 的错。
import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import org.aspectj.lang.annotation.Aspect; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.core.env.Environment; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.jta.JtaTransactionManager; import javax.sql.DataSource; import javax.transaction.SystemException; import javax.transaction.UserTransaction; import java.util.HashMap; import java.util.Map; import java.util.Properties; @Aspect @Configuration @EnableTransactionManagement public class DataSourceConfig { @Autowired private Environment env; @Bean(name = "masterDataSource") public DataSource masterDataSource() { AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setUniqueResourceName("master"); ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource"); ds.setXaProperties(getXaProperties("master")); ds.setMaxPoolSize(10); ds.setMinPoolSize(5); ds.setBorrowConnectionTimeout(60); return ds; } @Bean(name = "slaveDataSource") public DataSource slaveDataSource() { AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); ds.setUniqueResourceName("slave"); ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource"); ds.setXaProperties(getXaProperties("slave")); ds.setMaxPoolSize(10); ds.setMinPoolSize(5); ds.setBorrowConnectionTimeout(60); return ds; } @Bean @Primary public DataSource dynamicDataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Qualifier("slaveDataSource") DataSource slaveDataSource) { 
DynamicDataSource dataSource = new DynamicDataSource(); Map<Object, Object> targetDataSources = new HashMap<>(); targetDataSources.put("master", masterDataSource); targetDataSources.put("slave", slaveDataSource); dataSource.setTargetDataSources(targetDataSources); dataSource.setDefaultTargetDataSource(masterDataSource); return dataSource; } @Bean public UserTransaction userTransaction() throws SystemException { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(10000); return userTransactionImp; } @Bean(initMethod = "init", destroyMethod = "close") public UserTransactionManager userTransactionManager() { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } @Bean public PlatformTransactionManager transactionManager(UserTransaction userTransaction, UserTransactionManager userTransactionManager) { JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(); jtaTransactionManager.setUserTransaction(userTransaction); jtaTransactionManager.setTransactionManager(userTransactionManager); return jtaTransactionManager; } private Properties getXaProperties(String dataSourceType) { Properties xaProps = new Properties(); String username = getPropertyValue("spring.datasource." + dataSourceType + ".username"); String password = getPropertyValue("spring.datasource." + dataSourceType + ".password"); String url = getPropertyValue("spring.datasource." + dataSourceType + ".url"); xaProps.put("user", username); xaProps.put("password", password); xaProps.put("url", url); return xaProps; } private String getPropertyValue(String str) { return env.getProperty(str); } }
新建文件DynamicDataSource,这个类继承AbstractRoutingDataSource。
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; public class DynamicDataSource extends AbstractRoutingDataSource { @Override protected Object determineCurrentLookupKey() { String dataSourceKey = DataSourceContextHolder.getDataSource(); if (dataSourceKey == null) { return "master"; // 默认数据源 } return dataSourceKey; } }
建立一个上下文DataSourceContextHolder文件,用于读取当前数据源。
/**
 * Holds the data source key selected for the current thread.
 *
 * <p>The key is stored in a {@link ThreadLocal}, so each thread sees only the value it set
 * itself. Callers that set a key should clear it when done.
 */
public class DataSourceContextHolder {

    /**
     * Per-thread storage for the selected key.
     *
     * <p>Declared {@code final} so the holder cannot be swapped out at runtime, which would
     * silently discard every thread's current selection. It remains public so existing
     * direct references keep compiling; prefer the accessor methods below.
     */
    public static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

    /**
     * Selects the data source for the current thread.
     *
     * @param dataSourceName the lookup key to store
     */
    public static void setDataSource(String dataSourceName) {
        threadLocal.set(dataSourceName);
    }

    /**
     * Returns the current thread's selection.
     *
     * @return the stored lookup key, or {@code null} when none has been set
     */
    public static String getDataSource() {
        return threadLocal.get();
    }

    /** Removes the current thread's selection. */
    public static void clearDataSource() {
        threadLocal.remove();
    }
}
3.配置注解
这个没啥好说的,文件名(注解名)自己定义都可以。注意后面 value 的默认值:如果配置了默认值,一定要是你实际配置的数据源名称,即 dynamicDataSource 方法里 targetDataSources 中的 key(这里是 "master")。
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marks a class or method with the key of the data source it should use.
 *
 * <p>Retained at runtime so it can be read reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface MDS {

    /**
     * Data source lookup key.
     *
     * @return the key; {@code "master"} when not specified
     */
    String value() default "master";
}
4.切面
在类或方法上使用上面的自定义注解,然后用切面去切换数据源。这里我用了3个判断:首先判断方法上有没有自定义注解,其次判断类上有没有,都没有就用默认的数据源。
import cn.hutool.core.annotation.AnnotationUtil; import com.rs.common.annotation.MDS; import com.rs.framework.config.dynamic.DataSourceContextHolder; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.AfterReturning; import org.aspectj.lang.annotation.AfterThrowing; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.lang.reflect.Method; @Aspect @Order(0) @Component public class DataSourceAspect { @Before("@within(mds) || @annotation(mds)") public void before(JoinPoint joinPoint, MDS mds) { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); Class<?> targetClass = method.getDeclaringClass(); Object value = AnnotationUtil.getAnnotationValue(targetClass, MDS.class); if(mds != null && mds.value() != null){ DataSourceContextHolder.setDataSource(mds.value()); } else if (value != null) { DataSourceContextHolder.setDataSource((String) value); } else { DataSourceContextHolder.setDataSource("master"); } } @AfterReturning(pointcut = "@within(mds) ||@annotation(mds)", returning = "returnVal") public void afterReturning(JoinPoint joinPoint, MDS mds, Object returnVal) { DataSourceContextHolder.clearDataSource(); } @AfterThrowing(pointcut = "@within(mds) ||@annotation(mds)", throwing = "ex") public void afterThrowing(JoinPoint joinPoint, MDS mds, Throwable ex) { DataSourceContextHolder.clearDataSource(); } }
5.如何使用
一个方法中执行不同数据源操作。getUserPageList方法使用的是默认数据源master,即最后的返回是返回master的查询结果。updateBusinessStatus方法使用的是从库数据源,在执行完sql后我写了一个异常。
这里需要注意的是,一定要写@Transactional(rollbackFor = Exception.class),且是写在方法上!!!别为了省事写在类上,写在类上发生异常不会回滚(亲测)。
/**
 * Demo: performs work against two data sources inside one transactional method.
 *
 * <p>This method carries no {@code @MDS} annotation; it first updates a user record and then
 * calls {@code updateBusinessStatus}, which is annotated {@code @MDS("slave")}.
 *
 * @param user query filter; note it is replaced by a fixed demo record below
 * @param page paging request
 * @return the page produced by {@code sysUserMapper.getUserPageList}
 */
@Override
@Transactional(rollbackFor = Exception.class)
public IPage<SysUserDto> getUserPageList(SysUser user, Page<SysUser> page) {
    // The incoming argument is discarded and replaced with a hard-coded demo record.
    user = new SysUser();
    user.setUserId(29L);
    user.setDelFlag("0");
    sysUserMapper.updateById(user);
    // Call into the method that is routed to the "slave" data source.
    flwProcessService.updateBusinessStatus("1851531692768452608",3);
    return sysUserMapper.getUserPageList(user,page.page());
}

/**
 * Demo: updates the business status of a process instance, then fails on purpose.
 *
 * @param procInstId     process instance id used as the update condition
 * @param businessStatus new business status value
 */
@Override
@MDS("slave")
@Transactional(rollbackFor = Exception.class)
public void updateBusinessStatus(String procInstId, Integer businessStatus) {
    flwProcessMapper.update(null,new UpdateWrapper<FlwProcess>().lambda()
            .eq(FlwProcess::getProcInstId,procInstId)
            .set(FlwProcess::getBusinessStatus,businessStatus)
    );
    // Deliberate ArithmeticException after the SQL has run, to demonstrate rollback.
    int i = 1/0;
}
注意事项
一个方法不同数据源的操作,@Transactional(rollbackFor = Exception.class)要写在方法上,别写在类上!!!
一个方法不同数据源的操作,@Transactional(rollbackFor = Exception.class)要写在方法上,别写在类上!!!
一个方法不同数据源的操作,@Transactional(rollbackFor = Exception.class)要写在方法上,别写在类上!!!
到此这篇关于springboot动态数据源+分布式事务的实现的文章就介绍到这了,更多相关springboot动态数据源+分布式事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!