java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring多数据源切换失败,发现与事务相关

Spring多数据源切换失败,发现与事务相关问题

作者:ChengQinHong

这篇文章主要介绍了Spring多数据源切换失败,发现与事务相关问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

背景

一个方法里,A数据源需要进行查询和更新操作,B数据源进行查询操作。

详细业务是查询用户基础信息数据(A数据源)的时候,需要查询登录行为数据(B数据源),同时对用户信息进行修改,所以该方法最后还需要更新用户基础信息数据(A数据源)。

原来的A实现思路

controller调用serviceImpl方法,在serviceImpl方法内某一段代码去切换B数据源查询结果,根据查询结果更新A数据源。

实现A思路的结果

表面上一切调试都显示切换了B数据源,应该是还没有深入源码去debug、

应该某个地方拿的数据源连接还是默认的A数据源,导致报错,提示没有找到对应数据库表,A数据源肯定没有B数据源的表啊,郁闷。

后来的B实现思路

查询帖子后发现事务的干预,导致数据源切换失败,决定换个思路,把切换数据源的方法放在了controller,因为我是仅对一个数据源进行更新操作,另一个数据源只作查询操作,此时整个事务其实只在A数据源进行,所以我就单独把对A数据源的操作声明为A方法,对B数据源的操作声明为B方法,在controller先调用B方法获取查询结果,作为入参去调用A方法,这样就解决了数据源切换问题也解决了A方法的事务问题。

实现B思路的结果

达到了预期。

切换数据源成功,A数据源查询、更新、事务都没问题,B数据源查询没问题。

⚠️注意:

此思路是把service的方法一分为二,在controller分别调用,只适用于对其中单一数据源作修改数据操作,并不适用于对多数据源同时进行修改数据操作,因为单数据源进行数据操作是普通数据源事务,并不复杂,就和我们平时使用@Transactional一样。

但是如果你对多数据源进行修改数据操作的话!事情就变得复杂起来了,多数据源事务,可让你头疼的了,因为回滚非常麻烦,类似于分布式事务了,阿里的分布式事务有SEATA支撑,这个我了解,但是以后我再讲这方面的,因为这个单体系统的多数据源事务还需要深入研究一下。

原因

由于默认使用的是主数据源master,只有在mapper接口方法上标注从数据源slave才会切换数据源过去,但是要注意事务(因为之前看一个帖子,说一个事物里,缓存了默认的数据库连接,即使代码里切换了数据源,重新去建立连接时候发现有缓存一个数据库连接耶,直接拿这个,导致我们切换数据源失败,因为拿的还是默认的数据库连接。

配置文件

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.jdbc.Driver
    master: #主库A
      type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://127.0.0.1:3306/A?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&useSSL=false&serverTimezone=GMT%2B8
      username: root
      password: 123456
    slave: #从库B
      type: com.alibaba.druid.pool.DruidDataSource
      driverClassName: com.mysql.jdbc.Driver
      url: jdbc:mysql://192.168.1.12:3306/B?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&useSSL=true&serverTimezone=GMT%2B8
      username: root
      password: 123456

配置类(蛮多的,注意,请复制完整)

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
 
import javax.sql.DataSource;
import java.util.HashMap;
 
// 主从数据源配置
@Configuration
public class DataSourceConfiguration {
 
    DataSourceProperties masterDataSource = new DataSourceProperties();
    DataSourceProperties slaveDataSource = new DataSourceProperties();
 
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.master")
    public DataSource masterDataSource() {
        DruidDataSource druidDataSource = masterDataSource.setDataSource(DruidDataSourceBuilder.create().build());
        return druidDataSource;
    }
 
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.slave")
    public DataSource slaveDataSource() {
        DruidDataSource druidDataSource = slaveDataSource.setDataSource(DruidDataSourceBuilder.create().build());
        return druidDataSource;
    }
 
    @Bean
    public DataSource routeDataSource() {
        RoutingDataSource routingDataSource = new RoutingDataSource() {{
            setDefaultTargetDataSource(masterDataSource());
            setTargetDataSources(new HashMap<Object, Object>() {{
                put(DbType.MASTER, masterDataSource());
                put(DbType.SLAVE, slaveDataSource());
            }});
        }};
 
        return routingDataSource;
    }
 
    @Bean
    @Primary
    public LazyConnectionDataSourceProxy lazyConnectionDataSourceProxy() {
        return new LazyConnectionDataSourceProxy(routeDataSource());
    }
 
}
import com.alibaba.druid.pool.DruidDataSource;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
 
/**
 * 数据源配置文件
 */
@Setter
@Configuration
@ConfigurationProperties(prefix = "spring.datasource.druid")
public class DataSourceProperties {
 
    private int initialSize = 10;
 
    private int minIdle = 10;
 
    private int maxActive = 50;
 
    private int maxWait;
 
    private int timeBetweenEvictionRunsMillis = 300000;
 
    private int minEvictableIdleTimeMillis = 60000;
 
    private int maxEvictableIdleTimeMillis = 7200000;
 
    private String validationQuery = "SELECT 1 FROM DUAL";
 
    private boolean testWhileIdle = true;
 
    private boolean testOnBorrow = true;
 
    private boolean testOnReturn = true;
 
    public DruidDataSource setDataSource(DruidDataSource datasource) {
        /** 配置初始化大小、最小、最大 */
        datasource.setInitialSize(initialSize);  //优先级:application的spring.datasource.master.initialSize > application的spring.datasource.druid.initialSize > datasource.setInitialSize(20)和datasource.setInitialSize(initialSize)
        datasource.setMaxActive(maxActive);
        datasource.setMinIdle(minIdle);
        /** 配置获取连接等待超时的时间 */
        // datasource.setMaxWait(maxWait);
        /** 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */
        datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        /** 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */
        datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
        /**
         * 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
         */
        datasource.setValidationQuery(validationQuery);
        /** 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */
        datasource.setTestWhileIdle(testWhileIdle);
        /** 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
        datasource.setTestOnBorrow(testOnBorrow);
        /** 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
        datasource.setTestOnReturn(testOnReturn);
        
        return datasource;
    }
 
}
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
 
// aop环绕增强切换数据源
@Slf4j
@Aspect
@Component
public class DBConnectionAOP {
 
    @Around("@annotation(connectToDB)")
    public Object proceed(ProceedingJoinPoint pjp, ConnectToDB connectToDB) throws Throwable {
        try {
            if (connectToDB.value().equals("MASTER")) {
                log.info("Master DB 配置");
                DBContextHolder.setDbType(DbType.MASTER);
            } else if (connectToDB.value().equals("SLAVE")) {
                log.info("Slave DB 配置");
                DBContextHolder.setDbType(DbType.SLAVE);
            } else {
                log.info("默认 DB 配置");
            }
 
            Object result = pjp.proceed();
            DBContextHolder.clearDbType();
 
            return result;
        } finally {
            DBContextHolder.clearDbType();
        }
    }
}
public class DBContextHolder {
 
    private static final ThreadLocal<DbType> contextHolder = new ThreadLocal<DbType>();
 
    public static void setDbType(DbType dbType) {
        if (dbType == null) {
            throw new NullPointerException();
        }
        contextHolder.set(dbType);
    }
 
    public static DbType getDbType() {
        return (DbType) contextHolder.get();
    }
 
    public static void clearDbType() {
        contextHolder.remove();
    }
 
}
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
 
// 动态数据源路由配置
public class RoutingDataSource extends AbstractRoutingDataSource {
 
    // 决定使用哪个数据源
    @Override
    protected Object determineCurrentLookupKey() {
        return DBContextHolder.getDbType();
    }
 
}
// 数据源枚举
public enum DbType {
    MASTER,
    SLAVE
}
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
 
// 数据源选择注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ConnectToDB {
    // 默认主数据库
    String value() default "primary";
 
}

异步线程池配置

如果有异步需求的话,可以借鉴,所以也放上来吧。

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * 异步线程池的配置类
 */
@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
 
    @Bean
    public Executor asyncServiceExecutor() {
        logger.info("注册asyncServiceExecutor");
 
//        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        ThreadPoolTaskExecutor executor = new VisibleThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(5);
        //配置最大线程数
        executor.setMaxPoolSize(50);
        //配置队列大小
        // Set the capacity for the ThreadPoolExecutor's BlockingQueue. Default is Integer.MAX_VALUE.
        // Any positive value will lead to a LinkedBlockingQueue instance; any other value will lead to a SynchronousQueue instance.
        executor.setQueueCapacity(100);
        // 设置允许的空闲时间(秒)
        executor.setKeepAliveSeconds(60);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务;CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
        //执行初始化
        executor.initialize();
 
        return executor;
    }
 
 
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
 
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * ThreadPoolTaskExecutor的子类,在父类的基础上加入了日志信息,查看线程池的信息
 */
public class VisibleThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisibleThreadPoolTaskExecutor.class);
 
    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }
 
    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }
 
    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }
 
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }
 
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }
 
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
 
    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor executor = getThreadPoolExecutor();
 
        if (null == executor) {
            return;
        }
 
        logger.info("MaximumPoolSize:" + executor.getMaximumPoolSize() +
                ",CorePoolSize:" + executor.getCorePoolSize() +
                ",ThreadNamePrefix:" + this.getThreadNamePrefix() +
                ",prefix:" + prefix +
                ",TaskCount:" + executor.getTaskCount() +
                ",CompletedTaskCount:" + executor.getCompletedTaskCount() +
                ",ActiveCount:" + executor.getActiveCount() +
                ",PoolSize:" + executor.getPoolSize() +
                ",QueueSize:" + executor.getQueue().size()
        );
    }
 
}

A思路业务代码(反面教材,请勿模仿)

controller

@PostMapping("queryUserData.do")
    @VerifyLogin
    public RetInfo<Object> queryUserData(String temp, String channelId) throws JsonProcessingException {
        String mobile = UserUtil.getMobile();
        User user = userService.queryUserData(mobile, temp, channelId);
        return new RetInfo<>(RetEnum.SUCCESS.getCode(), RetEnum.SUCCESS.getMsg(), user);
    }
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class UserServiceImpl implements UserService {
    @Autowired
    private UserLogService userLogService;
 
    @Override
    public User queryUserData(String mobile, String temp, String channelId) throws JsonProcessingException {
        // 切换slave从数据源查询
        Date loginTime = userLogService.validateLoginByToday(mobile); // 查询当天登录时间
 
        Date date = new Date();
        Integer userStatus = 0;
        
        // 以下所有操作都切换回master主数据源查询和更新
        User user = userMapper.selectUserByMobile(mobile);
        // 业务代码.....
        userMapper.insert(user);
}
}

B思路业务代码(正确!有相同业务场景的可以借鉴)

controller

    @PostMapping("queryUserData.do")
    public RetInfo<Object> queryUserData(String temp, String channelId) throws JsonProcessingException {
        long start = System.currentTimeMillis();
        String mobile = UserUtil.getMobile();
        HashMap<String, Object> map = new HashMap<>();
        // 切换slave从数据源查询
        Date loginTime = userLogService.validateLoginByToday(mobile); // 查询用户当天登录时间
        // 切换回master主数据源查询和更新
        User user = userService.queryUserData(mobile, loginTime, temp, channelId);
 
        return new RetInfo<>(RetEnum.SUCCESS.getCode(), RetEnum.SUCCESS.getMsg(), user);
    }

service 只是贴出了从数据源查询代码,主数据源的代码就和平常写的一样就行了。

@Service
public class UserLogServiceImpl implements UserLogService {
 
    private UserBehaviorService userBehaviorService;
 
    public UserLogServiceImpl(UserBehaviorService userBehaviorService) {
        this.userBehaviorService = userBehaviorService;
    }
 
    @Override
    public Date validateLoginByToday(String mobile) {
        return userBehaviorService.validateLoginByToday(mobile);
    }
}
@Slf4j
@Service
public class UserBehaviorServiceImpl implements UserBehaviorService {
 
    private UserBehaviorMapper userBehaviorMapper;
 
    public UserBehaviorServiceImpl(UserBehaviorMapper userBehaviorMapper) {
        this.userBehaviorMapper = userBehaviorMapper;
    }
 
    @Transactional
    @ConnectToDB(value = "SLAVE") // 这就是切换数据源最重要注解
    @Override
    public Date validateLoginByToday(String mobile) {
            return userBehaviorMapper.validateLoginByToday(mobile, DateUtil.dateToString(new Date(), "yyyyMMdd"));
    }
}

mapper(和平常一样,没有什么特别的)

@Mapper
public interface UserBehaviorMapper {
    /**
     * 查询用户当天是否活跃
     * @param mobile
     * @param dateStr
     * @return Integer
     */
    Integer validateActive(@Param("mobile") String mobile, @Param("dateStr") String dateStr);
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文