java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java线程池并发编程

Java线程池高效并发编程实战技巧

作者:旧日之血_Hayter

线程池是一种线程管理机制,它预先创建一定数量的线程并放入池中,当需要执行任务时,从池中获取空闲线程来执行任务,任务完成后线程不销毁而是返回池中等待下一次任务,这篇文章给大家介绍Java线程池高效并发编程实战技巧,感兴趣的朋友跟随小编一起看看吧

概要

因为面试中暴露出来的不足,所以写一写线程池,也算是复习一下。

什么是线程池

线程池是一种线程管理机制,它预先创建一定数量的线程并放入池中,当需要执行任务时,从池中获取空闲线程来执行任务,任务完成后线程不销毁而是返回池中等待下一次任务。

主要作用

1、降低资源消耗

避免频繁创建和销毁线程的开销,重复利用已经创建好了的线程。

2、提高响应速度

任务到达时,无需额外创建线程即可运行

3、提高线程可管理性

统一管理线程资源,避免无限制创建线程导致系统崩溃,可以控制并发线程数量,避免过度竞争。

4、提供更强大的功能

基础线程池使用实例

import java.util.concurrent.*;
import java.util.Random;
public class ThreadPoolDemo {
    public static void main(String[] args) {
        // 1. 创建线程池
        // 核心参数:核心线程数5,最大线程数10,空闲时间60秒,任务队列容量100
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,  // corePoolSize
            10, // maximumPoolSize
            60, // keepAliveTime
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100), // 任务队列
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
        );
        // 2. 提交任务
        for (int i = 1; i <= 20; i++) {
            int taskId = i;
            executor.execute(() -> {
                System.out.println("处理任务" + taskId + 
                    ", 线程: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟业务处理
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // 3. 优雅关闭
        executor.shutdown();
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

如上所示,我们经历了:

1、创建线程池

其核心线程数为5,最大线程数为10,空闲时间60s,任务队列容量为100

2、提交任务

3、关闭

线程池的工作流程

当你调用 executor.execute() 时,内部会发生以下逻辑:

业务场景

订单异步处理

import java.util.concurrent.*;
import java.util.List;
import java.util.ArrayList;
public class OrderProcessor {
    // 使用单例模式创建线程池
    private static final ThreadPoolExecutor orderExecutor = new ThreadPoolExecutor(
        3, 8, 30, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1000),
        new ThreadFactory() {
            private int count = 0;
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "order-process-" + (++count));
                thread.setDaemon(false);
                return thread;
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略:调用者线程执行
    );
    /**
     * 异步处理订单
     */
    public CompletableFuture<Void> processOrderAsync(Order order) {
        return CompletableFuture.runAsync(() -> {
            try {
                // 1. 验证订单
                validateOrder(order);
                // 2. 扣减库存
                reduceInventory(order);
                // 3. 生成发货单
                generateShipping(order);
                // 4. 发送通知
                sendNotification(order);
                System.out.println("订单处理完成: " + order.getId());
            } catch (Exception e) {
                // 记录异常,进行补偿
                handleOrderException(order, e);
            }
        }, orderExecutor);
    }
    /**
     * 批量处理订单
     */
    public CompletableFuture<Void> batchProcessOrders(List<Order> orders) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Order order : orders) {
            CompletableFuture<Void> future = processOrderAsync(order);
            futures.add(future);
        }
        // 等待所有任务完成
        return CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );
    }
    // 业务方法(模拟实现)
    private void validateOrder(Order order) {
        // 验证逻辑
    }
    private void reduceInventory(Order order) {
        // 扣减库存逻辑
    }
    private void generateShipping(Order order) {
        // 生成发货单逻辑
    }
    private void sendNotification(Order order) {
        // 发送通知
    }
    private void handleOrderException(Order order, Exception e) {
        // 异常处理
    }
    // 优雅关闭
    public void shutdown() {
        orderExecutor.shutdown();
    }
    // 订单类
    static class Order {
        private String id;
        // 其他字段
        public String getId() { return id; }
    }
}

说明

1、为什么返回类型是CompletableFuture<Void>?

答:

其实你要是不想返回的话,直接void就行。线程自己处理业务逻辑,啥都不用管。但是缺点就是,你什么都不知道,无法等待任务完成,而且不知道任务会不会被线程池拒绝。

2、这么写有什么好处呢?

答:

虽然你的逻辑内部不产生结果(即 runAsync 的特性),但返回 CompletableFuture 有以下三个核心好处:

(关于链式调用的问题,后面会新开一遍文章说一下,爱你。)

3、还有什么常见的返回类型吗?

返回类型场景建议
CompletableFuture<Void>推荐。 异步执行,不返回数据,但允许调用者监听状态。
void极致的“甩手掌柜”,调用方完全不关心后续,代码最简。
CompletableFuture<T>异步执行,且需要把处理后的结果传回给调用方。

异步数据导出

import java.util.concurrent.*;
import java.util.List;
import java.io.File;
public class DataExportService {
    // 专门用于导出任务的线程池
    private static final ThreadPoolExecutor exportExecutor = new ThreadPoolExecutor(
        2, 4, 5, TimeUnit.MINUTES,
        new LinkedBlockingQueue<>(50),
        new ThreadFactory() {
            private int count = 0;
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "export-thread-" + (++count));
                thread.setPriority(Thread.NORM_PRIORITY);
                return thread;
            }
        },
        new ThreadPoolExecutor.DiscardOldestPolicy() // 拒绝策略:丢弃最老任务
    );
    /**
     * 异步导出Excel
     */
    public CompletableFuture<File> exportExcelAsync(String exportId, 
                                                     List<?> dataList) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("开始导出数据,任务ID: " + exportId);
            try {
                // 模拟大数据量处理
                File excelFile = generateExcelFile(dataList);
                // 模拟上传到云存储
                String url = uploadToCloudStorage(excelFile);
                // 记录导出日志
                saveExportLog(exportId, url, "SUCCESS");
                return excelFile;
            } catch (Exception e) {
                saveExportLog(exportId, null, "FAILED");
                throw new RuntimeException("导出失败", e);
            }
        }, exportExecutor);
    }
    /**
     * 带进度的数据导出
     */
    public CompletableFuture<File> exportWithProgress(String exportId, 
                                                      List<?> dataList,
                                                      ProgressCallback callback) {
        return CompletableFuture.supplyAsync(() -> {
            int total = dataList.size();
            int batchSize = 1000;
            int processed = 0;
            for (int i = 0; i < total; i += batchSize) {
                int end = Math.min(i + batchSize, total);
                List<?> batchData = dataList.subList(i, end);
                // 处理批次数据
                processBatchData(batchData);
                processed = end;
                float progress = (float) processed / total;
                // 回调更新进度
                if (callback != null) {
                    callback.onProgress(progress);
                }
                // 模拟处理时间
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return generateExcelFile(dataList);
        }, exportExecutor);
    }
    // 业务方法(模拟实现)
    private File generateExcelFile(List<?> dataList) {
        // 生成Excel文件
        return new File("export.xlsx");
    }
    private String uploadToCloudStorage(File file) {
        // 上传到云存储
        return "https://oss.example.com/" + file.getName();
    }
    private void saveExportLog(String exportId, String url, String status) {
        // 保存日志
    }
    private void processBatchData(List<?> batchData) {
        // 处理批次数据
    }
    // 进度回调接口
    public interface ProgressCallback {
        void onProgress(float progress);
    }
    // 获取线程池状态
    public void printThreadPoolStatus() {
        System.out.println("核心线程数: " + exportExecutor.getCorePoolSize());
        System.out.println("活动线程数: " + exportExecutor.getActiveCount());
        System.out.println("任务队列大小: " + exportExecutor.getQueue().size());
        System.out.println("已完成任务数: " + exportExecutor.getCompletedTaskCount());
    }
}

说明

这里有点看不懂,先说一下吧,exportExcelAsync是最标准的异步流导出,直接调用线程池进行业务逻辑的调用并且返回结果,流程如下:

这是最基础的异步流,采用了 CompletableFuture.supplyAsync

执行步骤:

而exportWithProgress则是这样子的

这是这段代码的高级之处。它解决了大数据量导出时“用户不知道还要等多久”的问题。

两者都用了try catch来保证健壮性

可以优化的点:

也就是这里

// 模拟处理时间
try {
    Thread.sleep(100);
} catch (InterruptedException e) {
    // 就是这一句!重新设置中断状态
    Thread.currentThread().interrupt();
}

为什么说这行代码“很专业”?

在 Java 并发编程中,这是一个非常容易被新手忽略的最佳实践

1. 中断标志位被“擦除”了

当一个线程正在 sleep 时,如果外部调用了 thread.interrupt()sleep 方法会立刻抛出 InterruptedException重点来了: 一旦抛出这个异常,JVM 会自动把该线程的“中断标志位”清除(改为 false)。

2. 如果不加这一句会发生什么?

如果你只是打印了日志,或者干脆 catch 块里什么都不写:

3. Thread.currentThread().interrupt() 的作用

这一行的意思是:“既然异常把中断标志位擦除了,那我就手动把它再设回 true。”

这样做有几个好处:

定时任务线程池

import java.util.concurrent.*;
import java.time.LocalDateTime;
public class ScheduledTaskService {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
    /**
     * 初始化定时任务
     */
    public void initScheduledTasks() {
        // 1. 每天凌晨执行数据清理
        scheduleDailyCleanup();
        // 2. 每5分钟执行一次数据同步
        schedulePeriodicSync();
        // 3. 延迟执行一次性任务
        scheduleOneTimeTask();
    }
    /**
     * 每天凌晨2点执行数据清理
     */
    private void scheduleDailyCleanup() {
        long initialDelay = calculateInitialDelay(2, 0); // 凌晨2点
        long period = 24 * 60 * 60; // 24小时
        scheduler.scheduleAtFixedRate(() -> {
            try {
                System.out.println("开始数据清理: " + LocalDateTime.now());
                cleanUpOldData();
                System.out.println("数据清理完成: " + LocalDateTime.now());
            } catch (Exception e) {
                System.err.println("数据清理失败: " + e.getMessage());
            }
        }, initialDelay, period, TimeUnit.SECONDS);
    }
    /**
     * 每5分钟执行数据同步
     */
    private void schedulePeriodicSync() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                syncDataWithExternalSystem();
            } catch (Exception e) {
                // 记录异常,下次继续执行
                System.err.println("数据同步失败: " + e.getMessage());
            }
        }, 0, 5, TimeUnit.MINUTES);
    }
    /**
     * 延迟10秒执行一次性任务
     */
    private void scheduleOneTimeTask() {
        scheduler.schedule(() -> {
            System.out.println("执行一次性任务: " + LocalDateTime.now());
        }, 10, TimeUnit.SECONDS);
    }
    /**
     * 提交可取消的定时任务
     */
    public ScheduledFuture<?> submitCancellableTask(Runnable task, 
                                                    long initialDelay, 
                                                    long period, 
                                                    TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(task, initialDelay, period, unit);
    }
    // 工具方法:计算到指定时间的延迟
    private long calculateInitialDelay(int targetHour, int targetMinute) {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime targetTime = now.withHour(targetHour)
                                     .withMinute(targetMinute)
                                     .withSecond(0);
        if (now.isAfter(targetTime)) {
            targetTime = targetTime.plusDays(1);
        }
        return java.time.Duration.between(now, targetTime).getSeconds();
    }
    // 业务方法
    private void cleanUpOldData() {
        // 清理过期数据
    }
    private void syncDataWithExternalSystem() {
        // 同步数据
    }
    public void shutdown() {
        scheduler.shutdown();
    }
}

说明

这个之前做过,这里总结一下真正业务中会怎么做

1、Spring的用法

如果项目是 Spring Boot,通常不会手动去 new ScheduledExecutorService。我们会利用 Spring 封装好的注解,配合配置文件。

@Component
@Slf4j
public class DataCleanupTask {
    // 从配置文件读取 Cron 表达式,例如:0 0 2 * * ? (每天凌晨2点)
    @Scheduled(cron = "${task.cleanup.cron}")
    public void dailyCleanup() {
        log.info("开始数据清理...");
        try {
            // 业务逻辑
        } catch (Exception e) {
            log.error("清理失败", e);
        }
    }
}

2、分布式锁

代码在单机运行没问题,但现代业务通常是 多实例部署

(当时的统计数据业务就是这么处理的)

@Scheduled(cron = "0 0 2 * * ?")
@SchedulerLock(name = "dataCleanupTask", lockAtMostFor = "10m", lockAtLeastFor = "1m")
public void scheduledTask() {
    // 只有抢到锁的机器才会执行
}

3、分布式任务调度平台(XXL-JOB / Quartz)

在大型互联网公司,定时任务通常是独立于业务代码进行管理的。最常用的方案是 XXL-JOB(国内主流)或 Elastic-Job

为什么业务开发喜欢用平台?

总结

场景推荐方案
本地小工具/单机脚本维持你现在的 ScheduledExecutorService (最轻量)
普通 Spring Boot 业务@Scheduled + 配置文件
多台服务器集群部署@Scheduled + ShedLock (最简单有效)
中大型分布式系统XXL-JOBCloud Native CronJob (最专业)

小结

对于线程池的用法做了一点小小的总结,这是个开始。

到此这篇关于Java线程池高效并发编程实战技巧的文章就介绍到这了,更多相关Java线程池并发编程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文