使用ShardingJDBC进行数据分片以及读写分离
作者:一只爱撸猫的程序猿
简述
- ShardingJDBC: 它是一个轻量级的Java框架,提供了数据分片、读写分离、分布式主键生成等数据访问功能。ShardingJDBC 直接嵌入在应用程序中,不需要通过中间件代理的方式实现数据库访问。
- 多数据源: 在 ShardingJDBC 中,多数据源指的是将数据储存到多个数据库中。数据根据某种分片策略(如按照ID范围、哈希值等)分布在不同的数据库中。
- 读写分离: 读写分离是通过配置主库(写操作)和从库(读操作)来实现的。应用程序写入操作主要针对主库,读取操作可以分散到多个从库中,从而提高数据库的读取性能和系统的可扩展性。
原理
ShardingJDBC 的核心组件和功能,包括一些相关代码片段以更好地理解其工作原理。
1. SQL 解析
SQL 解析是 ShardingJDBC 处理流程的起点。ShardingJDBC 使用 ANTLR(另一个语言识别工具)作为 SQL 解析工具。
- SQL解析类:
SQLParseEngine
是解析的入口点。它接收原始 SQL 语句并基于数据库方言(MySQL、PostgreSQL 等)进行解析。 - 解析过程:解析过程包括词法分析和语法分析,用于构建抽象语法树(AST)。AST 提供了 SQL 结构的详细视图,包括表名、列名、条件等。
SQLParseEngine 源码分析
public final class SQLParseEngine { private final DatabaseType databaseType; private final String sql; private final LexerEngine lexerEngine; private final SQLStatementParser statementParser; public SQLParseEngine(DatabaseType databaseType, String sql, Properties properties, List<SQLToken> sqlTokens) { this.databaseType = databaseType; this.sql = sql; Lexer lexer = LexerFactory.newInstance(databaseType, sql); this.lexerEngine = new LexerEngine(lexer); this.statementParser = SQLStatementParserFactory.newInstance(databaseType, lexerEngine); } public SQLStatement parse() { lexerEngine.nextToken(); return statementParser.parse(); } }
在上面的代码中,SQLParseEngine
使用了数据库类型(DatabaseType
)和 SQL 语句来初始化。它首先创建一个 Lexer(词法解析器),然后使用这个 Lexer 创建一个 LexerEngine
,并且基于数据库类型创建相应的 SQL 语句解析器。parse
方法最终返回一个 SQL 语句对象(SQLStatement
),这个对象代表了解析后的 SQL 语句。
2. 路由计算
路由是根据分片策略和解析出的 SQL 信息,确定 SQL 应该执行在哪些具体的数据库和表上。
- 路由类:
ShardingRouter
负责执行路由逻辑。 - 路由策略:通过实现
ShardingStrategy
接口的类(如StandardShardingStrategy
、ComplexShardingStrategy
),根据分片键和分片算法确定目标数据源。
ShardingRouter 源码分析
public final class ShardingRouter { private final ShardingRule shardingRule; public ShardingRouter(ShardingRule shardingRule) { this.shardingRule = shardingRule; } public RoutingResult route(final SQLStatement sqlStatement) { // 省略具体路由逻辑 } }
在这里,ShardingRouter
通过 ShardingRule
(包含分片策略和规则)来进行初始化。route
方法接受一个 SQL 语句对象,并根据分片规则返回路由结果。
3. SQL 改写
根据路由结果,ShardingJDBC 会改写原始 SQL,使其适用于目标的物理数据库和表。
- 改写类:
SQLRewriteEngine
负责 SQL 改写。 - 改写过程:根据路由结果和 AST,
SQLRewriteEngine
会修改表名、增加额外的条件等,生成最终要执行的 SQL。
SQLRewriteEngine 源码分析
public final class SQLRewriteEngine { private final SQLStatement sqlStatement; private final List<SQLToken> sqlTokens; public SQLRewriteEngine(SQLStatement sqlStatement) { this.sqlStatement = sqlStatement; this.sqlTokens = new LinkedList<>(); } public String rewrite() { // 省略具体改写逻辑 } }
这里的 SQLRewriteEngine
接收一个 SQL 语句对象,并根据路由结果和 SQL 语句中的令牌(SQLToken
)列表来改写 SQL。
4. 执行计划生成与执行
生成 SQL 的执行计划并在相应的数据库实例上执行。
- 执行类:
ShardingExecuteEngine
负责管理 SQL 的执行。 - 执行过程:它可能涉及到并行查询、合并结果集等操作。对于写操作,通常直接路由到主库;对于读操作,则可能涉及到多个从库。
public class ShardingExecuteEngine implements AutoCloseable { private final ExecutorService executorService; public ShardingExecuteEngine(int executorSize) { this.executorService = Executors.newFixedThreadPool(executorSize); } // 省略执行方法 }
ShardingExecuteEngine
使用一个线程池来执行 SQL。这个类负责管理 SQL 的执行过程,包括可能的并行查询和结果集合并。
5. 结果集合并
对于查询操作,ShardingJDBC 需要合并来自不同物理表或数据库的结果集。
- 合并类:
MergeEngine
负责结果集的合并。 - 合并过程:根据不同的查询类型(聚合查询、排序查询等),
MergeEngine
使用不同的合并策略来确保返回给用户的是一个统一的结果集。
public final class MergeEngine { public MergedResult merge(List<QueryResult> queryResults, SQLStatement sqlStatement) { // 省略合并逻辑 } }
MergeEngine
负责将来自不同物理表或数据库的查询结果合并成一个统一的结果集。它根据 SQL 语句的类型(如聚合查询、排序查询)来应用不同的合并策略。
6. 分布式事务处理
处理分布式环境下的事务一致性。
- 事务管理器:
ShardingTransactionManager
接口定义了事务管理的行为。 - 具体实现:如
XAShardingTransactionManager
用于处理 XA 类型的分布式事务。
ShardingTransactionManager 接口和 XAShardingTransactionManager 实现
public interface ShardingTransactionManager { void begin(); void commit(); void rollback(); // 省略其他方法 } public class XAShardingTransactionManager implements ShardingTransactionManager { // 实现分布式事务管理逻辑 }
ShardingTransactionManager
接口定义了事务管理的基本行为,如开始(begin)、提交(commit)和回滚(rollback)操作。XAShardingTransactionManager
是这个接口的一个实现,用于处理 XA 类型的分布式事务。
我们将假设有两个业务表:order
和 user
,并且这两个表需要根据不同的策略进行分片。同时,我们将设置四个数据源(两个主库和两个从库)来实现读写分离。
实际案例
场景设定
- 数据源:
ds0
、ds0_slave
、ds1
、ds1_slave
。其中ds0
和ds1
是主库,ds0_slave
和ds1_slave
是从库。 - 业务表:
order
和user
。 - 分片策略:
order
表按照订单ID分片。user
表按照用户ID分片。
- 读写分离:所有写操作都发生在主库,读操作可以分配到从库。
配置和代码实现
添加依赖:
<dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>YOUR_VERSION</version> </dependency>
配置文件 application.yml
:
spring: shardingsphere: datasource: names: ds0,ds0_slave,ds1,ds1_slave ds0: type: com.zaxxer.hikari.HikariDataSource driver-class-name: YOUR_DRIVER_CLASS jdbc-url: JDBC_URL_FOR_DS0 username: YOUR_USERNAME password: YOUR_PASSWORD ds0_slave: type: com.zaxxer.hikari.HikariDataSource driver-class-name: YOUR_DRIVER_CLASS jdbc-url: JDBC_URL_FOR_DS0_SLAVE username: YOUR_USERNAME password: YOUR_PASSWORD ds1: type: com.zaxxer.hikari.HikariDataSource driver-class-name: YOUR_DRIVER_CLASS jdbc-url: JDBC_URL_FOR_DS1 username: YOUR_USERNAME password: YOUR_PASSWORD ds1_slave: type: com.zaxxer.hikari.HikariDataSource driver-class-name: YOUR_DRIVER_CLASS jdbc-url: JDBC_URL_FOR_DS1_SLAVE username: YOUR_USERNAME password: YOUR_PASSWORD sharding: tables: order: actual-data-nodes: ds${0..1}.order_${0..1} table-strategy: inline: sharding-column: id algorithm-expression: order_${id % 2} database-strategy: inline: sharding-column: user_id algorithm-expression: ds${user_id % 2} key-generator: type: SNOWFLAKE column: id user: actual-data-nodes: ds${0..1}.user_${0..1} table-strategy: inline: sharding-column: id algorithm-expression: user_${id > 5000 ? 1 : 0} database-strategy: inline: sharding-column: id algorithm-expression: ds${id % 2} key-generator: type: SNOWFLAKE column: id master-slave-rules: ds0: master-data-source-name: ds0 slave-data-source-names: ds0_slave ds1: master-data-source-name: ds1 slave-data-source-names: ds1_slave
这段配置是用于设置 Apache ShardingSphere(ShardingJDBC 的一个部分)的 YAML 格式的配置文件,专门用于 Spring Boot 项目。它定义了数据源(包括主从数据源),表的分片策略,以及主从复制规则。让我们逐个部分进行详细解释:
数据源配置
datasource: names: ds0,ds0_slave,ds1,ds1_slave ds0: type: com.zaxxer.hikari.HikariDataSource driver-class-name: YOUR_DRIVER_CLASS jdbc-url: JDBC_URL_FOR_DS0 username: YOUR_USERNAME password: YOUR_PASSWORD ...
names
: 定义了所有数据源的名称,这里有四个数据源:ds0
,ds0_slave
,ds1
,ds1_slave
。ds0
,ds0_slave
,ds1
,ds1_slave
: 分别定义了四个数据源的详细配置。type
: 数据源类型,这里使用的是 HikariCP 连接池。driver-class-name
: 数据库驱动类。jdbc-url
: 数据库的 JDBC URL。username
和password
: 数据库的登录用户名和密码。
分片配置
sharding: tables: order: actual-data-nodes: ds${0..1}.order_${0..1} table-strategy: inline: sharding-column: id algorithm-expression: order_${id % 2} database-strategy: inline: sharding-column: user_id algorithm-expression: ds${user_id % 2} key-generator: type: SNOWFLAKE column: id user: ...
sharding
: 定义了分片的总体配置。tables
: 在这里定义具体的表和它们的分片策略。order
: 这是一个表的名称。actual-data-nodes
: 定义实际的数据节点,ds${0..1}.order_${0..1}
表示order
表在ds0
和ds1
数据源上都有两个分片,即order_0
和order_1
。table-strategy
: 定义表的分片策略。sharding-column
: 分片键,这里使用id
。algorithm-expression
: 分片算法表达式,这里是简单的模 2 运算。
database-strategy
: 定义数据库的分片策略,类似于表的分片策略。key-generator
: 定义主键生成策略,这里使用的是雪花算法(SNOWFLAKE
)。
主从配置
master-slave-rules: ds0: master-data-source-name: ds0 slave-data-source-names: ds0_slave ds1: master-data-source-name: ds1 slave-data-source-names: ds1_slave
实体类和数据访问层:
定义 Order
和 User
实体类,以及对应的 JPA 仓库或 MyBatis 映射。
Order 实体类
@Entity @Table(name = "order") public class Order { @Id private Long id; @Column(name = "user_id") private Long userId; @Column(name = "order_amount") private BigDecimal orderAmount; // 标准的构造函数、getter 和 setter public Order() { } // ... 省略其他构造函数、getter 和 setter 方法 // ... 可以添加业务逻辑方法 }
User 实体类
@Entity @Table(name = "user") public class User { @Id private Long id; @Column(name = "username") private String username; @Column(name = "email") private String email; // 标准的构造函数、getter 和 setter public User() { } // ... 省略其他构造函数、getter 和 setter 方法 // ... 可以添加业务逻辑方法 }
OrderRepository 接口
@Repository public interface OrderRepository extends JpaRepository<Order, Long> { List<Order> findByUserId(Long userId); // ... 可以根据需要添加其他查询方法 }
UserRepository 接口
@Repository public interface UserRepository extends JpaRepository<User, Long> { User findByUsername(String username); // ... 可以根据需要添加其他查询方法 }
补充一个mybatis的写法:
@Mapper public interface UserMapper { @Select("SELECT * FROM user WHERE id = #{id}") User findById(Long id); @Insert("INSERT INTO user (username, email) VALUES (#{username}, #{email})") void insert(User user); // 更多的 MyBatis SQL 映射可以根据需要添加 }
总结
ShardingJDBC 的源码实现体现了其作为一个数据库中间件框架的复杂性和灵活性。它将 SQL 解析、路由、改写、执行和结果集合并等多个步骤封装成一系列高度解耦的组件和接口。这种设计使得 ShardingJDBC 能够灵活地适应各种数据库和 SQL 方言,同时提供丰富的分片策略和读写分离功能。
以上就是使用ShardingJDBC进行数据分片以及读写分离的详细内容,更多关于ShardingJDBC数据分片的资料请关注脚本之家其它相关文章!