Spring多数据源切换失败,发现与事务相关问题
作者:ChengQinHong
背景
一个方法里,A数据源需要进行查询和更新操作,B数据源进行查询操作。
详细业务是查询用户基础信息数据(A数据源)的时候,需要查询登录行为数据(B数据源),同时对用户信息进行修改,所以该方法最后还需要更新用户基础信息数据(A数据源)。
原来的A实现思路
controller调用serviceImpl方法,在serviceImpl方法内某一段代码去切换B数据源查询结果,根据查询结果更新A数据源。
实现A思路的结果
表面上一切调试都显示切换了B数据源,应该是还没有深入源码去debug、
应该某个地方拿的数据源连接还是默认的A数据源,导致报错,提示没有找到对应数据库表,A数据源肯定没有B数据源的表啊,郁闷。
后来的B实现思路
查询帖子后发现事务的干预,导致数据源切换失败,决定换个思路,把切换数据源的方法放在了controller,因为我是仅对一个数据源进行更新操作,另一个数据源只作查询操作,此时整个事务其实只在A数据源进行,所以我就单独把对A数据源的操作声明为A方法,对B数据源的操作声明为B方法,在controller先调用B方法获取查询结果,作为入参去调用A方法,这样就解决了数据源切换问题也解决了A方法的事务问题。
实现B思路的结果
达到了预期。
切换数据源成功,A数据源查询、更新、事务都没问题,B数据源查询没问题。
⚠️注意:
此思路是把service的方法一分为二,在controller分别调用,只适用于对其中单一数据源作修改数据操作,并不适用于对多数据源同时进行修改数据操作,因为单数据源进行数据操作是普通数据源事务,并不复杂,就和我们平时使用@Transactional一样。
但是如果你对多数据源进行修改数据操作的话!事情就变得复杂起来了,多数据源事务,可让你头疼的了,因为回滚非常麻烦,类似于分布式事务了,阿里的分布式事务有SEATA支撑,这个我了解,但是以后我再讲这方面的,因为这个单体系统的多数据源事务还需要深入研究一下。
原因
由于默认使用的是主数据源master,只有在mapper接口方法上标注从数据源slave才会切换数据源过去,但是要注意事务(因为之前看一个帖子,说一个事物里,缓存了默认的数据库连接,即使代码里切换了数据源,重新去建立连接时候发现有缓存一个数据库连接耶,直接拿这个,导致我们切换数据源失败,因为拿的还是默认的数据库连接。
配置文件
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver master: #主库A type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/A?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&useSSL=false&serverTimezone=GMT%2B8 username: root password: 123456 slave: #从库B type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.1.12:3306/B?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&useSSL=true&serverTimezone=GMT%2B8 username: root password: 123456
配置类(蛮多的,注意,请复制完整)
import com.alibaba.druid.pool.DruidDataSource; import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; import javax.sql.DataSource; import java.util.HashMap; // 主从数据源配置 @Configuration public class DataSourceConfiguration { DataSourceProperties masterDataSource = new DataSourceProperties(); DataSourceProperties slaveDataSource = new DataSourceProperties(); @Bean @ConfigurationProperties(prefix = "spring.datasource.master") public DataSource masterDataSource() { DruidDataSource druidDataSource = masterDataSource.setDataSource(DruidDataSourceBuilder.create().build()); return druidDataSource; } @Bean @ConfigurationProperties(prefix = "spring.datasource.slave") public DataSource slaveDataSource() { DruidDataSource druidDataSource = slaveDataSource.setDataSource(DruidDataSourceBuilder.create().build()); return druidDataSource; } @Bean public DataSource routeDataSource() { RoutingDataSource routingDataSource = new RoutingDataSource() {{ setDefaultTargetDataSource(masterDataSource()); setTargetDataSources(new HashMap<Object, Object>() {{ put(DbType.MASTER, masterDataSource()); put(DbType.SLAVE, slaveDataSource()); }}); }}; return routingDataSource; } @Bean @Primary public LazyConnectionDataSourceProxy lazyConnectionDataSourceProxy() { return new LazyConnectionDataSourceProxy(routeDataSource()); } }
import com.alibaba.druid.pool.DruidDataSource; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * 数据源配置文件 */ @Setter @Configuration @ConfigurationProperties(prefix = "spring.datasource.druid") public class DataSourceProperties { private int initialSize = 10; private int minIdle = 10; private int maxActive = 50; private int maxWait; private int timeBetweenEvictionRunsMillis = 300000; private int minEvictableIdleTimeMillis = 60000; private int maxEvictableIdleTimeMillis = 7200000; private String validationQuery = "SELECT 1 FROM DUAL"; private boolean testWhileIdle = true; private boolean testOnBorrow = true; private boolean testOnReturn = true; public DruidDataSource setDataSource(DruidDataSource datasource) { /** 配置初始化大小、最小、最大 */ datasource.setInitialSize(initialSize); //优先级:application的spring.datasource.master.initialSize > application的spring.datasource.druid.initialSize > datasource.setInitialSize(20)和datasource.setInitialSize(initialSize) datasource.setMaxActive(maxActive); datasource.setMinIdle(minIdle); /** 配置获取连接等待超时的时间 */ // datasource.setMaxWait(maxWait); /** 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */ datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); /** 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */ datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis); /** * 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。 */ datasource.setValidationQuery(validationQuery); /** 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */ datasource.setTestWhileIdle(testWhileIdle); /** 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */ datasource.setTestOnBorrow(testOnBorrow); /** 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */ datasource.setTestOnReturn(testOnReturn); return datasource; } }
import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.springframework.stereotype.Component; // aop环绕增强切换数据源 @Slf4j @Aspect @Component public class DBConnectionAOP { @Around("@annotation(connectToDB)") public Object proceed(ProceedingJoinPoint pjp, ConnectToDB connectToDB) throws Throwable { try { if (connectToDB.value().equals("MASTER")) { log.info("Master DB 配置"); DBContextHolder.setDbType(DbType.MASTER); } else if (connectToDB.value().equals("SLAVE")) { log.info("Slave DB 配置"); DBContextHolder.setDbType(DbType.SLAVE); } else { log.info("默认 DB 配置"); } Object result = pjp.proceed(); DBContextHolder.clearDbType(); return result; } finally { DBContextHolder.clearDbType(); } } }
public class DBContextHolder { private static final ThreadLocal<DbType> contextHolder = new ThreadLocal<DbType>(); public static void setDbType(DbType dbType) { if (dbType == null) { throw new NullPointerException(); } contextHolder.set(dbType); } public static DbType getDbType() { return (DbType) contextHolder.get(); } public static void clearDbType() { contextHolder.remove(); } }
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; // 动态数据源路由配置 public class RoutingDataSource extends AbstractRoutingDataSource { // 决定使用哪个数据源 @Override protected Object determineCurrentLookupKey() { return DBContextHolder.getDbType(); } }
// 数据源枚举 public enum DbType { MASTER, SLAVE }
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; // 数据源选择注解 @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ConnectToDB { // 默认主数据库 String value() default "primary"; }
异步线程池配置
如果有异步需求的话,可以借鉴,所以也放上来吧。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 异步线程池的配置类 */ @Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("注册asyncServiceExecutor"); // ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(5); //配置最大线程数 executor.setMaxPoolSize(50); //配置队列大小 // Set the capacity for the ThreadPoolExecutor's BlockingQueue. Default is Integer.MAX_VALUE. // Any positive value will lead to a LinkedBlockingQueue instance; any other value will lead to a SynchronousQueue instance. executor.setQueueCapacity(100); // 设置允许的空闲时间(秒) executor.setKeepAliveSeconds(60); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-service-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务;CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); //执行初始化 executor.initialize(); return executor; } }
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * ThreadPoolTaskExecutor的子类,在父类的基础上加入了日志信息,查看线程池的信息 */ public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class); @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } private void showThreadPoolInfo(String prefix) { ThreadPoolExecutor executor = getThreadPoolExecutor(); if (null == executor) { return; } logger.info("MaximumPoolSize:" + executor.getMaximumPoolSize() + ",CorePoolSize:" + executor.getCorePoolSize() + ",ThreadNamePrefix:" + this.getThreadNamePrefix() + ",prefix:" + prefix + ",TaskCount:" + executor.getTaskCount() + ",CompletedTaskCount:" + executor.getCompletedTaskCount() + ",ActiveCount:" + executor.getActiveCount() + ",PoolSize:" + executor.getPoolSize() + ",QueueSize:" + executor.getQueue().size() ); } }
A思路业务代码(反面教材,请勿模仿)
controller
@PostMapping("queryUserData.do") @VerifyLogin public RetInfo<Object> queryUserData(String temp, String channelId) throws JsonProcessingException { String mobile = UserUtil.getMobile(); User user = userService.queryUserData(mobile, temp, channelId); return new RetInfo<>(RetEnum.SUCCESS.getCode(), RetEnum.SUCCESS.getMsg(), user); }
@Slf4j @Service @Transactional(rollbackFor = Exception.class) public class UserServiceImpl implements UserService { @Autowired private UserLogService userLogService; @Override public User queryUserData(String mobile, String temp, String channelId) throws JsonProcessingException { // 切换slave从数据源查询 Date loginTime = userLogService.validateLoginByToday(mobile); // 查询当天登录时间 Date date = new Date(); Integer userStatus = 0; // 以下所有操作都切换回master主数据源查询和更新 User user = userMapper.selectUserByMobile(mobile); // 业务代码..... userMapper.insert(user); } }
B思路业务代码(正确!有相同业务场景的可以借鉴)
controller
@PostMapping("queryUserData.do") public RetInfo<Object> queryUserData(String temp, String channelId) throws JsonProcessingException { long start = System.currentTimeMillis(); String mobile = UserUtil.getMobile(); HashMap<String, Object> map = new HashMap<>(); // 切换slave从数据源查询 Date loginTime = userLogService.validateLoginByToday(mobile); // 查询用户当天登录时间 // 切换回master主数据源查询和更新 User user = userService.queryUserData(mobile, loginTime, temp, channelId); return new RetInfo<>(RetEnum.SUCCESS.getCode(), RetEnum.SUCCESS.getMsg(), user); }
service 只是贴出了从数据源查询代码,主数据源的代码就和平常写的一样就行了。
@Service public class UserLogServiceImpl implements UserLogService { private UserBehaviorService userBehaviorService; public UserLogServiceImpl(UserBehaviorService userBehaviorService) { this.userBehaviorService = userBehaviorService; } @Override public Date validateLoginByToday(String mobile) { return userBehaviorService.validateLoginByToday(mobile); } }
@Slf4j @Service public class UserBehaviorServiceImpl implements UserBehaviorService { private UserBehaviorMapper userBehaviorMapper; public UserBehaviorServiceImpl(UserBehaviorMapper userBehaviorMapper) { this.userBehaviorMapper = userBehaviorMapper; } @Transactional @ConnectToDB(value = "SLAVE") // 这就是切换数据源最重要注解 @Override public Date validateLoginByToday(String mobile) { return userBehaviorMapper.validateLoginByToday(mobile, DateUtil.dateToString(new Date(), "yyyyMMdd")); } }
mapper(和平常一样,没有什么特别的)
@Mapper public interface UserBehaviorMapper { /** * 查询用户当天是否活跃 * @param mobile * @param dateStr * @return Integer */ Integer validateActive(@Param("mobile") String mobile, @Param("dateStr") String dateStr); }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。