java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java创建线程池方式

Java创建线程池的几种方式代码示例

作者:动物园首领

这篇文章主要介绍了Java中创建线程池的四种方式,包括使用Executors类、ThreadPoolExecutor类、Future和Callable接口以及Spring的ThreadPoolTaskExecutor,文中通过代码介绍的非常详细,需要的朋友可以参考下

一、创建线程池四种方式

二、线程池重要参数

package com.demo.threadPool;

import java.util.concurrent.*;

public class MainDemo1 {
    public static void main(String[] args) {
        int corePoolSize = 5; // 核心线程数
        int maximumPoolSize = 10; // 最大线程数
        long keepAliveTime = 1; // 非核心线程空闲存活时间
        /**
         * 存活时间单位
         * TimeUnit.DAYS:天
         * TimeUnit.HOURS:小时
         * TimeUnit.MINUTES:分
         * TimeUnit.SECONDS:秒
         * TimeUnit.MILLISECONDS:毫秒
         * TimeUnit.MICROSECONDS:微妙
         * TimeUnit.NANOSECONDS:纳秒
         */
        TimeUnit unit = TimeUnit.MINUTES;
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); // 工作队列
        ThreadFactory threadFactory = Executors.defaultThreadFactory(); // 线程工厂
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
    }
}

三、线程池5种状态

四、Executors 类创建线程池

package com.demo.threadPool;

import java.util.List;
import java.util.concurrent.*;

public class MainThreadPool {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //初始化固定大小线程池
        ExecutorService executor1 = Executors.newFixedThreadPool(5);

        //使用 execute(Runnable command) 方法提交一个不需要返回结果的任务,
        // 或者使用submit(Callable<T> task) 方法提交一个需要返回结果的任务。
        for (int i = 0; i < 10; i++) {
            executor1.execute(new TaskR(i));
        }

        //使用 submit(Callable<T> task) 任务并获取 Future
        //使用 Future.get() 方法等待任务完成并获取结果。这个方法会阻塞调用线程直到任务完成。
        for (int i = 0; i < 10; i++) {
            Future<String> future =  executor1.submit(new TaskC(i));
            System.out.println("线程返回结果  "+future.get());
        }
        // 当所有任务都执行完毕,或者需要关闭线程池时,调用 shutdown() 方法。
        // 这将等待正在执行的任务完成,但不接收新任务。
        executor1.shutdown();

        //使用 shutdownNow() 方法尝试立即停止所有正在执行的任务,并返回等待执行的任务列表
        List<Runnable> notExecutedTasks = executor1.shutdownNow();
        for(Runnable ls : notExecutedTasks){
            System.out.println(ls);
        }

        //使用 awaitTermination() 方法等待线程池关闭,直到所有任务完成或超时。
        boolean res = executor1.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("执行结果:"+res);
    }
}

/**
 * 实现 Runnable 接口
 */
class TaskR implements Runnable {
    private int id;
    public TaskR(int id) {
        this.id = id;
    }
    public void run() {
        System.out.println("TaskR " + id + " is running...");
    }
}

/**
 * 实现 Callable 接口
 * 有返回值
 */
class TaskC implements Callable {
    private int id;
    public TaskC(int id) {
        this.id = id;
    }
    @Override
    public Object call(){
        System.out.println("TaskC " + id + " is running...");
        return id+"--TaskC";
    }
}
package com.demo.threadPool;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 单线程池 (newSingleThreadExecutor):
 * 创建一个只有一个线程的线程池。即使有多个任务提交,它们也会被排队,逐个由单个线程执行。
 */
public class MainOne {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        /**
         * 单线程:创建的执行服务内部有一个线程。所有提交给它的任务将会序列化执行,也就是说,它会在单个线程上依次执行任务,不会有并发执行的情况发生
         * 任务队列:如果有多个任务提交给这个执行器,除了当前正在执行的任务外,其他任务将会在一个无界队列中等待,直到线程可用
         * 处理任务失败:如果执行中的线程由于任务抛出异常而终止,执行服务会安排一个新的线程来替换它,以继续执行后续的任务
         * 使用场景: newSingleThreadExecutor 非常适合需要顺序执行的任务,并且要求任务之间不受并发问题影响的场景
         */
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i++) {
            executor.execute(new TaskR(i));
        }

        //使用 submit(Callable<T> task) 任务并获取 Future
        //使用 Future.get() 方法等待任务完成并获取结果。这个方法会阻塞调用线程直到任务完成。
        for (int i = 0; i < 10; i++) {
            Future<String> future =  executor.submit(new TaskC(i));
            System.out.println("线程返回结果  "+future.get());
        }
        // 当所有任务都执行完毕,或者需要关闭线程池时,调用 shutdown() 方法。
        // 这将等待正在执行的任务完成,但不接收新任务。
        executor.shutdown();
    }
}
package com.demo.threadPool;

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 缓存线程池 (newCachedThreadPool):
 * 创建一个可根据需要创建新线程的线程池。如果线程空闲超过60秒,它们将被终止并从池中移除
 */
public class MainCacheThreadPool {
    public static void main(String[] args) throws InterruptedException {

        System.out.println(Thread.currentThread().getName() + "线程: Start at: " + new Date());
        //初始化缓存线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        for (int i = 1; i < 10; i++) {
            System.out.println("添加了第" + i + "个任务类");
            Thread.sleep(2000);
            exec.execute(new TaskR(i));
        }
        //所有任务结束后关闭线程池
        exec.shutdown();
        System.out.println(Thread.currentThread().getName() + " 线程: Finished all threads at:" + new Date());

    }
}
package com.demo.threadPool;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 固定频率执行
 * 调度线程池 (newScheduledThreadPool):
 * 创建一个支持定时任务和周期性任务的线程池
 */
public class MainScheduledThreadPool {
    public static void main(String[] args) {
        /**
         * 场景描述
         * 假设你需要一个应用程序,该程序能够每10秒执行一次任务,并在启动后1分钟开始执行。此外,
         * 你还需要能够安排一次性任务在未来的某个时间点执行
         */
        ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10);

        // 安排定期任务
        // 初始延迟1分钟,之后每10秒执行一次
        threadPool.scheduleAtFixedRate(new TaskR(2), 60, 10, TimeUnit.SECONDS);

        // 安排一次性任务
        // 使用 schedule 方法安排一个任务,在指定的延迟后执行一次
        // 延迟5分钟后执行
        threadPool.schedule(new TaskR(3), 5, TimeUnit.MINUTES);

        // 关闭线程池
        // 当不再需要线程池时,调用 shutdown 方法来关闭线程池。这将等待正在执行的任务完成,但不接收新任务
        threadPool.shutdown();

        // 等待线程池关闭
        // 使用 awaitTermination 方法等待线程池关闭,直到所有任务完成或超时。
        try {
            threadPool.awaitTermination(1, TimeUnit.HOURS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
package com.demo.threadPool;


import java.util.concurrent.*;

/**
 * 使用给定的线程工厂创建线程池
 */
public class MainFactory {
    public static void main(String[] args) {
        //自定义线程工厂创建
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        };
        //使用给定的线程工厂创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(5, threadFactory);
        executor.execute(new TaskR(2));
    }
}
package com.demo.threadPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 *  自定义线程工厂:设置线程名,守护线程,优先级以及UncaughtExceptionHandler
 */
public class MainFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    public MainFactory(String namePrefix) {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        this.namePrefix = namePrefix + "-thread-";
    }

    public MainFactory(ThreadGroup group, String namePrefix) {
        this.group = group;
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);
        //守护线程
        if (t.isDaemon())
            t.setDaemon(true);
        //线程优先级
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        /**
         * 处理未捕捉的异常
         */
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("处理未捕获的异常");
            }
        });
        return t;
    }

    //测试方法
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(5, new MainFactory("测试线程"));
        for (int i = 0; i < 10; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程处理");
                    //未捕获的异常,走自定义的UncaughtExceptionHandler逻辑
                    int i = 1 / 0;
                }
            });
        }
        pool.shutdown();
    }
}

五、ThreadPoolExecutor 类创建线程池

ThreadPoolExecutor 是 java.util.concurrent 包中用来创建线程池的一个类。它提供了一种灵活的方式来管理线程池,允许你控制线程的创建和销毁。

ThreadPoolExecutor 类中的几个重要方法

package com.demo.threadPool;
import java.util.Random;
import java.util.concurrent.*;
/**
 * ThreadPoolExecutor 是 java.util.concurrent 包中用来创建线程池的一个类
 * 它提供了一种灵活的方式来管理线程池,允许你控制线程的创建和销毁。
 * 以下是几种常见的创建 ThreadPoolExecutor 线程池的方式
 * 实际上 Executors 类也是调用 ThreadPoolExecutor 类创建的线程
 */
public class MainThreadPoolExecutor {
    //测试方法
    public static void main(String[] args) {
        /**
         * 核心线程数,核心线程就是一直存在的线程
         */
        int corePoolSize = 5;
        /**
         * 最大线程数,表示线程池中最多能创建多少个线程
         * 非核心线程数 = 最大线程数 - 核心线程数
         */
        int maximumPoolSize = 10;
        /**
         * 默认情况下,只有当线程池中的线程数大于corePoolSize时,
         * keepAliveTime才会起作用,则会终止,直到线程池中的线程数不超过corePoolSize
         * 则会终止,直到线程池中的线程数不超过corePoolSize
         * 但是如果调用了 allowCoreThreadTimeOut(boolean) 方法
         * 在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为 0
         * 针对非核心线程而言,表示线程没有任务执行时最多保持多久时间会终止
         */
        long keepAliveTime = 60;
        /**
         * 时间单位
         * 与 keepAliveTime 配合使用,针对非核心线程
         */
        TimeUnit unit = TimeUnit.SECONDS;
        /**
         * 存放任务的阻塞队列
         */
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(5);
        /**
         * 创建线程的工厂,可以为线程创建时起个好名字
         */
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        };
        /**
         * 拒绝策略
         * 任务太多的时候会进行拒绝操作
         * 核心线程,非核心线程,任务队列都放不下时
         */
        // 自定义拒绝策略
        RejectedExecutionHandler defaultHandler1 = new MyRejectedExecutionHandler();
        // 默认策略,在需要拒绝任务时抛出RejectedExecutionException
        RejectedExecutionHandler defaultHandler3 = new ThreadPoolExecutor.AbortPolicy();
        // 直接在 execute 方法的调用线程中运行被拒绝的任务,如果线程池已经关闭,任务将被丢弃;
        RejectedExecutionHandler defaultHandler2 = new ThreadPoolExecutor.CallerRunsPolicy();
        // 直接丢弃任务
        RejectedExecutionHandler defaultHandler4 = new ThreadPoolExecutor.DiscardPolicy();
        // 丢弃队列中等待时间最长的任务,并执行当前提交的任务,如果线程池已经关闭,任务将被丢弃
        RejectedExecutionHandler defaultHandler5 = new ThreadPoolExecutor.DiscardOldestPolicy();
        /**
         * 创建线程池
         */
        ExecutorService service1 =  new ThreadPoolExecutor( corePoolSize, maximumPoolSize,keepAliveTime, 
                unit,workQueue,threadFactory,defaultHandler1);
        for (int i = 0; i < 10; i++) {
            System.out.println("添加第"+i+"个任务");
            service1.execute(new MyThread("线程"+i));
        }
        service1.shutdown();
    }
}

/**
 * 自定义拒绝策略
 */
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        new Thread(r,"新线程"+new Random().nextInt(10)).start();
    }
}

/**
 * 线程类
 */
class MyThread implements Runnable {
    String name;
    public MyThread(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("线程:"+Thread.currentThread().getName() +" 执行:"+name +"  run");
    }
}

六、Future 和 Callable 类使用创建线程池

package com.demo.threadPool;

import java.util.concurrent.*;

/**
 * Future 使用
 */
public class MainFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        System.out.println("开始时间戳为:" + System.currentTimeMillis());
        Future<String> future = executorService.submit(new Test01());
        String result = future.get(); //获取计算结果。如果计算尚未完成,此方法会阻塞,直到计算完成或抛出异常
        boolean isdone = future.isDone();  //检查计算是否完成
        boolean cancel = future.cancel(true);  //尝试取消任务
        boolean iscancelled = future.isCancelled(); //检查任务是否被取消
        System.out.println("result:"+result);
        System.out.println("isdone:"+isdone);
        System.out.println("cancel:"+cancel);
        System.out.println("iscancelled:"+iscancelled);
        System.out.println("结束时间戳为:" + System.currentTimeMillis());
     executorService.shutdown();
    }
}
class Test01 implements Callable {
    @Override
    public Object call() throws Exception {
        return "你好";
    }
}

七、Spring 的 ThreadPoolTaskExecutor 类创建线程池

ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,它扩展了 Java 的 ThreadPoolExecutor 并提供了一些额外的配置和功能
package com.cnpc.epai.assetcatalog.dmp.controller;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * 线程池配置类
 */
@Configuration
public class ConfigPoolConfiguration {
    @Bean("TaskExecutorDemo")
    public ThreadPoolTaskExecutor taskExecutorDemo(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(10); // 核心线程数
        threadPoolTaskExecutor.setMaxPoolSize(20);// 最大线程数
        threadPoolTaskExecutor.setQueueCapacity(100); //工作队列
        threadPoolTaskExecutor.setKeepAliveSeconds(60); // 非核心线程的空闲存活时间
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);//指定是否允许核心线程超时。这允许动态增长和收缩,即使与非零队列结合使用也是如此(因为最大池大小只有在队列已满时才会增长)
        threadPoolTaskExecutor.setThreadNamePrefix("monitor-thread-pool-");// 设置线程名前缀
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());// 拒绝策略
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);// 设置线程池关闭时需要等待子任务执行完毕,才销毁对应的bean
        threadPoolTaskExecutor.initialize();//初始化线程池
        return threadPoolTaskExecutor;
    }
}

测试类

package com.cnpc.epai.assetcatalog.dmp.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service
public class TestService {
    @Autowired
    private ThreadPoolTaskExecutor taskExecutor;
    @Async("taskExecutor")
    public void executeTask() {
        taskExecutor.execute(() -> {
            System.out.println("Executing task in thread: " + Thread.currentThread().getName());
        });
    }
}

总结 

到此这篇关于Java创建线程池的几种方式的文章就介绍到这了,更多相关Java创建线程池方式内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文