java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java大数据量异步处理

Java大数据量异步处理方案

作者:霸道流氓气质

本文详细介绍了异步编程在处理大量数据时的应用,重点比较了线程池、消息队列和Spring的@Async注解三种异步方案的优缺点,并提供了线程池配置及使用示例,强调了线程池方案的注意事项,需要的朋友可以参考下

一、为什么需要异步

当一次操作需要处理大量数据(如插入12万条记录到数据库),如果同步执行:

解决思路:先快速响应用户"任务已提交",再在后台异步完成耗时操作

二、方案对比

2.1 线程池(ThreadPoolExecutor)

原理:在 JVM 内部维护一组工作线程,将任务提交到队列中由这些线程异步执行。

优点

缺点

适用场景

2.2 消息队列(RabbitMQ / RocketMQ / Kafka)

原理:将任务以消息形式发送到 Broker,消费者从 Broker 拉取消息执行。

优点

缺点

适用场景

2.3 Spring @Async

原理:通过注解标记方法为异步,Spring 使用内部线程池执行。

优点

缺点

适用场景

2.4 对比表

维度线程池消息队列@Async
可靠性低(JVM 重启丢失)高(消息持久化)
分布式
外部依赖需要 Broker
延迟极低(微秒级)低(毫秒级)极低
削峰能力有限(队列大小)强(Broker 容量)有限
代码复杂度
可观测性
重试机制需自行实现内置需自行实现

三、线程池核心知识

3.1 ThreadPoolExecutor 七大参数

new ThreadPoolExecutor(
    corePoolSize,      // 核心线程数:始终存活的线程
    maximumPoolSize,   // 最大线程数:队列满了之后扩展到的上限
    keepAliveTime,     // 空闲线程存活时间
    timeUnit,          // 时间单位
    workQueue,         // 任务队列
    threadFactory,     // 线程工厂(自定义线程名称)
    rejectedHandler    // 拒绝策略
);

3.2 任务提交执行流程

提交任务
  ├── 当前线程数 < corePoolSize → 创建新核心线程执行
  ├── 当前线程数 >= corePoolSize → 放入 workQueue
  ├── workQueue 已满 且 当前线程数 < maximumPoolSize → 创建非核心线程执行
  └── workQueue 已满 且 当前线程数 >= maximumPoolSize → 执行拒绝策略

3.3 四种拒绝策略

策略行为适用场景
AbortPolicy抛出 RejectedExecutionException不允许丢任务,调用方需感知
CallerRunsPolicy由提交任务的线程自己执行不丢任务,自动降级为同步
DiscardPolicy静默丢弃允许丢失
DiscardOldestPolicy丢弃队列中最老的任务只关心最新任务

3.4 常见队列选择

队列类型特点
ArrayBlockingQueue有界,背压明确
LinkedBlockingQueue可有界可无界,无界时可能 OOM
SynchronousQueue零容量,直接交接(用于 CachedThreadPool)

3.5 参数设计经验

CPU 密集型任务(计算、排序):

IO 密集型任务(数据库写入、网络调用):

批量导入场景(大量数据库写入):

四、完整示例:基于线程池的异步批量数据导入

以下是一个示例,展示"同步校验 + 异步批量插入"模式。

4.1 线程池配置

package com.example.config;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 异步导入线程池配置.
 */
@Configuration
public class AsyncImportThreadPoolConfig {

  @Bean(name = "importThreadPool", destroyMethod = "shutdown")
  public ThreadPoolExecutor importThreadPool() {
    return new ThreadPoolExecutor(
        4,                              // 核心线程数
        8,                              // 最大线程数
        60, TimeUnit.SECONDS,           // 空闲线程存活60秒
        new ArrayBlockingQueue<>(16),   // 有界队列,最多堆积16个任务
        new ImportThreadFactory(),      // 自定义线程工厂
        new ThreadPoolExecutor.CallerRunsPolicy()  // 队列满时由调用线程执行
    );
  }

  static class ImportThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r, "import-worker-" + counter.getAndIncrement());
      t.setDaemon(false); // 非守护线程,确保任务执行完
      return t;
    }
  }
}

4.2 Service 接口

package com.example.service;

import java.util.List;

/**
 * 批量导入服务接口.
 */
public interface BatchImportService {

  /**
   * 导入数据:同步校验 + 异步入库.
   *
   * @param rawDataList 原始数据列表(已从文件中解析出来)
   * @param operatorId  操作人ID
   * @return 导入结果提示
   */
  String importData(List<RawData> rawDataList, String operatorId);
}

4.3 Service 实现

package com.example.service.impl;

import com.example.entity.ImportRecord;
import com.example.mapper.ImportRecordMapper;
import com.example.service.BatchImportService;
import jakarta.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class BatchImportServiceImpl implements BatchImportService {

  private static final int BATCH_SIZE = 2000;

  @Resource
  @Qualifier("importThreadPool")
  private ThreadPoolExecutor threadPool;

  @Resource
  private ImportRecordMapper importRecordMapper;

  @Override
  public String importData(List<RawData> rawDataList, String operatorId) {

    // ========== 第一步:同步校验(在请求线程中执行) ==========
    for (int i = 0; i < rawDataList.size(); i++) {
      RawData raw = rawDataList.get(i);
      String error = validate(raw);
      if (error != null) {
        // 遇到第一条错误立即中断,同步返回给前端
        throw new RuntimeException("第" + (i + 1) + "行:" + error);
      }
    }

    // ========== 第二步:数据转换 ==========
    List<ImportRecord> recordList = new ArrayList<>(rawDataList.size());
    for (RawData raw : rawDataList) {
      ImportRecord record = convertToEntity(raw, operatorId);
      recordList.add(record);
    }

    // ========== 第三步:异步批量插入(提交到线程池) ==========
    // 注意:这里的 recordList 对象引用传递给了异步线程
    // 确保主线程之后不再修改这个列表
    threadPool.execute(() -> {
      try {
        long start = System.currentTimeMillis();
        int total = recordList.size();

        for (int i = 0; i < total; i += BATCH_SIZE) {
          int end = Math.min(i + BATCH_SIZE, total);
          List<ImportRecord> batch = recordList.subList(i, end);
          importRecordMapper.batchInsert(batch);
        }

        long cost = System.currentTimeMillis() - start;
        log.info("异步导入完成,共{}条,耗时{}ms", total, cost);
      } catch (Exception e) {
        log.error("异步导入失败", e);
        // 可选:更新主表状态为"导入失败"
      }
    });

    // ========== 第四步:同步返回成功提示 ==========
    return "导入任务已提交,共" + recordList.size() + "条数据正在后台处理";
  }

  /**
   * 校验单条数据.
   * 返回 null 表示通过,返回错误信息表示失败.
   */
  private String validate(RawData raw) {
    if (raw.getAmount() == null) {
      return "数量不能为空";
    }
    if (raw.getAmount() <= 0 || raw.getAmount() > 999999) {
      return "数量必须为大于0的正整数,最多六位";
    }
    return null;
  }

  /**
   * 原始数据转换为实体.
   */
  private ImportRecord convertToEntity(RawData raw, String operatorId) {
    ImportRecord record = new ImportRecord();
    record.setCode(raw.getCode());
    record.setName(raw.getName());
    record.setAmount(raw.getAmount());
    record.setOperatorId(operatorId);
    return record;
  }
}

4.4 执行时序

请求线程                          线程池工作线程
   │                                  │
   │── 解析文件 ──→                    │
   │── 逐行校验 ──→                    │
   │   (校验不过直接返回错误)             │
   │── 转换数据 ──→                    │
   │── threadPool.execute(task) ──→   │
   │                                  │── 批量INSERT第1批(2000条)
   │← 返回"导入任务已提交" ──           │── 批量INSERT第2批(2000条)
   │                                  │── ...
   │   (HTTP响应已返回给前端)            │── 批量INSERT第N批
   │                                  │── 记录日志"导入完成"

五、线程池方案的注意事项

5.1 线程安全

提交给线程池的数据(如 recordList)在主线程返回后不能再修改。示例中使用的是 ArrayList,提交后主线程不再操作它,所以安全。如果有并发修改风险,应使用 Collections.unmodifiableList() 或复制一份。

5.2 事务边界

异步线程中的数据库操作有独立的事务上下文。如果需要在主表保存后、从表插入中途失败时回滚主表,需要额外的补偿逻辑(如更新主表状态为"导入失败")。

5.3 优雅停机

Spring Boot 配置 server.shutdown=graceful 后,停机时会等待请求处理完成。但线程池中的任务默认不被等待。配置 destroyMethod = "shutdown" 可以让 Spring 容器销毁 Bean 时调用 shutdown(),等待正在执行的任务完成(但队列中等待的任务不会执行)。

如果要确保队列中的任务也执行完:

@PreDestroy
public void destroy() {
    threadPool.shutdown();
    try {
        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
            threadPool.shutdownNow();
        }
    } catch (InterruptedException e) {
        threadPool.shutdownNow();
    }
}

5.4 监控

线程池没有内置的管理界面。建议通过定时任务或 Actuator 暴露:

log.info("线程池状态 - 活跃:{}, 队列积压:{}, 已完成:{}",
    threadPool.getActiveCount(),
    threadPool.getQueue().size(),
    threadPool.getCompletedTaskCount());

六、什么时候该用消息队列替代线程池

信号建议
服务多实例部署,需要负载均衡消费用消息队列
任务绝对不能丢失(如金融交易)用消息队列
需要延时执行或定时重试用消息队列
任务量突增需要削峰用消息队列
单实例、任务可重试(如重新导入)线程池足够
对延迟敏感(需要立即开始执行)线程池更合适
不想引入外部依赖线程池

以上就是Java大数据量异步处理方案的详细内容,更多关于Java大数据量异步处理的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文