Java多线程处理千万级数据更新操作
作者:玛奇玛丶
故事背景
之前因为用户信息安全把好几个敏感信息用AES加密保存了,应业务需求,需要通过后台userId等信息查询订单,所以需要明文保存数据。但是目前订单表已经有7000万+订单数据,操作起来比较麻烦,肯定需要分批操作的,否则会有内存溢出等问题。初步想到的解决方案是新建一个数据库表,保存处理订单表的id是处理到哪一条数据,然后用limit查询一万条数据,处理完id+10000。
伪代码
//查询id
while(id<maxid){
Integer id = mapper.selectList().get(0);
List list = mapper.selectList(Wrappers.<order>lambdaQuery()
.gt(order::getId,id)
.last("limit 10000");
for(){
处理数据
update数据
}
id = list.get(10000).getId();
updateId();
}
这样update数据处理可能会比较慢,可以用多线程处理。
方案一
用多线程的话需要考虑一下怎么设计,首先是想着拿到一万条数据之后,分成10份,给10条线程去处理数据,但是这样可能会因为线程处理太慢,线程任务队列太多导致OOM异常,所以可以等待线程全都执行完毕再处理下一批数据。那么怎么等待线程执行完再处理呢?百度完发现可以用CountDownLatch实现~

方案二
如果不想等线程执行完再处理,可以转换下思路。把list当做消息队列,每个线程获取1千条数据,如果队列数据为空则再查询后面1万条数据做处理。这里有一个多线程处理共享队列的问题,所以需要在取数据的时候和增删队列数据的时候加锁。

伪代码
private static List list;
//线程处理
public void run(){
List dataList;
while(true){
synchronized(xx.class){
if(list.size() == 0){
list = getList();
}
//再判断一次,如果为空就是处理完了
if(orderList == null || orderList.size() == 0) {
return;
}
//取最后1千条
dataList = new ArrayList<>(list.subList(list.size()-1000,list.size()));
//减去最后一千条
list = list.subList(0,list.size()-1000);
}
//处理dataList
for(Order order:dataList){
...
}
}
}
多线程的坑
多线程一定要注意数据的并发安全问题,比如说用subList的话并不会复制一个新的list出来,只是原有list的基础上的一个视图,如果原来的list改变,subList得到的list也会跟着改变,如果想要用subList一定要new一个List接收。
DEMO
public static void main(String[] args) {
List<Integer> list = new ArrayList<>();
for(int i=0;i<100;i++){
list.add(i);
}
while(list.size() >=10){
//取最后10条
List<Integer> dataList = list.subList(list.size()-10,list.size());
//List<Integer> dataList = list.stream().skip(Math.max(0, list.size() - 10)).collect(Collectors.toList()) ;
ListThread listThread = new ListThread(dataList);
listThread.start();
//去掉最后10条数据
list = list.subList(0,list.size()-10);
}
list.add(33333);
}
class ListThread extends Thread{
private List<Integer> list;
public ListThread(List list){
this.list = list;
}
public void run(){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+"开始打印数据======");
for(int i=0;i<list.size();i++){
System.out.println(list.get(i));
}
}
}
如果sleep5秒模拟队列的增删操作,原来的list已经变了,会影响线程里的list变动,会报错
java.util.ConcurrentModificationException
要解决这个问题需要用创建一个新的list,这就不会受原来的list影响
List<Integer> dataList = new ArrayList<>(list.subList(list.size()-10,list.size())); //或者使用stream流 List<Integer> dataList = list.stream().skip(Math.max(0, list.size() - 10)).collect(Collectors.toList()) ;
后面还遇到一个问题,new出来的多线程类无法使用@Autowired注入,解决办法:
1、将需要的Bean作为线程的的构造函数的参数传入
2、使用ApplicationContext.getBean方法来静态的获取Bean(推荐)
总结
1、使用多线程一定要注意线程安全问题,操作共享变量需要加锁处理,或者分割独立的数据给各自的线程处理。如果是简单且不追求性能的业务还是用单线程比较安全。
2、使用subList要注意返回的是视图,如果原来的list有变化,需要创建一个新的list接收,或者用stream流做数据分割。
3、查询+处理数据速度大概是1分钟35万条,大概是这个时间,批量update可能还能优化,我用的是mybatis-plus自带的updateBatchById。
到此这篇关于Java多线程处理千万级数据更新操作的文章就介绍到这了,更多相关Java多线程处理数据更新内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
