JAVA中关于多线程的使用详解
作者:我今晚不熬夜
多线程基础
1 概述
现代操作系统(Windows
,macOS
,Linux
)都可以执行多任务。多任务就是同时运行多个任务。例如:播放音乐的同时,浏览器可以进行文件下载,同时可以进行QQ消息的收发。
CPU
执行代码都是一条一条顺序执行的,但是,即使是单核CPU
,也可以同时运行多个任务。因为操作系统执行多任务实际上就是让CPU
对多个任务轮流交替执行。
操作系统轮流让多个任务交替执行,例如,让浏览器执行0.001
秒,让QQ执行0.001
秒,再让音乐播放器执行0.001
秒。在用户使用的体验看来,CPU
就是在同时执行多个任务。
1 进程与线程
程序:程序是含有指令和数据的文件,被存储在磁盘或其他的数据存储设备中,可以理解为程序是包含静态代码的文件。例如:浏览器软件、音乐播放器软件等软件的安装目录和文件。
进程:进程是程序的一次执行过程,是系统运行程序的基本单位。
线程:某些进程内部还需要同时执行多个子任务。例如,我们在使用WPS
时,WPS
可以让我们一边打字,一边进行拼写检查,同时还可以在后台进行自动保存和上传云文档,我们把子任务称为线程。线程是进程划分成的更小的运行单位。
进程和线程的关系就是:一个进程可以包含一个或多个线程,但至少会有一个主线程。
2 线程基本概念
单线程:单线程就是进程中只有一个线程。单线程在程序执行时,所走的程序路径按照连续顺序排下来,前面的必须处理好,后面的才会执行
多线程:由一个以上的线程组成的程序称为多线程程序。Java中,一定是从主线程开始执行(main方法),然后在主线程的某个位置创建并启动新的线程。
多线程的应用场景
- 软件中的耗时操作,拷贝的迁移文件,加载大量资源的时候
- 所有的后台服务器
- 所有的聊天软件
3 线程的创建方式
3. 1 方式一:继承java.lang.Thread类(线程子类)
public class Demo01 { public static void main(String[] args) { //1.创建一个Thread的子类,重写run方法 //2.创建子类对象 //3.调用start方法启动 MyThread myThread = new MyThread(); myThread.start(); for (int i = 0; i < 10; i++) { System.out.println("主线程执行任务:"+i); } } } class MyThread extends Thread { @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(getName()+"正在执行任务:"+i); } } }
3. 2 方式二:实现java.lang.Runnable接口(线程执行类)
public class Demo02 { public static void main(String[] args) { //方式2:java.lang.Runnable 接口实现多线程 //1.创建Runnable的实现类,重写run方法 //2.创建Runnable的实现类对象r1 //3.创建Thread类的对象,将r1作为构造方法的参数进行创建 //4.调用start方法启动线程 MyRun myRun = new MyRun(); Thread t1 = new Thread(myRun); t1.start(); for (int i = 0; i < 10; i++) { System.out.println("主线程执行任务:"+i); } } } class MyRun implements Runnable { public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName()+"线程执行任务:"+i); } } }
3. 3 方式三:实现java.util.concurrent.Callable接口,允许子线程返回结果、抛出异常
public class Demo03 { public static void main(String[] args) throws ExecutionException, InterruptedException { //方式3:Callable接口方式实现多线程 //步骤: //1.创建Callable的实现类,重写call方法 //2.创建Callable的实现类对象 //3.创建FutureTask对象,用来进行结果的管理操作 //4.创建Thread类的对象,将步骤3的对象作为参数传递 //5.启动线程 MyCallable callable1 = new MyCallable(1,10); FutureTask<Integer> futureTask1 = new FutureTask<Integer>(callable1); Thread thread1=new Thread(futureTask1); thread1.setName("线程1"); thread1.start(); MyCallable callable2 = new MyCallable(2,11); FutureTask<Integer> futureTask2 = new FutureTask<Integer>(callable2); Thread thread2=new Thread(futureTask2); thread2.setName("线程2"); thread2.start(); System.out.println("子线程C1和为:"+futureTask1.get()); System.out.println("子线程C2和为:"+futureTask2.get()); for (int i=1;i<=10;i++) { System.out.println("主线程:"+i); } } } class MyCallable implements Callable<Integer> { int begin,end; public MyCallable(int begin,int end) { this.begin = begin; this.end = end; } @Override public Integer call() throws Exception { int sum = 0; for (int i = begin; i <= end; i++) { sum += i; System.out.println(Thread.currentThread().getName()+"正在进行加"+i+"操作"); } return sum; } }
3. 4 方式四:线程池
public class Demo04 { public static void main(String[] args) { //使用线程池创建线程对象 ExecutorService ex= Executors.newCachedThreadPool(); ex.execute(new MyRun()); ex.execute(new MyRun()); } }
总结:
- Thread 编程比较简单,可以直接使用Thread类中的方法 可扩展性差
- Runnable 拓展性强,实现该接口后还可以继承其他的类再实现其他的接口
- Callable 拓展性强,实现该接口还可以继承其他类,可以有返回值
4 线程所拥有的方法
4.1 getName()获取线程名
public class Demo01 { public static void main(String[] args) { //获取当前线程 Thread t1=Thread.currentThread(); System.out.println("线程对象: "+t1); //获取线程名 getName(),如果没有为线程命名,系统会默认指定线程名,命名规则是Thread-N的形式 System.out.println("线程名:"+t1.getName()); } }
4.2 setName()设置线程名
public class Demo02 { public static void main(String[] args) { //给线程设置名字,setName() //让当前线程休眠:Thread.sleep(long ,min) 参数为毫秒值 MyThread t1 = new MyThread(); t1.setName("土豆"); t1.start(); //使用构造方法设置线程名 MyThread t2 = new MyThread("洋芋"); t2.start(); //Runnable作为参数 Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"线程正在启动"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"线程运行结束"); } }; Thread thread = new Thread(runnable); thread.setName("马铃薯"); thread.start(); } }
4.3 sleep()线程休眠
public class MyThread extends Thread { public MyThread(){} public MyThread(String name) { super(name); } @Override public void run() { System.out.println("当前线程为:"+getName()); for(int i=1; i<=5; i++){ try { //休眠1秒 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(i); } } } }
4.4 setPriority(long newPriority) 设置线程优先级
public class Demo03 { public static void main(String[] args) { Runnable runnable = new Runnable() { public void run() { for (char i = 'a'; i <= 'g'; i++) { System.out.println(Thread.currentThread().getName() + ": " + i); } } }; Runnable runnable2 = new Runnable() { public void run() { for (char i = 'A'; i <= 'J'; i++) { System.out.println(Thread.currentThread().getName() + ": " + i); } } }; Thread t1=new Thread(runnable,"线程1"); Thread t2=new Thread(runnable2,"线程2"); //设置线程优先级 1~10, //注意:优先级高的线程抢占到资源的概率较大,但不一定优先抢占到资源 t1.setPriority(1); t2.setPriority(10); t1.start(); t2.start(); //获取线程优先级 System.out.println(t1.getName()+"的优先级为"+t1.getPriority()); System.out.println(t2.getName()+"的优先级为"+t2.getPriority()); System.out.println(Thread.currentThread().getName()+"的优先级为"+Thread.currentThread().getPriority()); } }
4.5 setDaemon设置守护线程
public class Demo04 { public static void main(String[] args) { Thread t1 = new Thread("线程1") { public void run() { for (char i = 'a'; i < 'z'; i++) { System.out.println(getName()+":"+i); } } }; Thread t2 = new Thread("线程2") { public void run() { for (char i = 'A'; i < 'Z'; i++) { System.out.println(getName()+":"+i); } } }; //t2线程设置为守护线程 //细节: 当其他的非守护线程执行完毕后,守护线程会陆陆续续的执行结束 t2.setDaemon(true); t1.start(); t2.start(); } }
4.6 yield()线程礼让
public class Demo05 { public static void main(String[] args) { Thread t1 = new Thread("线程1") { public void run() { for (char i = 'a'; i < 'z'; i++) { System.out.println(getName()+":"+i); Thread.yield(); } } }; Thread t2 = new Thread("线程2") { public void run() { Thread.yield(); for (char i = 'A'; i < 'Z'; i++) { System.out.println(getName()+":"+i); } } }; t1.start(); t2.start(); } }
4.7 join()线程插队
public class Demo06 { public static void main(String[] args) throws InterruptedException { Thread t1=new Thread(){ public void run(){ System.out.println("进到"+getName()+"之中"); for(int i=1;i<=100;i++){ System.out.println(i); } System.out.println("结束"); } }; t1.start(); t1.join(); //线程的插队,插入当前的线程的前面 //主线程执行的任务 for(char i='A';i<='z';i++){ System.out.println("main:"+i); } } }
4.8 interrupt()线程的中断
public class Demo07 { public static void main(String[] args) { Thread t1 = new Thread("线程1"){ public void run(){ //获取当前系统时间 long startTime = System.currentTimeMillis(); System.out.println("进入到"+getName()+"线程中"); try { Thread.sleep(3000); } catch (InterruptedException e) { System.out.println("中断"+getName()+"线程"); e.printStackTrace(); } System.out.println("结束"+getName()+"线程"); long endTime = System.currentTimeMillis(); System.out.println(getName()+"运行时间:"+(endTime-startTime)); } }; t1.start(); //让主线程休眠 System.out.println("main线程进入"); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } //main主线程修改t1线程的中断状态=true //t1线程检测中断状态=true,则抛出InterruptedException,子线程执行结束 t1.interrupt(); } }
5 多线程同步
Synchronized
同步锁,简单来说,使用Synchronized
关键字将一段代码逻辑,用一把锁给锁起来,只有获得了这把锁的线程才访问。并且同一时刻, 只有一个线程能持有这把锁, 这样就保证了同一时刻只有一个线程能执行被锁住的代码,从而确保代码的线程安全。
看不懂?没有关系,我也看不懂,简单来说就是,如果有多个线程访问同一个资源,如果在不加锁的情况下,资源会混乱,比如有三个线程同时访问一个数number,并且修改他的值,那么这个值最终大概会达不到想要的结果,产生数据混乱,比如两个线程分别对一个数进行操作,其中一个加100次1,另外一个减100次1,预想的结果应该还是这个数本身,但如果不加锁,那么他极有可能变为别的值。
synchronized 关键字的用法
* 1.加锁: * 锁对象可以是任意类型的对象,必须要保证多条线程共用一个锁对象 * synchronized(锁对象){ * 操作的共享代码 * } * * 某条线程获取到了锁资源,锁关闭,当里面的任务执行完成,锁释放 * 默认情况下,锁是打开状态 * * 2.锁方法: * synchronized加到方法上,变成了锁方法,注意锁不能自己指定 * 修饰符 synchronized 返回值类型 方法名(){ * 操作的共享代码 * } * 锁:同步锁不能自己指定,普通方法锁是当前对象
1 修饰实例方法:
当使用synchronized
修饰实例方法时, 以下两种写法作用和意义相同:
方法1:
public class ShouMai extends Thread { public static int ticket = 1000; public static Object lock = new Object(); public ShouMai(String name) { super(name); } //请加锁,保证线程安全 @Override public void run() { while (true) { synchronized (ShouMai.class) { if (ticket > 0) { System.out.println(getName() + "正在售卖第" + (1000 - --ticket) + "张票"); } else { System.out.println(getName() + ":票已售罄"); break; } } } } }
方式2:
public class ShouMai1 extends Thread { public static int ticket = 1000; public ShouMai1(String name) { super(name); } //加锁,保证线程安全 @Override public void run() { while (true) { if(!shoumai()) break; } } public synchronized static boolean shoumai() { if (ticket > 0) { System.out.println(Thread.currentThread().getName() + "正在售卖第" + (1000 - --ticket) + "张票"); return true; } else { System.out.println(Thread.currentThread().getName() + ":票已售罄"); return false; } } }
2 修饰静态方法
public class ShouMai2Imp implements Runnable { public static int ticket=1000; @Override public void run() { while (true){ if(!shoumai()) break; } } public synchronized static boolean shoumai(){ if (ticket<=0) return false; System.out.println(Thread.currentThread().getName() + "正在售卖第" + (1000 - --ticket) + "张票"); return true; } }
3 修饰代码块
synchronized(自定义对象) { //临界区 }
lock锁的使用
在java中,可以是使用Object.lock()方法进行加锁,Object.unlock()方法进行解锁
public class ShouMai3 extends Thread { public static int ticket = 200; private static Lock lock = new ReentrantLock(); public ShouMai3(String name) { super(name); } @Override public void run() { while (true) { lock.lock(); try { if (ticket > 0) { System.out.println(getName() + "正在售卖第" + (200 - --ticket) + "张票"); } else { System.out.println(getName() + ":票已售罄"); break; } Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
6 线程的状态
在整个线程的生命周期中,线程的状态有以下6
种:
New
:新建状态,新创建的线程,此时尚未调用start()
方法;Runnable
:运行状态,运行中的线程,已经调用start()
方法,线程正在或即将执行run()
方法;Blocked
:阻塞状态,运行中的线程,在等待竞争锁时,被阻塞,暂不执行;Waiting
:等待状态,运行中的线程,因为join()
等方法调用,进入等待;Timed Waiting
:计时等待状态,运行中的线程,因为执行sleep(等待毫秒值)
join(等待毫秒值)
等方法,进入计时等待;Terminated
:终止状态,线程已终止,因为run()
方法执行完毕。
当线程启动后,它可以在Runnable
、Blocked
、Waiting
和Timed Waiting
这几个状态之间切换,直到最后变成Terminated
状态,线程终止
1 NEW
新建状态,新创建的线程,此时尚未调用start()方法
public class Demo01 { public static void main(String[] args) { Thread thread=new Thread(); System.out.println(thread.getState()); } }
2 RUNNABLE
运行状态,运行中的线程,已经调用start()方法,线程正在或即将执行run方法
public class Demo02 { public static void main(String[] args) { Thread t1=new Thread(){ @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println("正在执行任务"); } } }; //启动线程 t1.start(); System.out.println(t1.getState()); } }
3 Terminated
终止状态,线程已终止,因为run()方法执行完毕
public class Demo03 { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new Runnable() { @Override public void run() { System.out.println("正在执行任务"); } }); //启动线程 thread.start(); //等待子线程完成任务 Thread.sleep(500); //获取thread线程状态 System.out.println(thread.getState()); } }
4 BLOCKED与TIMED_WAITING
- BLOCKED 阻塞状态,运行中的线程,在等待竞争锁时,被阻塞,暂不执行
- TIMED_WAITING 计时等待状态,运行中的线程,因为执行sleep(等待毫秒值)
sleep() 不释放锁资源
wait() 释放锁资源 -- Object中的方法,wait()调用时对象必须要和锁对象一样
public class Demo04 { public static void main(String[] args) throws InterruptedException { Object lock = new Object(); Runnable runnable = new Runnable() { @Override public void run() { synchronized (lock) { //如果某条线程获取到锁资源,任务一直执行,不释放锁资源 while (true) { //死循环 try { Thread.sleep(1000); // lock.wait(1000); } catch (InterruptedException e) { e.printStackTrace(); } break; } } } }; Thread thread1 = new Thread(runnable,"线程1"); Thread thread2 = new Thread(runnable,"线程2"); thread1.start(); thread2.start(); //让主线程休眠500毫秒,让t1和t2同时竞争一个锁资源 Thread.sleep(500); System.out.println(thread1.getName()+":"+thread1.getState()); System.out.println(thread2.getName()+":"+thread2.getState()); } }
5 WAITING
等待状态,运行中的线程,因为join()
等方法调用,进入等待;
public class Demo05 { public static void main(String[] args) throws InterruptedException { Object lock = new Object(); Runnable runnable = new Runnable() { @Override public void run() { synchronized (lock) { try { lock.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }; Thread thread = new Thread(runnable); thread.start(); Thread.sleep(500); System.out.println(thread.getState()); } }
public class Demo06 { public static void main(String[] args) throws InterruptedException { Thread outThread = new Thread(new Runnable() { public void run() { System.out.println("外部线程启动"); //在内部又创建了一个线程 Thread innerThread = new Thread(new Runnable() { @Override public void run() { System.out.println("内部线程启动"); //innerThread任务一直执行 while (true){ try { Thread.sleep(2000); break; } catch (InterruptedException e) { throw new RuntimeException(e); } } } }); innerThread.start(); try { innerThread.join(); //innerThread插队,插队插到outThread前面,outThread此时处于等待innerThread完成任务状态,所以outThread等待 } catch (InterruptedException e) { throw new RuntimeException(e); } } }); outThread.start(); //线程启动 //让主线程休眠,目的时为了让outThread运行起来 Thread.sleep(100); //获取outThread的状态 System.out.println(outThread.getState()); } }
7 线程池
概念
线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
┌─────┐ execute ┌──────────────────┐ │Task1│─────────>│ThreadPool │ ├─────┤ │┌───────┐┌───────┐│ │Task2│ ││Thread1││Thread2││ ├─────┤ │└───────┘└───────┘│ │Task3│ │┌───────┐┌───────┐│ ├─────┤ ││Thread3││Thread4││ │Task4│ │└───────┘└───────┘│ ├─────┤ └──────────────────┘ │Task5│ ├─────┤ │Task6│ └─────┘ ...
线程池常用方法
- 执行无返回值的线程任务:
void execute(Runnable command);
- 提交有返回值的线程任务:
Future<T> submit(Callable<T> task);
- 关闭线程池:
void shutdown();
或shutdownNow();
- 等待线程池关闭:
boolean awaitTermination(long timeout, TimeUnit unit);
执行线程任务
execute()
只能提交Runnable
类型的任务,没有返回值,而submit()
既能提交Runnable
类型任务也能提交Callable
类型任务,可以返回Future
类型结果,用于获取线程任务执行结果。
execute()
方法提交的任务异常是直接抛出的,而submit()
方法是捕获异常,当调用Future
的get()
方法获取返回值时,才会抛出异常。
线程池分类
Java
标准库提供的几种常用线程池,创建这些线程池的方法都被封装到Executors
工具类中。
- FixedThreadPool:线程数固定的线程池,使用
Executors.newFixedThreadPool()
创建; - CachedThreadPool:线程数根据任务动态调整的线程池,使用
Executors.newCachedThreadPool()
创建; - SingleThreadExecutor:仅提供一个单线程的线程池,使用
Executors.newSingleThreadExecutor()
创建; - ScheduledThreadPool:能实现定时、周期性任务的线程池,使用
Executors.newScheduledThreadPool()
创建;
FixedThreadPool
public class Demo01 { public static void main(String[] args) throws InterruptedException { //1.获取线程池对象 ExecutorService executorService= Executors.newFixedThreadPool(3); //2.提交任务给线程池对象 executorService.execute(new MyRunnable("任务1")); executorService.execute(new MyRunnable("任务2")); executorService.execute(new MyRunnable("任务3")); Thread.sleep(100); //主线程休眠 executorService.execute(new MyRunnable("任务4")); //3.线程池的关闭方法 线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务; executorService.shutdown(); // executorService.shutdownNow(); /* 当使用awaitTermination()方法时,主线程会处于一种等待的状态,按照指定的timeout检查线程池。 第一个参数指定的是时间,第二个参数指定的是时间单位(当前是秒)。返回值类型为boolean型。 如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,awaitTermination()返回true。 如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,awaitTermination()返回false。 如果等待时间没有超过指定时间,则继续等待。 */ while (!executorService.awaitTermination(1, TimeUnit.SECONDS)){ System.out.println("还没关闭线程池"); }; System.out.println("已经关闭线程池"); } } class MyRunnable implements Runnable{ public String str; public MyRunnable(String str) { this.str = str; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" 开始执行"+str); try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } // for (int i = 0; i < 10; i++) { // System.out.println(Thread.currentThread().getName()+":"+i); // } System.out.println(Thread.currentThread().getName()+" 结束"+str); } }
CachedThreadPool
public class Demo02 { public static void main(String[] args) throws InterruptedException { //创建一个线程数量无上限的线程池对象 ExecutorService executorService= Executors.newCachedThreadPool(); for (int i = 1; i <= 100; i++) { executorService.execute(new MyRunnable("任务"+i)); } Thread.sleep(2000); executorService.execute(new MyRunnable("最后一个任务")); executorService.shutdown(); } }
SingleThreadExecutor
public class Demo03 { public static void main(String[] args) { ExecutorService executorService= Executors.newSingleThreadExecutor(); //循环的提交任务 for (int i = 1; i < 10; i++) { executorService.execute(new MyRunnable("任务"+i)); } executorService.shutdown(); } }
ScheduledThreadPool
public class Demo04 { public static void main(String[] args) { ScheduledExecutorService executorService= Executors.newScheduledThreadPool(2); Runnable runnable = new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"开始执行"); try { Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread().getName()+"执行结束"); } }; //2.提交任务 --延时多久进行任务的执行 //schedule(Runnable command,long delay, TimeUnit unit) 任务对象 延迟时间 时间单位 // executorService.schedule(runnable,2,TimeUnit.SECONDS); //scheduleAtFixedRate 延时1秒首次执行此任务,每隔2秒进行此任务的执行 executorService.scheduleAtFixedRate(runnable, 1, 2, TimeUnit.SECONDS); //上一次任务执行完毕后,等待固定的时间间隔,再进行下一次任务 // executorService.scheduleWithFixedDelay(runnable, 2, 3, TimeUnit.SECONDS); } }
自定义线程池
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors
去创建,而要通过ThreadPoolExecutor
方式。jdk
中Executor
框架虽然提供了如newFixedThreadPool()
、newSingleThreadExecutor()
、newCachedThreadPool()
等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过new ThreadPoolExecutor
方式实现,使用ThreadPoolExecutor
有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。
必须为线程池中的线程,按照业务规则,进行命名。可以在创建线程池时,使用自定义线程工厂规范线程命名方式,避免线程使用默认名称。
如何创建一个线程池?
public ThreadPoolExecutor(int corePoolSize, 核心线程数 >=0 int maximumPoolSize, 最大线程数(核心+临时) >corePoolSize long keepAliveTime, 临时线程存活时间 TimeUnit unit, 存活的时间单位 BlockingQueue<Runnable> workQueue, 等待队列 ThreadFactory threadFactory, 线程工厂 RejectedExecutionHandler handler) 拒绝策略 拒绝策略 ThreadPoolExecutor.AbortPolicy() 默认拒绝策略--抛异常 ThreadPoolExecutor.DiscardOldestPolicy() 丢弃等待时间最久的任务 ThreadPoolExecutor.DiscardPolicy() 丢弃当前的任务 ThreadPoolExecutor.CallerRunsPolicy() 让当前的线程进行任务的执行
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); //提交任务 executorService.submit(new MyRunnable("任务1")); //提交一个任务给executorService对象 executorService.submit(new MyRunnable("任务2")); executorService.submit(new MyRunnable("任务3")); executorService.submit(new MyRunnable("任务4")); //核心线程已满,进入等待队列 executorService.submit(new MyRunnable("任务5")); executorService.submit(new MyRunnable("任务6")); executorService.submit(new MyRunnable("任务7")); //核心线程已满,等待队列已满,进入临时线程 executorService.submit(new MyRunnable("任务8")); executorService.submit(new MyRunnable("任务9")); //核心线程已满,等待队列已满,临时线程已满,抛出异常 executorService.shutdown();
自定义线程工厂与拒绝策略
自定义线程工厂
class MyThreadFactory implements ThreadFactory { //线程池名称前缀 String namePrefix; public MyThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } //具备原子性的Integer类型 private AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r,namePrefix+"线程"+threadNumber.getAndIncrement()); } }
主要实现ThreadFactory接口中的newThread方法,我写的这个方法也就是对线程池与线程池中的线程进行了重命名
自定义拒绝策略
class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("哎呦,达到能执行任务的上限了,(宝ᴗ宝)做不到啊"); } }
在这里呢我自定义了一个拒绝策略,当有一个线程进入到线程池后,核心线程,线程队列,临时线程等都没有空闲位置时,会直接输出一段话,线程不会进入线程池之中。
线程池的状态
线程池的状态分为:RUNNING , SHUTDOWN , STOP , TIDYING , TERMINATED
RUNNING:运行状态,线程池被一旦被创建,就处于RUNNING
状态,并且线程池中的任务数为0
。该状态的线程池会接收新任务,并处理工作队列中的任务。
- 调用线程池的
shutdown()
方法,可以切换到SHUTDOWN关闭状态; - 调用线程池的
shutdownNow()
方法,可以切换到STOP停止状态;
SHUTDOWN :关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;当工作队列为空时,并且线程池中执行的任务也为空时,线程池进入TIDYING状态;
STOP:停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行 的任务;线程池中执行的任务为空,进入TIDYING状态;
TIDYING :整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为0
;terminated()
执行完毕,进入TERMINATED
状态;
TERMINATED : 终止状态,该状态表示线程池彻底关闭。
线程池的优点
- 提高响应的速度
- 提高线程的可管理性
- 降低资源消耗(线程执行完任务,不销毁,可以执行其他的任务)
线程池核心代码
线程池相关的接口和实现类
线程池技术,是由2
个核心接口和一组实现类组成。
Executor
接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()
方法。未来线程池中所有的线程任务,都将由exuecte()
方法来执行。
ExecutorService
接口继承了Executor
接口,扩展了awaitTermination()
、submit()
、shutdown()
等专门用于管理线程任务的方法。
ExecutorService
接口的抽象实现类AbstractExecutorService
,为不同的线程池实现类,提供submit()
、invokeAll()
等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()
执行策略不同,所以在AbstractExecutorService
并未提供该方法的具体实现。
AbstractExecutorService
有两个常见的子类ThreadPoolExecutor
和ForkJoinPool
,用于实现不同的线程池。
ThreadPoolExecutor
线程池通过Woker
工作线程、BlockingQueue
阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;
ForkJoinPool
是一个基于分治思想的线程池实现类,通过分叉(fork
)合并(join
)的方式,将一个大任务拆分成多个小任务,并且为每个工作线程提供一个工作队列,减少竞争,实现并行的线程任务执行方式,所以ForkJoinPool适合计算密集型场景,是ThreadPoolExecutor线程池的一种补充。
ScheduledThreadPoolExecutor
类是ThreadPoolExecutor
类的子类,按照时间周期执行线程任务的线程池实现类,通常用于作业调度相关的业务场景。由于该线程池的工作队列使用DelayedWorkQueue
,这是一个按照任务执行时间进行排序的优先级工作队列,所以这也是ScheduledThreadPoolExecutor
线程池能按照时间周期来执行线程任务的主要原因。
工作线程Worker类
每个Woker
类的对象,都代表线程池中的一个工作线程。
当ThreadPoolExecutor
线程池,通过exeute()
方法执行1
个线程任务时,会调用addWorker()
方法创建一个Woker
工作线程对象。并且,创建好的Worker
工作线程对象,会被添加到一个HashSet<Worker> workders
工作线程集合,统一由线程池进行管理。
通过阅读源代码,可以看出Worker
类是ThreadPoolExecutor
类中定义的一个私有内部类,保存了每个Worker
工作线程要执行的Runnable
线程任务和Thread
线程对象。
当创建Worker
工作线程时,会通过构造方法保存Runnable
线程任务,同时使用ThreadFactory
线程工厂,为该工作线程创建一个Thread
线程对象。通过这样的操作,每个Worker
工作线程对象,都将绑定一个真正的Thread
线程。
另外,当Thread
线程被JVM
调度执行时,线程将会自动执行Worker
工作线程对象的run()
方法,通过调用runWorker()
方法,最终实现Woker
工作线程中所保存的Runnable
线程任务的执行。
值得重视的是:当Worker
工作线程,在第一次执行完成线程任务后,这个Worker
工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()
方法,获取阻塞工作队列中新的Runnable
线程任务,并通过当前Worker
工作线程中所绑定Thread
线程,完成新线程任务的执行,从而实现了线程池的中Thread
线程的重复使用。
核心方法:execute()方法
ThreadPoolExecutor
线程池中,会通过execute(Runnable command)
方法执行Runnable
类型的线程任务。
完整实现了Executor
接口定义execute()
方法,这个方法作用是执行一个Runnable
类型的线程任务。整体的执行流程是:
- 首先,通过
AtomicInteger
类型的ctl
对象,获取线程池的状态和工作线程数; - 然后,判断当前线程池中的工作线程数;
- 如果,工作线程的数量小于核心线程数,则通过
addWorker()
方法,创建新的Worker
工作线程,并添加至workers
工作线程集合; - 如果,工作线程的数量大于核心线程数,并且线程池处于
RUNNING
状态,那么,线程池会将Runnable
类型的线程任务,缓存至workQueue
阻塞工作队列,等待某个空闲工作线程获取并执行该任务; - 如果,
workQueue
工作队列缓存线程任务失败,代表工作队列已满。那么,线程池会重新通过addWorker()
方法,尝试创建新的工作线程; - 这次创建时,会判断工作线程数是否超出最大线程数。如果没有超出,会创建新的工作线程;如果已经超出,则返回
false
,代表创建失败; - 如果创建失败,线程池执行拒绝策略;
public class ThreadPoolExecutor { // 线程池执行Runnable线程任务 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 获取线程池的状态和工作线程数 int c = ctl.get(); // 工作线程的数量小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 创建新的Worker工作线程 if (addWorker(command, true)) return; // 创建失败,重新获取线程池的状态和工作线程数 c = ctl.get(); } // 如果线程池处于RUNNING状态,缓存线程任务至工作队列 if (isRunning(c) && workQueue.offer(command)) { // 任务缓存成功 // 重新获取线程池的状态和工作线程数 int recheck = ctl.get(); // 如果线程池不是处于RUNNING状态,则删除任务 if (! isRunning(recheck) && remove(command)) // 执行拒绝策略 reject(command); // 如果工作线程数等于零 // 通过addWorker()方法检查线程池状态和工作队列 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果缓存线程任务至工作队列 // 尝试创建新的工作线程 // 创建时,判断工作线程数是否超出最大线程数 // 如果没有超出,创建成功 // 如果已经超出,创建失败 else if (!addWorker(command, false)) // 执行拒绝策略 reject(command); } }
核心方法:addWorker()方法
在execute()
方法的执行过程中,会通过addWorker()
方法创建一个工作线程,用于执行当前线程任务。
阅读源代码,会发现,这个方法的整个执行过程可以分为两个部分:检查线程池的状态和工作线程数量、创建并执行工作线程。
1 检查线程池的状态和工作线程数量
private boolean addWorker(Runnable firstTask, boolean core) { // 第1部分:检查线程池的状态和工作线程数量 // 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出 retry: for (;;) { // 通过ctl对象,获取当前线程池的运行状态 int c = ctl.get(); int rs = runStateOf(c); // 如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空) // 则工作线程创建失败 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 检查工作线程数量 for (;;) { // 通过ctl对象,获取当前线程池中工作线程数量 int wc = workerCountOf(c); // 工作线程数量如果超出最大容量或者核心线程数(最大线程数) // 则工作线程创建失败 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环 if (compareAndIncrementWorkerCount(c)) break retry; // 再次获取线程池状态,检查是否发生变化 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 第2部分:创建并执行工作线程.... }
2 创建并执行工作线程
private boolean addWorker(Runnable firstTask, boolean core) { // 第1部分:检查线程池的状态和工作线程数量.... // 第2部分:创建并执行工作线程.... boolean workerStarted = false; // 工作线程是否已经启动 boolean workerAdded = false; // 工作线程是否已经保存 Worker w = null; try { // 创建新工作线程,并通过线程工厂创建Thread线程 w = new Worker(firstTask); // 获取新工作线程的Thread线程对象,用于启动真正的线程 final Thread t = w.thread; if (t != null) { // 获取线程池的ReentrantLock主锁对象 // 确保在添加和启动线程时的同步与安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查线程池状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 检查Thread线程对象的状态是否已经处于启动状态 if (t.isAlive()) throw new IllegalThreadStateException(); // 保存工作线程 workers.add(w); // 记录线程池曾经达到过的最大工作线程数量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加工作线程后,正式启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } // 返回线程启动状态 return workerStarted; }
到此这篇关于JAVA中关于多线程的学习和使用的文章就介绍到这了,更多相关java多线程使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!