SpringBoot+MyBatis+AOP实现读写分离的示例代码
作者:Java后端何哥
前言:高并发这个阶段,肯定是需要做MySQL读写分离的。实际上大部分的互联网网站或者App,其实都是读多写少。所以针对这个情况,就是写一个主库,但是主库挂多个从库,然后从多个从库来读,那不就可以支撑更高的读并发压力了吗?
一、 MySQL 读写分离
1.1、如何实现 MySQL 的读写分离?
其实很简单,就是基于主从复制架构。简单来说,就搞一个主库,挂多个从库,然后我们就单单只是写主库,然后主库会自动把数据给同步到从去,多个从库用于读。
读写分离就是对于一条SQL该选择哪一个数据库去执行,至于谁来做选择数据库这件事,有两个,要么使用中间件帮我们做,要么程序自己做。一般来说,读写分离有两种实现方式。第一种是依靠中间件MyCat或Sharding-JDBC,也就是说应用程序连接到中间件,中间件帮我们做SQL分离,去选择指定的数据源;第二种是应用程序自己去做分离。这里我用程序自己来做,主要是利用Spring提供的路由数据源,以及AOP。
1.2、MySQL 主从复制原理?
主库将变更写入 binlog 日志,然后从库连接到主库之后,从库有一个 IO 线程,将主库的 binlog 日志拷贝到自己本地,写入一个 relay 中继日志中。接着从库中有一个 SQL 线程会从中继日志读取 binlog,然后执行 binlog 日志中的内容,也就是在自己本地再次执行一遍 SQL,这样就可以保证自己跟主库的数据是一样的。
mysql-master-slave
这里有一个非常重要的一点,就是从库同步主库数据的过程是串行化的,也就是说主库上并行的操作,在从库上会串行执行。所以这就是一个非常重要的点了,由于从库从主库拷贝日志以及串行执行 SQL 的特点,在高并发场景下,从库的数据一定会比主库慢一些,是有延时的。所以经常出现,刚写入主库的数据可能是读不到的,要过几十毫秒,甚至几百毫秒才能读取到。
而且这里还有另外一个问题,就是如果主库突然宕机,然后恰好数据还没同步到从库,那么有些数据可能在从库上是没有的,有些数据可能就丢失了。
所以 MySQL 实际上在这一块有两个机制,一个是半同步复制,用来解决主库数据丢失问题;一个是并行复制,用来解决主从同步延时问题。
这个所谓半同步复制,也叫 semi-sync
复制,指的就是主库写入 binlog 日志之后,就会将强制此时立即将数据同步到从库,从库将日志写入自己本地的 relay log 之后,接着会返回一个 ack 给主库,主库接收到至少一个从库的 ack 之后才会认为写操作完成了。
所谓并行复制,指的是从库开启多个线程,并行读取 relay log 中不同库的日志,然后并行重放不同库的日志,这是库级别的并行。
1.3、MySQL 主从同步延时问题(精华)
线上会发现,每天总有那么一些数据,我们期望更新一些重要的数据状态,但在高峰期时候却没更新。用户跟客服反馈,而客服就会反馈给我们。
(1) 主从同步延迟的原因
一个服务器开放N个链接给客户端来连接的,这样有会有大并发的更新操作, 但是从服务器的里面读取binlog的线程仅有一个,当某个SQL在从服务器上执行的时间稍长或者由于某个SQL要进行锁表就会导致,主服务器的SQL大量积压,未被同步到从服务器里。这就导致了主从不一致, 也就是主从延迟。
(2) 主从同步延迟的解决办法
一般来说,如果主从延迟较为严重,有以下解决方案:
- 分库:将一个主库拆分为多个主库,每个主库的写并发就减少了几倍,此时主从延迟可以忽略不计。
- 需要走主库的强制走主库查询:如果确实是存在必须先插入,立马要求就查询到,然后立马就要反过来执行一些操作,对这个查询设置直连主库。
- 业务层面妥协,重写代码:写代码的同学要慎重,插入数据时立马查询可能查不到。是否操作完之后马上要马上进行读取?
二、SpringBoot+AOP+MyBatis实现MySQL读写分离
代码环境是 SpringBoot+MyBatis+AOP。想要读写分离就需要配置多个数据源,在进行写操作是选择写的数据源(主库),读操作时选择读的数据源(从库)。
2.1、AbstractRoutingDataSource
SpringBoot提供了AbstractRoutingDataSource类根据用户定义的规则选择当前的数据源,这样我们可以在执行查询之前,设置使用的数据源。实现可动态路由的数据源,在每次数据库查询操作前执行。它的抽象方法 determineCurrentLookupKey() 决定使用哪个数据源
想要读写分离就需要配置多个数据源,在进行写操作是选择写的数据源,读操作时选择读的数据源。其中有两个关键点:
- 如何切换数据源
- 如何根据不同的方法选择正确的数据源
2.2、如何切换数据源
通常用 springboot 时都是使用它的默认配置,只需要在配置文件中定义好连接属性就行了,但是现在我们需要自己来配置了,spring 是支持多数据源的,多个 datasource 放在一个 HashMapTargetDataSource
中,通过dertermineCurrentLookupKey
获取 key 来觉定要使用哪个数据源。因此我们的目标就很明确了,建立多个 datasource 放到 TargetDataSource 中,同时重写 dertermineCurrentLookupKey 方法来决定使用哪个 key。
2.3、如何选择数据源
事务一般是注解在 Service 层的,因此在开始这个 service 方法调用时要确定数据源,有什么通用方法能够在开始执行一个方法前做操作呢?相信你已经想到了那就是**切面 **。怎么切有两种办法:
- 注解式,定义一个只读注解,被该数据标注的方法使用读库
- 方法名,根据方法名写切点,比如 getXXX 用读库,setXXX 用写库
三 、代码实现
3.0、工程目录结构
3.1、引入Maven依赖
<dependencies> <!--SpringBoot集成Aop起步依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!--SpringBoot集成WEB起步依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--mybatis集成SpringBoot起步依赖--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.3</version> </dependency> <!--MySQL驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!--SpringBoot单元测试依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
3.2、编写配置文件,配置主从数据源
spring: datasource: #主数据源 master: name: test jdbc-url: jdbc:mysql://xxxxxx:3306/test?allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8 username: root password: xxxxxx driver-class-name: com.mysql.cj.jdbc.Driver hikari: maximum-pool-size: 20 max-lifetime: 30000 idle-timeout: 30000 data-source-properties: prepStmtCacheSize: 250 prepStmtCacheSqlLimit: 2048 cachePrepStmts: true useServerPrepStmts: true #从数据源 slave: name: test jdbc-url: jdbc:mysql://xxxxxx:3306/test?allowMultiQueries=true&useSSL=false&useUnicode=true&characterEncoding=utf-8 username: root password: xxxxxx driver-class-name: com.mysql.cj.jdbc.Driver hikari: maximum-pool-size: 20 max-lifetime: 30000 idle-timeout: 30000 data-source-properties: prepStmtCacheSize: 250 prepStmtCacheSqlLimit: 2048 cachePrepStmts: true useServerPrepStmts: true #MyBatis: # mapper-locations: classpath:mapper/*.xml # type-aliases-package: com.hs.demo.entity
3.3、Enum类,定义主库从库
定义一个枚举类来代表这三个数据源
package com.hs.demo.config; /** * Enum类,定义主库从库两个数据源 */ public enum DBTypeEnum { MASTER, SLAVE; }
3.4、ThreadLocal定义数据源切换
通过ThreadLocal
将数据源绑定到每个线程上下文中,ThreadLocal 用来保存每个线程的是使用读库还是写库。操作结束后清除该数据,避免内存泄漏。
package com.hs.demo.config; /** *ThreadLocal定义数据源切换,通过ThreadLocal将数据源绑定到每个线程上下文中 */ public class DBContextHolder { /** * ThreadLocal 不是 Thread,是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,对数据存储后,只有在线程中才可以获取到存储的数据,对于其他线程来说是无法获取到数据。 * 大致意思就是ThreadLocal提供了线程内存储变量的能力,这些变量不同之处在于每一个线程读取的变量是对应的互相独立的,通过get和set方法就可以得到当前线程对应的值。 */ private static final ThreadLocal<DBTypeEnum> contextHolder = new ThreadLocal<>(); public static void set(DBTypeEnum dbTypeEnum){ contextHolder.set(dbTypeEnum); } public static DBTypeEnum get() { return contextHolder.get(); } public static void master() { set(DBTypeEnum.MASTER); System.out.println("--------以下操作为master(写操作)--------"); } public static void slave() { set(DBTypeEnum.SLAVE); System.out.println("--------以下操作为slave(读操作)--------"); } public static void clear() { contextHolder.remove(); } }
3.5、重写路由选择类
重写 determineCurrentLookupKey 方法,获取当前线程上绑定的路由key。Spring 在开始进行数据库操作时会通过这个方法来决定使用哪个数据库源,因此我们在这里调用上面 DbContextHolder 类的getDbType()
方法获取当前操作类别。
- AbstractRoutingDataSource的getConnection() 方法根据查找 lookup key 键对不同目标数据源的调用,通常是通过(但不一定)某些线程绑定的事物上下文来实现。
- AbstractRoutingDataSource的多数据源动态切换的核心逻辑是:在程序运行时,把数据源数据源通过 AbstractRoutingDataSource 动态织入到程序中,灵活的进行数据源切换。
- 基于AbstractRoutingDataSource的多数据源动态切换,可以实现读写分离,这么做缺点也很明显,无法动态的增加数据源。
package com.hs.demo.config; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import org.springframework.lang.Nullable; /** * 重写路由选择类:获取当前线程上绑定的路由key */ public class MyRoutingDataSource extends AbstractRoutingDataSource { /** * determineCurrentLookupKey()方法决定使用哪个数据源、 * 根据Key获取数据源的信息,上层抽象函数的钩子 */ @Nullable @Override protected Object determineCurrentLookupKey() { return DBContextHolder.get(); } }
3.6、配置多数据源
这里配置了3个数据源,1个master,1个slave,1个路由数据源。前2个数据源都是为了生成第3个数据源,而且后续我们只用这最后一个路由数据源。
package com.hs.demo.config; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.sql.DataSource; import java.util.HashMap; import java.util.Map; /** * 增加了 DataSourceConfig 这个配置文件之后,需要添加Hikari连接池,单数据源自动装载时不会出这 * 样的问题 * * @Configuration 注解,表明这就是一个配置类,指示一个类声明一个或者多个@Bean 声明的方法并且由Spring容器统一管理,以便在运行时为这些bean生成bean的定义和服务请求的类。 */ @Configuration public class DataSourceConfig { /** * 注入主库数据源 */ @Bean @ConfigurationProperties(prefix = "spring.datasource.master") public DataSource masterDataSource() { return DataSourceBuilder.create().build(); //DataSourceProperties properties放在方法参数里 // return DataSourceBuilder.create(properties.getClassLoader()) // .type(HikariDataSource.class) // .driverClassName(properties.getDriverClassName()) // .url(properties.getUrl()) // .username(properties.getUsername()) // .password(properties.getPassword()) // .build(); } /** * 注入从库数据源 */ @Bean @ConfigurationProperties(prefix = "spring.datasource.slave") public DataSource slaveDataSource() { return DataSourceBuilder.create().build(); } /** * 配置选择数据源 * @param masterDataSource * @param slaveDataSource * @return DataSource */ @Bean public DataSource myRoutingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource, @Qualifier("slaveDataSource") DataSource slaveDataSource) { Map<Object, Object> targetDataSource = new HashMap<>(); targetDataSource.put(DBTypeEnum.MASTER, masterDataSource); targetDataSource.put(DBTypeEnum.SLAVE, slaveDataSource); MyRoutingDataSource myRoutingDataSource = new MyRoutingDataSource(); //找不到用默认数据源 myRoutingDataSource.setDefaultTargetDataSource(masterDataSource); //可选择目标数据源 myRoutingDataSource.setTargetDataSources(targetDataSource); return myRoutingDataSource; } }
3.7、配置Mybatis指定数据源
修改SqlSessionFactory 和事务管理器
package com.hs.demo.config; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import javax.annotation.Resource; import javax.sql.DataSource; /** * 配置Mybatis指定数据源:SqlSessionFactory和事务管理器 */ @Configuration @EnableTransactionManagement public class MyBatisConfig { /** * 注入自己重写的数据源 */ @Resource(name = "myRoutingDataSource") private DataSource myRoutingDataSource; /** * 配置SqlSessionFactory * @return SqlSessionFactory * @throws Exception */ @Bean public SqlSessionFactory sqlSessionFactory() throws Exception { SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean(); sqlSessionFactoryBean.setDataSource(myRoutingDataSource); //ResourcePatternResolver(资源查找器)定义了getResources来查找资源 //PathMatchingResourcePatternResolver提供了以classpath开头的通配符方式查询,否则会调用ResourceLoader的getResource方法来查找 // ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(); // sqlSessionFactoryBean.setMapperLocations(resolver.getResources(mapperLocation)); return sqlSessionFactoryBean.getObject(); } /** * 事务管理器,不写则事务不生效:事务需要知道当前使用的是哪个数据源才能进行事务处理 */ @Bean public PlatformTransactionManager platformTransactionManager() { return new DataSourceTransactionManager(myRoutingDataSource); } // /** // * 当自定义数据源,用户必须覆盖SqlSessionTemplate,开启BATCH处理模式 // * // * @param sqlSessionFactory // * @return // */ // @Bean // public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) { // return new SqlSessionTemplate(sqlSessionFactory, ExecutorType.BATCH); // } }
3.8、AOP切面实现数据源切换
通过Aop的前置通知来设置要使用的路由key(数据源)
package com.hs.demo.config; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.stereotype.Component; import java.lang.reflect.Method; /** * 默认情况下,所有的查询都走从库,插入/修改/删除走主库。我们通过方法名来区分操作类型(CRUD) * * 切面不能建立在DAO层,事务是在service开启的,到dao层再切换数据源,那事务就废了 * */ @Aspect @Component public class DataSourceAop { /** * 第一个”*“符号 表示返回值的类型任意; * com.sample.service.impl AOP所切的服务的包名,即,我们的业务部分 * 包名后面的”..“ 表示当前包及子包 * 第二个”*“ 表示类名,*即所有类。此处可以自定义,下文有举例 * .*(..) 表示任何方法名,括号表示参数,两个点表示任何参数类型 */ @Pointcut("!@annotation(com.hs.demo.config.Master) " + "&& (execution(* com.hs.demo.service.*.select*(..)) " + "|| execution(* com.hs.demo.service..*.find*(..)))") public void readPointcut() { } @Pointcut("@annotation(com.hs.demo.config.Master) " + "|| execution(* com.hs.demo.service..*.save*(..)) " + "|| execution(* com.hs.demo.service..*.add*(..)) " + "|| execution(* com.hs.demo.service..*.insert*(..)) " + "|| execution(* com.hs.demo.service..*.update*(..)) " + "|| execution(* com.hs.demo.service..*.edit*(..)) " + "|| execution(* com.hs.demo..*.delete*(..)) " + "|| execution(* com.hs.demo..*.remove*(..))") public void writePointcut() { } @Before("readPointcut()") public void read(JoinPoint jp) { //获取当前的方法信息 MethodSignature methodSignature = (MethodSignature) jp.getSignature(); Method method = methodSignature.getMethod(); //判断方法上是否存在注解@Master boolean present = method.isAnnotationPresent(Master.class); if (!present) { //如果不存在,默认走从库读 DBContextHolder.slave(); } else { //如果存在,走主库读 DBContextHolder.master(); } } @Before("writePointcut()") public void write() { DBContextHolder.master(); } /** * 另一种写法:if...else... 判断哪些需要读从数据库,其余的走主数据库 */ // @Before("execution(* com.cjs.example.service.impl.*.*(..))") // public void before(JoinPoint jp) { // String methodName = jp.getSignature().getName(); // // if (StringUtils.startsWithAny(methodName, "get", "select", "find")) { // DBContextHolder.slave(); // }else { // DBContextHolder.master(); // } // } }
3.9、如果有强制走主库的操作,可以定义注解
package com.hs.demo.config; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 有时候主从延迟,需要强制读主库的注解 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface Master { //设置数据源类型 //String value(); }
3.10、自行定义CRUD读写操作
(1)UserEntity
package com.hs.demo.entity; import lombok.Data; /** * @author heshi * @date 2021/10/20 15:14 */ @Data public class UserEntity { private Integer user_id; private String account; private String nickname; private String password; private String headimage_url; private String introduce; }
(2)UserMapper
package com.hs.demo.mapper; import com.hs.demo.entity.UserEntity; import org.apache.ibatis.annotations.*; import java.util.List; /** * Spring通过@Mapper注解实现动态代理,mybatis会自动创建Dao接口的实现类代理对象注入IOC容器进行管理,这样就不用编写Dao层的实现类 * */ @Mapper public interface UserMapper { @Select("SELECT * FROM user") //使用@Select、@Insert等注解方式来实现对应的持久化操作,使得我们可以不配置XML格式的Mapper文件 List<UserEntity> findAll(); @Insert("insert into user(account,nickname,password) values(#{account}, #{nickname}, #{password})") int insert(UserEntity user); @Update("UPDATE user SET account=#{account},nickname=#{nickname} WHERE id =#{id}") void update(UserEntity user); @Delete("DELETE FROM user WHERE id =#{id}") void delete(Long id); }
(3)UserService(重要)
package com.hs.demo.service; import com.hs.demo.entity.UserEntity; import com.hs.demo.mapper.UserMapper; import com.hs.demo.mysql.Master; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; /** * @author heshi * @date 2021/10/21 10:36 */ @Service public class UserService { @Autowired UserMapper userMapper; // @Master public List<UserEntity> findAll() { List<UserEntity> userEntities = userMapper.findAll(); return userEntities; } public int insertUser(UserEntity user) { int i = userMapper.insert(user); return i; } // void update(UserEntity user); // // void delete(Long id); }
(4) UserController
package com.hs.demo.controller; import com.hs.demo.entity.UserEntity; import com.hs.demo.service.UserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; @RestController public class UserController { @Autowired UserService userService; @RequestMapping("/listUser") public List<UserEntity> listUser() { List<UserEntity> users = userService.findAll(); return users; } @RequestMapping("/insertUser") public void insertUser() { UserEntity userEntity = new UserEntity(); userEntity.setAccount("22222"); userEntity.setNickname("hshshs"); userEntity.setPassword("123"); userService.insertUser(userEntity); } }
运行结果如下图所示
总结:通过AOP来确定所使用数据源类型,然后通过路由来进行数据源选择。
参考链接:
springboot实现读写分离(基于Mybatis,mysql)
到此这篇关于SpringBoot+MyBatis+AOP实现读写分离的示例代码的文章就介绍到这了,更多相关SpringBoot+MyBatis+AOP读写分离内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!