sharding-jdbc实现分页查询的示例代码
作者:shark_chili
详解sharding-jdbc分页查询
前置步骤
之前的文章已经介绍过sharding-jdbc
底层会通过重写数据源对应的prepareStament
完成分表查询逻辑,而分页插件则是拦截SQL
语句实现分页查询,所以使用sharding-jdbc
进行分页查询只需引入用户所需的分页插件即可,以笔者为例,这里就直接使用pagehelper
:
<!-- pagehelper 插件--> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper-spring-boot-starter</artifactId> </dependency>
分页查询代码示例
本文中笔者配置的分页算法是通过id取模的方式,假设我们的对应的user
数据id为1,按照我们的算法,它将被存至1%3=1
即user_1
表:
##使用哪一列用作计算分表策略,我们就使用id spring.shardingsphere.sharding.tables.user.table-strategy.inline.sharding-column=id ##具体的分表路由策略,我们有3个user表,使用主键id取余3,余数0/1/2分表对应表user_0,user_2,user_2 spring.shardingsphere.sharding.tables.user.table-strategy.inline.algorithm-expression=user_$->{id % 3}
笔者在实验表中插入大约100w的数据,进行一次分页查询,其中分页算法为id%3
@Test void selectByPage() { //查询第2页的数据10条 PageHelper.startPage(2, 10, false); //查询结果按照id升序排列 UserExample userExample = new UserExample(); userExample.setOrderByClause("id asc"); //输出查询结果 List<User> userList = userMapper.selectByExample(userExample); userList.forEach(System.out::println); }
最终结果如下,可以看到查询结果和单表情况下是一样的,即从11~20
:
User(id=11, name=user11, phone=) User(id=12, name=user12, phone=) User(id=13, name=user13, phone=) User(id=14, name=user14, phone=) User(id=15, name=user15, phone=) User(id=16, name=user16, phone=) User(id=17, name=user17, phone=) User(id=18, name=user18, phone=) User(id=19, name=user19, phone=) User(id=20, name=user20, phone=)
详解sharding-jdbc对于分页查询的底层实现
按照正常的单表查询逻辑,假设我们要查询第2页的数据10
条,我们对应的SQL就是:
select * from user limit (page-1)*10,size =>select * from user limit 10,10
而sharding-jdbc
分表分页查询则比较粗暴,它会将对应分页及之前的数据全部查询来,然后进行排序,跳过对应页码的数据后,再取出对应量级的数据返回。
以我们的分页查询为例,它会将每个分表的按照id进行升序排列之后取出各自的前20条数据,每张分表前20条数据之后,sharding-jdbc
会根据我们的排序算法比对各张分表的第一条数据,很明显user_1对应的结果最小,所以按照此规则轮询分表的user_1
、user_2
、user_0
以此将这3组结果存放至优先队列中。
基于这个队列,sharding-jdbc会按照分页查询的逻辑跳过10个,所以它会不断取出优先队列中的第一个元素,然后将这组分表结果再次存回队列,以我们的查询为例就是:
- 从
user_1
取出id为1的值,作为skip的第一个元素。 - 将
user_1
查询结果入队,因为头元素为4,和其他两组比最大,所以存放至队尾。 - 再次从优先队列中拿到
user_2
的队首元素2,作为skip
的第2个元素,然后再次存入队尾。 - 依次步骤完成跳过10个。
- 然后再按照这个规律筛选出10个,最终得到11~20。
源码印证
基于上述的图解,我们通过源码解析方式来印证,首先mybatis
会基于我们的SQL
调用execute
方法获取查询结果,然后再通过handleResultSets
生成列表并返回。 我们都知道sharding-jdbc
通过自实现数据源的同时也给出对应的PreparedStatement
即ShardingPreparedStatement
,所以execute
方法本质的执行者就是ShardingPreparedStatement
,它会得到第2页之前的所有数据,然后通过handleResultSets
进行skip
和limit
得到最终结果:
@Override public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException { PreparedStatement ps = (PreparedStatement) statement; //调用sharding-jdbc的ShardingPreparedStatement的execute获取各个分表前2页的所有数据 ps.execute(); //通过skip结合limit得到所有结果 return resultSetHandler.handleResultSets(ps); }
步入execute
方法可以看到其内部本质是调用preparedStatementExecutor
进行查询处理的:
@Override public boolean execute() throws SQLException { try { clearPrevious(); //获取查询SQL shard(); initPreparedStatementExecutor(); //执行SQL结果并返回 return preparedStatementExecutor.execute(); } finally { clearBatch(); } }
而该执行方法最终会走到ShardingExecuteEngine
的parallelExecute
方法,通过异步查询3张分表的结果,再通过外部传入的回调执行器处理这3个异步任务的查询结果:
private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator(); ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next(); //提交3个异步任务 Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); //通过回调执行器callback阻塞获取3个异步结果 return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); }
得到3张分表的数据之后,其内部逻辑最终会走到ShardingPreparedStatement
的getResultSet
方法,其内部会创建一个合并引擎DQLMergeEngine
进行并调用getCurrentResultSet
进行数据截取:
@Override public ResultSet getResultSet() throws SQLException { //...... if (routeResult.getSqlStatement() instanceof SelectStatement || routeResult.getSqlStatement() instanceof DALStatement) { //反射创建分表合并引擎 MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(), routeResult, connection.getShardingContext().getMetaData().getTable(), queryResults); //截取最终结果 currentResultSet = getCurrentResultSet(resultSets, mergeEngine); } return currentResultSet; }
而该引擎就是DQLMergeEngine
,进行合并操作时,会调用LimitDecoratorMergedResult
跳过前10个元素:
private MergedResult decorate(final MergedResult mergedResult) throws SQLException { Limit limit = routeResult.getLimit(); //...... //通过LimitDecoratorMergedResult跳过3张分表组合结果的前10个元素 if (DatabaseType.MySQL == databaseType || DatabaseType.PostgreSQL == databaseType || DatabaseType.H2 == databaseType) { return new LimitDecoratorMergedResult(mergedResult, routeResult.getLimit()); } //...... return mergedResult; }
跳过的逻辑就比较简单了,LimitDecoratorMergedResult
会调用合并引擎调用OrderByStreamMergedResult
的next
方法跳过前10个元素:
//LimitDecoratorMergedResult的skipOffset跳过10个元素 private boolean skipOffset() throws SQLException { for (int i = 0; i < limit.getOffsetValue(); i++) { //调用OrderByStreamMergedResult跳过组合结果的前10个元素 if (!getMergedResult().next()) { return true; } } rowNumber = 0; return false; }
可以看到OrderByStreamMergedResult
的逻辑就是我们上文所说的取出队列中的第一组查询结果的第一个元素,然后再将其存入队(因为取出第一个元素后,队首元素最大,这组结果会存至队尾),不断循环跳够10个:
@Override public boolean next() throws SQLException { //...... //取出队列中第一组分表查询结果的第一个元素 OrderByValue firstOrderByValue = orderByValuesQueue.poll(); //如果这组分表结果还有元素则将这组分表结果入队,因为队首元素最大,所以会存放至队尾 if (firstOrderByValue.next()) { orderByValuesQueue.offer(firstOrderByValue); } //...... return true; }
经过上述步骤跳过10个元素后,就要截取第二页的10个数据了,代码再次回到PreparedStatementHandler
的handleResultSets
方法,该方法会调用到DefaultResultSetHandler
的handleRowValuesForSimpleResultMap
方法,该方法会循环10个,通过resultSet.next()
移到下一条数据的游标,然后生成对象存储到resultHandler
中,最终通过这个resultHandler就可以看到我们分页查询的List:
private void handleRowValuesForSimpleResultMap(ResultSetWrapper rsw, ResultMap resultMap, ResultHandler<?> resultHandler, RowBounds rowBounds, ResultMapping parentMapping) throws SQLException { DefaultResultContext<Object> resultContext = new DefaultResultContext<>(); ResultSet resultSet = rsw.getResultSet(); skipRows(resultSet, rowBounds); //通过resultSet.next()方法调用 while (shouldProcessMoreRows(resultContext, rowBounds) && !resultSet.isClosed() && resultSet.next()) { ResultMap discriminatedResultMap = resolveDiscriminatedResultMap(resultSet, resultMap, null); Object rowValue = getRowValue(rsw, discriminatedResultMap, null); storeObject(resultHandler, resultContext, rowValue, parentMapping, resultSet); } }
而next方法本质还是调用LimitDecoratorMergedResult的next方法,以rowNumber 来计数,调用mergedResult的next方法将游标移动到要返回的数据,
@Override public boolean next() throws SQLException { //...... //同样基于优先队列取够10个 return ++rowNumber <= limit.getRowCountValue() && getMergedResult().next(); }
而OrderByStreamMergedResult
的next
逻辑和之前差不多,就是通过轮询优先队列中的每一组分表对象的队首元素,将其存到currentQueryResult
中,后续进行对象创建时就会从currentQueryResult
中拿到这个结果生成User
对象存入List
中返回:
@Override public boolean next() throws SQLException { //...... //从优先队列orderByValuesQueue拿到队首的一组分表查询结果 OrderByValue firstOrderByValue = orderByValuesQueue.poll(); //移动当前队列游标 if (firstOrderByValue.next()) { orderByValuesQueue.offer(firstOrderByValue); } if (orderByValuesQueue.isEmpty()) { return false; } //将当前优先队列中的队首元素的queryResult作为本次的查询结果,作为后续创建User对象的数据 setCurrentQueryResult(orderByValuesQueue.peek().getQueryResult()); return true; }
存在的问题
自此我们了解了sharding-jdbc
分页查询的内部工作机制,这里我们顺便说一下这种算法的缺点,查阅官网说法是sharding-jdbc分页查询不会占用内存,说明查询结果仅仅记录的是游标:
首先,采用流式处理 + 归并排序的方式来避免内存的过量占用。由于SQL改写不可避免的占用了额外的带宽,但并不会导致内存暴涨。 与直觉不同,大多数人认为ShardingSphere会将1,000,010 * 2记录全部加载至内存,进而占用大量内存而导致内存溢出。 但由于每个结果集的记录是有序的,因此ShardingSphere每次比较仅获取各个分片的当前结果集记录,驻留在内存中的记录仅为当前路由到的分片的结果集的当前游标指向而已。 对于本身即有序的待排序对象,归并排序的时间复杂度仅为O(n),性能损耗很小。
但是笔者在使用过程中,打印内存快照时发现,进行500w
数据的深分页查询发现,它的做法和我们上文源码所说的一致,就是将当前页以及之前的结果全部加载到内存中,所以笔者认为使用sharding-jdbc
时还是需要注意一下对内存的监控:
小结
以上就是sharding-jdbc实现分页查询的示例代码的详细内容,更多关于sharding-jdbc分页查询的资料请关注脚本之家其它相关文章!