java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java关闭ExecutorService

Java如何优雅关闭异步中的ExecutorService

作者:Huooya

在并发编程领域,Java的ExecutorService是线程池管理的关键接口,这篇文章主要为大家介绍了如何优雅关闭异步中的ExecutorService,需要的可以了解下

1.ExecutorService的核心价值

在并发编程领域,Java的ExecutorService(位于java.util.concurrent包)是线程池管理的关键接口。作为Executor框架的核心组件,它通过解耦任务提交与执行机制,为开发者提供了:

2.关闭机制的必要性

不正确的线程池关闭会导致:

3.shutdown()方法详解

3.1 方法特性

void shutdown()

状态转换

将线程池状态设置为SHUTDOWN,触发以下行为:

典型应用场景

ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交多个任务...
executor.shutdown();

try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        System.err.println("仍有任务未在时限内完成");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

3.2 内部运作机制

4.shutdownNow()方法剖析

4.1 方法定义

List<Runnable> shutdownNow()

状态转换

将线程池状态设置为STOP,触发:

4.2 中断处理要点

executor.shutdownNow();
// 典型返回值处理
List<Runnable> unprocessed = executor.shutdownNow();
if (!unprocessed.isEmpty()) {
    logger.warn("丢弃{}个未执行任务", unprocessed.size());
}

任务中断条件

只有当任务代码正确处理中断时才能被终止:

class InterruptibleTask implements Runnable {
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // 执行可中断的操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 重置中断状态
                break;
            }
        }
    }
}

5.对比分析

特性shutdown()shutdownNow()
新任务接受立即拒绝立即拒绝
运行中任务处理等待完成尝试中断
队列任务处理全部执行清除并返回
返回值void未执行任务列表
适用场景优雅关闭紧急终止
线程中断策略仅中断空闲线程强制中断所有线程

6.最佳实践代码示例

6.1 标准关闭模板

public class GracefulShutdownExample {
    // 定义超时时间和时间单位(30秒)
    private static final int TIMEOUT = 30;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;

    // 执行任务的方法,接收一个任务列表并将其提交给线程池执行
    public void executeTasks(List<Runnable> tasks) {
        // 创建一个固定大小的线程池,大小为系统可用处理器核心数
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        try {
            // 将任务列表中的每个任务提交到线程池
            tasks.forEach(executor::submit);
        } finally {
            // 在所有任务提交完后,禁用线程池接收新任务,开始优雅关闭线程池
            executor.shutdown(); // 禁止再提交新任务
            try {
                // 等待线程池中的任务在指定超时内完成,如果超时未完成,则强制关闭线程池
                if (!executor.awaitTermination(TIMEOUT, UNIT)) {
                    // 如果未能在超时内完成,则调用 shutdownNow() 强制终止所有活动任务
                    List<Runnable> unfinished = executor.shutdownNow();
                    // 处理未完成的任务,例如记录日志或重新提交
                    handleUnfinishedTasks(unfinished);
                }
            } catch (InterruptedException e) {
                // 如果在等待终止时被中断,恢复中断状态并强制关闭线程池
                Thread.currentThread().interrupt();
                executor.shutdownNow();
            }
        }
    }
    
    // 处理未完成任务的方法,这里我们打印未完成任务的数量
    private void handleUnfinishedTasks(List<Runnable> tasks) {
        // 如果有未完成的任务,打印任务数量并执行额外的处理
        if (!tasks.isEmpty()) {
            System.out.println("未完成任务数: " + tasks.size());
            // 可在此处记录日志、重新排队未完成的任务等
        }
    }

}

构造线程池: Executors.newFixedThreadPool() 创建一个固定大小的线程池,大小为系统可用的处理器核心数,这样可以更高效地利用 CPU 资源。

提交任务: 使用 tasks.forEach(executor::submit) 提交每个任务到线程池中执行。

优雅关闭线程池:

处理中断: 如果在等待终止过程中发生 InterruptedException,线程会恢复中断状态,并且强制关闭线程池。

处理未完成任务: handleUnfinishedTasks() 方法会处理未完成的任务,比如记录日志或者重新排队未完成的任务。

6.2 带回调的增强实现

public class EnhancedExecutorManager {
    // 定义线程池对象
    private final ExecutorService executor;
    // 定义超时时间及单位
    private final long timeout;
    private final TimeUnit unit;

    // 构造函数,初始化线程池并设置超时时间和单位
    public EnhancedExecutorManager(int corePoolSize, long timeout, TimeUnit unit) {
        // 创建一个核心池大小为 corePoolSize,最大池大小为 corePoolSize * 2,最大空闲时间 60秒的线程池
        this.executor = new ThreadPoolExecutor(
            corePoolSize,                             // 核心线程池大小
            corePoolSize * 2,                         // 最大线程池大小
            60L, TimeUnit.SECONDS,                    // 空闲线程的存活时间
            new LinkedBlockingQueue<>(1000),          // 使用容量为 1000 的队列来缓存任务
            new CustomThreadFactory(),                // 自定义线程工厂
            new ThreadPoolExecutor.CallerRunsPolicy() // 当任务无法提交时,调用者线程执行该任务
        );
        this.timeout = timeout;                     // 设置超时时间
        this.unit = unit;                           // 设置超时时间单位
    }
    
    // 优雅关闭线程池的方法
    public void shutdown() {
        executor.shutdown(); // 首先尝试正常关闭线程池,不再接收新的任务
        
        try {
            // 如果线程池未能在指定的超时时间内终止,则强制关闭
            if (!executor.awaitTermination(timeout, unit)) {
                System.out.println("强制终止线程池...");
                // 强制停止所有正在执行的任务并返回丢弃的任务列表
                List<Runnable> droppedTasks = executor.shutdownNow();
                System.out.println("丢弃任务数: " + droppedTasks.size());
            }
        } catch (InterruptedException e) {
            // 如果在等待过程中线程池关闭操作被中断,立即强制关闭并恢复中断状态
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    // 自定义线程工厂类,用于创建线程
    private static class CustomThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1); // 线程池编号,用于生成线程名
        private final ThreadGroup group; // 线程组
        private final AtomicInteger threadNumber = new AtomicInteger(1); // 线程编号
        private final String namePrefix; // 线程名称前缀
    
        CustomThreadFactory() {
            // 获取当前系统的安全管理器,如果没有,则使用当前线程的线程组
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // 设置线程池的名称前缀
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() + // 线程池编号递增
                         "-thread-";
        }
    
        // 创建新线程的方法
        public Thread newThread(Runnable r) {
            // 创建新的线程,线程组、名称及优先级均已设置
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0); // 默认优先级和daemon设置
            // 如果线程是守护线程,则将其设置为非守护线程
            if (t.isDaemon())
                t.setDaemon(false);
            // 设置线程优先级为默认
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t; // 返回新创建的线程
        }
    }
}

线程池初始化:

优雅关闭线程池:

自定义线程工厂:

7.关键注意事项

8.高级应用场景

分级关闭策略

public class TieredShutdownManager { 
    // 定义三个优先级的线程池列表:高优先级、中优先级、低优先级
    private final List<ExecutorService> highPriority; 
    private final List<ExecutorService> normalPriority; 
    private final List<ExecutorService> lowPriority; 
  
    // 公共方法用于优雅关闭所有线程池
    public void gracefulShutdown() { 
        // 依次关闭高、中、低优先级的线程池
        shutdownTier(highPriority, 10, TimeUnit.SECONDS); 
        shutdownTier(normalPriority, 30, TimeUnit.SECONDS); 
        shutdownTier(lowPriority, 60, TimeUnit.SECONDS); 
    } 
  
    // 私有方法,用于优雅关闭指定优先级的线程池
    private void shutdownTier(List<ExecutorService> tier, long timeout, TimeUnit unit) { 
        // 对指定的线程池列表执行关闭操作
        tier.forEach(ExecutorService::shutdown); 
  
        // 对每个线程池执行等待终止的操作,指定超时时间
        tier.forEach(executor -> { 
            try { 
                // 如果线程池未在超时时间内终止,则调用 shutdownNow 强制关闭
                if (!executor.awaitTermination(timeout, unit)) { 
                    executor.shutdownNow(); 
                } 
            } catch (InterruptedException e) { 
                // 如果在等待终止过程中线程被中断,恢复中断状态并强制关闭线程池
                Thread.currentThread().interrupt(); 
                executor.shutdownNow(); 
            } 
        }); 
    } 
}

gracefulShutdown 方法按照优先级顺序依次关闭高、中、低优先级的线程池。

shutdownTier 方法首先尝试正常关闭每个线程池(调用 shutdown),然后通过 awaitTermination 方法等待线程池在指定的时间内结束,如果未成功结束,则调用 shutdownNow 强制关闭。

在关闭过程中,如果发生中断,则会捕获 InterruptedException 异常,并且中断当前线程,同时强制关闭线程池。

9.性能优化建议

根据任务类型选择队列策略:

监控关键指标:

ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("完成任务数: " + executor.getCompletedTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());

动态调整参数:

executor.setCorePoolSize(newSize);
executor.setMaximumPoolSize(newMaxSize);

10.总结建议

根据Oracle官方文档建议,在大多数生产场景中推荐以下关闭流程:

正确选择关闭策略需要综合考量:

到此这篇关于Java如何优雅关闭异步中的ExecutorService的文章就介绍到这了,更多相关Java关闭ExecutorService内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文