Java线程间通讯的几种方法小结
作者:顽石九变
一、使用同一个共享变量控制
1、Synchronized、wait、notify
public class Demo1 { private final List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo1 demo =new Demo1(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==1){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ synchronized (demo.list){ if(demo.list.size()%2==0){ try { demo.list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); demo.list.notify(); } } }).start(); } }
这段代码演示了如何使用synchronized
、wait
和notify
来实现两个线程间的通信,以确保它们交替地向一个ArrayList
中添加数字。以下是代码的详细解释:
类定义:
Demo1
类中有一个私有的、不可变的(由final
修饰)成员变量list
,它是ArrayList<Integer>
类型的。
主函数:
- 在
main
方法中,首先创建了Demo1
的一个实例demo
。 - 然后启动了两个线程,每个线程都执行一个特定的任务。
- 在
第一个线程的任务:
- 使用一个for循环,循环10次。
- 在每次循环中,首先获得
demo.list
的锁,这是通过synchronized (demo.list)
实现的。 - 检查当前列表的大小是否为奇数(通过
demo.list.size()%2==1
)。如果是,则调用demo.list.wait()
使当前线程进入等待状态,并释放demo.list
的锁,这样其他线程可以获取该锁并执行其任务。 - 当线程从等待状态被唤醒时(通过另一个线程的
notify
调用),它会继续执行,并将当前的数字添加到列表中。 - 打印当前线程的名称和更新后的列表。
- 通过调用
demo.list.notify()
唤醒可能正在等待的另一个线程。
第二个线程的任务:
- 它的工作方式与第一个线程非常相似,但有一个关键的区别:它检查列表的大小是否为偶数,并在这种情况下使线程进入等待状态。
交替执行:
- 由于两个线程的工作方式,它们将交替地向列表中添加数字。当一个线程发现列表的大小是其期望的(奇数或偶数)时,它会暂停并等待另一个线程添加一个数字。然后,它会被另一个线程的
notify
调用唤醒,继续其执行,并再次使另一个线程等待。
- 由于两个线程的工作方式,它们将交替地向列表中添加数字。当一个线程发现列表的大小是其期望的(奇数或偶数)时,它会暂停并等待另一个线程添加一个数字。然后,它会被另一个线程的
注意事项:
- 使用
wait
和notify
时,必须在同步块或方法中这样做,否则会抛出IllegalMonitorStateException
。 - 当多个线程可能访问共享资源(在这里是
demo.list
)时,使用同步是必要的,以确保数据的完整性和一致性。 - 虽然在这个特定的例子中只有两个线程,但这种方法可以扩展到更多的线程,只要它们遵循相同的通信和同步协议。
- 使用
潜在问题:
- 这个代码可能存在一个潜在的问题,即“假唤醒”。理论上,一个线程可能会无故地(或由于系统中的其他原因)从
wait
方法中唤醒,即使没有其他线程明确地调用了notify
或notifyAll
。为了避免这种情况导致的问题,通常在wait
调用周围使用一个循环来检查预期的条件是否仍然成立。如果条件不满足,则继续等待。这通常被称为“条件变量”的使用模式。
- 这个代码可能存在一个潜在的问题,即“假唤醒”。理论上,一个线程可能会无故地(或由于系统中的其他原因)从
2、Lock、Condition
import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Task { // 创建一个可重入锁,用于同步访问共享资源(即列表) private final Lock lock = new ReentrantLock(); // 创建两个条件变量,一个用于表示列表未满,一个用于表示列表非空 private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); // 定义列表的最大容量 private static final int MAX_SIZE = 10; // 创建一个ArrayList作为共享资源,用于在两个线程之间传递数据 private final List<String> list = new ArrayList<>(MAX_SIZE); // add方法用于向列表中添加元素 public void add() { for (int i = 0; i < 10; i++) { lock.lock(); // 获取锁,开始同步代码块 try { // 如果列表已满,则当前线程等待,直到其他线程从列表中移除元素 while (list.size() == MAX_SIZE) { notFull.await(); // 等待列表不满的条件成立 } // 模拟耗时操作(比如网络请求或数据处理) Thread.sleep(100); // 向列表中添加一个新元素,并打印相关信息 list.add("add " + (i + 1)); System.out.println("The list size is " + list.size()); System.out.println("The add thread is " + Thread.currentThread().getName()); System.out.println("-------------"); // 通知可能在等待的移除线程,现在列表不为空,可以执行移除操作了 notEmpty.signal(); } catch (InterruptedException e) { // 打印异常信息,实际开发中可能需要更复杂的错误处理逻辑 e.printStackTrace(); } finally { lock.unlock(); // 释放锁,允许其他线程访问同步代码块 } } } // sub方法用于从列表中移除元素 public void sub() { for (int i = 0; i < 10; i++) { lock.lock(); // 获取锁,开始同步代码块 try { // 如果列表为空,则当前线程等待,直到其他线程向列表中添加元素 while (list.isEmpty()) { notEmpty.await(); // 等待列表非空的条件成立 } // 模拟耗时操作(比如网络请求或数据处理) Thread.sleep(100); // 从列表中移除第一个元素,并打印相关信息 list.remove(0); System.out.println("The list size is " + list.size()); System.out.println("The sub thread is " + Thread.currentThread().getName()); System.out.println("-------------"); // 通知可能在等待的添加线程,现在列表不满,可以执行添加操作了 notFull.signal(); } catch (InterruptedException e) { // 打印异常信息,实际开发中可能需要更复杂的错误处理逻辑 e.printStackTrace(); } finally { lock.unlock(); // 释放锁,允许其他线程访问同步代码块 } } } // main方法作为程序的入口点,创建Task对象并启动两个线程来执行add和sub方法 public static void main(String[] args) { Task task = new Task(); // 创建Task对象,它包含共享资源和同步机制 // 使用Lambda表达式和方法引用启动两个线程,分别执行add和sub方法,并为它们设置名称以便区分输出中的信息来源 new Thread(task::add, "AddThread").start(); // 启动添加线程 new Thread(task::sub, "SubThread").start(); // 启动移除线程 } }
这段代码定义了一个名为Task
的类,它主要实现了线程安全的列表添加和移除操作。类内部使用了java.util.concurrent.locks
包下的ReentrantLock
可重入锁以及相关的Condition
条件变量来同步访问共享资源(即一个ArrayList
)。
在Task
类中,有两个主要的方法:add
和sub
。add
方法用于向列表中添加元素,而sub
方法用于从列表中移除元素。这两个方法在被调用时都需要获取锁,以确保同一时间只有一个线程可以访问共享资源。
当添加线程调用add
方法时,它首先检查列表是否已满。如果已满,则通过调用notFull.await()
使当前线程等待,直到其他线程从列表中移除元素并发出通知。一旦列表不满,添加线程就会向列表中添加一个新元素,并通过调用notEmpty.signal()
通知可能在等待的移除线程。
类似地,当移除线程调用sub
方法时,它首先检查列表是否为空。如果为空,则通过调用notEmpty.await()
使当前线程等待,直到其他线程向列表中添加元素并发出通知。一旦列表非空,移除线程就会从列表中移除一个元素,并通过调用notFull.signal()
通知可能在等待的添加线程。
这种使用锁和条件变量的方式实现了线程间的同步和通信,确保了共享资源(即列表)在任何时候都不会被多个线程同时修改,从而避免了数据竞争和不一致的问题。同时,通过条件变量的等待和通知机制,有效地协调了添加线程和移除线程的执行顺序,使得它们能够按照预期的方式交替进行添加和移除操作。
3、利用volatile
volatile修饰的变量值直接存在主内存里面,子线程对该变量的读写直接写住内存,而不是像其它变量一样在local thread里面产生一份copy。volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。
public class Demo2 { private volatile List<Integer> list =new ArrayList<>(); public static void main(String[] args) { Demo2 demo =new Demo2(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); new Thread(()->{ for (int i=0;i<10;i++){ demo.list.add(i); System.out.print(Thread.currentThread().getName()); System.out.println(demo.list); } }).start(); } }
4、利用AtomicInteger
和volatile类似
二、PipedInputStream、PipedOutputStream
这里用流在两个线程间通信,但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output
public class PipedDemo { private final PipedInputStream inputStream1; private final PipedOutputStream outputStream1; private final PipedInputStream inputStream2; private final PipedOutputStream outputStream2; public PipedDemo(){ inputStream1 = new PipedInputStream(); outputStream1 = new PipedOutputStream(); inputStream2 = new PipedInputStream(); outputStream2 = new PipedOutputStream(); try { inputStream1.connect(outputStream2); inputStream2.connect(outputStream1); } catch (IOException e) { e.printStackTrace(); } } /**程序退出时,需要关闭stream*/ public void shutdown() throws IOException { inputStream1.close(); inputStream2.close(); outputStream1.close(); outputStream2.close(); } public static void main(String[] args) throws IOException { PipedDemo demo =new PipedDemo(); new Thread(()->{ PipedInputStream in = demo.inputStream2; PipedOutputStream out = demo.outputStream2; for (int i = 0; i < 10; i++) { try { byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("go".equals(new String(inArr))) break; } out.write("ok".getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); new Thread(()->{ PipedInputStream in = demo.inputStream1; PipedOutputStream out = demo.outputStream1; for (int i = 0; i < 10; i++) { try { out.write("go".getBytes()); byte[] inArr = new byte[2]; in.read(inArr); System.out.print(Thread.currentThread().getName()+": "+i+" "); System.out.println(new String(inArr)); while(true){ if("ok".equals(new String(inArr))) break; } } catch (IOException e) { e.printStackTrace(); } } }).start(); // demo.shutdown(); } }
输出
Thread-0: 0 go
Thread-1: 0 ok
Thread-0: 1 go
Thread-1: 1 ok
Thread-0: 2 go
Thread-1: 2 ok
Thread-0: 3 go
Thread-1: 3 ok
Thread-0: 4 go
Thread-1: 4 ok
Thread-0: 5 go
Thread-1: 5 ok
Thread-0: 6 go
Thread-1: 6 ok
Thread-0: 7 go
Thread-1: 7 ok
Thread-0: 8 go
Thread-1: 8 ok
Thread-0: 9 go
Thread-1: 9 ok
三、利用BlockingQueue
BlockingQueue定义的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。
- offer(Object):表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。
- poll(time):获取并删除BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。当不传入time值时,立刻返回。
- peek():立刻获取BlockingQueue里排在首位的对象,但不从队列里删除,如果队列为空,则返回null。
- take():获取并删除BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。
BlockingQueue有四个具体的实现类:
- ArrayBlockingQueue:数组阻塞队列,规定大小,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
- LinkedBlockingQueue:链阻塞队列,大小不定,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。
- PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
- SynchronousQueue:特殊的BlockingQueue,它的内部同时只能够容纳单个元素,对其的操作必须是放和取交替完成的。
- DelayQueue:延迟队列,注入其中的元素必须实现 java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式类似,以下例子一个线程写入,一个线程读取,操作的是同一个Queue:
public class BlockingQueueDemo { public static void main(String[] args) { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(); //读线程 new Thread(() -> { int i =0; while (true) { try { String item = queue.take(); System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); i++; } catch (Exception e) { e.printStackTrace(); } } }).start(); //写线程 new Thread(() -> { for (int i = 0; i < 10; i++) { try { String item = "go"+i; System.out.print(Thread.currentThread().getName() + ": " + i + " "); System.out.println(item); queue.put(item); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
到此这篇关于Java线程间通讯的几种方法小结的文章就介绍到这了,更多相关Java线程间通讯内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!