java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java线程间通讯

Java线程间通讯的几种方法小结

作者:顽石九变

线程通信可以用于控制并发线程的数量,本文主要介绍了Java线程间通讯的几种方法小结,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧

一、使用同一个共享变量控制

1、Synchronized、wait、notify

public class Demo1 {

    private final List<Integer> list =new ArrayList<>();

    public static void main(String[] args) {
        Demo1 demo =new Demo1();
        new Thread(()->{
            for (int i=0;i<10;i++){
                synchronized (demo.list){
                    if(demo.list.size()%2==1){
                        try {
                            demo.list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
                    demo.list.notify();
                }
            }

        }).start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                synchronized (demo.list){
                    if(demo.list.size()%2==0){
                        try {
                            demo.list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
                    demo.list.notify();
                }
            }
        }).start();
    }
}

这段代码演示了如何使用synchronizedwaitnotify来实现两个线程间的通信,以确保它们交替地向一个ArrayList中添加数字。以下是代码的详细解释:

2、Lock、Condition

import java.util.ArrayList;  
import java.util.List;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class Task {  
    // 创建一个可重入锁,用于同步访问共享资源(即列表)  
    private final Lock lock = new ReentrantLock();  
    // 创建两个条件变量,一个用于表示列表未满,一个用于表示列表非空  
    private final Condition notFull = lock.newCondition();  
    private final Condition notEmpty = lock.newCondition();  
  
    // 定义列表的最大容量  
    private static final int MAX_SIZE = 10;  
    // 创建一个ArrayList作为共享资源,用于在两个线程之间传递数据  
    private final List<String> list = new ArrayList<>(MAX_SIZE);  
  
    // add方法用于向列表中添加元素  
    public void add() {  
        for (int i = 0; i < 10; i++) {  
            lock.lock(); // 获取锁,开始同步代码块  
            try {  
                // 如果列表已满,则当前线程等待,直到其他线程从列表中移除元素  
                while (list.size() == MAX_SIZE) {  
                    notFull.await(); // 等待列表不满的条件成立  
                }  
                // 模拟耗时操作(比如网络请求或数据处理)  
                Thread.sleep(100);  
                // 向列表中添加一个新元素,并打印相关信息  
                list.add("add " + (i + 1));  
                System.out.println("The list size is " + list.size());  
                System.out.println("The add thread is " + Thread.currentThread().getName());  
                System.out.println("-------------");  
                // 通知可能在等待的移除线程,现在列表不为空,可以执行移除操作了  
                notEmpty.signal();  
            } catch (InterruptedException e) {  
                // 打印异常信息,实际开发中可能需要更复杂的错误处理逻辑  
                e.printStackTrace();  
            } finally {  
                lock.unlock(); // 释放锁,允许其他线程访问同步代码块  
            }  
        }  
    }  
  
    // sub方法用于从列表中移除元素  
    public void sub() {  
        for (int i = 0; i < 10; i++) {  
            lock.lock(); // 获取锁,开始同步代码块  
            try {  
                // 如果列表为空,则当前线程等待,直到其他线程向列表中添加元素  
                while (list.isEmpty()) {  
                    notEmpty.await(); // 等待列表非空的条件成立  
                }  
                // 模拟耗时操作(比如网络请求或数据处理)  
                Thread.sleep(100);  
                // 从列表中移除第一个元素,并打印相关信息  
                list.remove(0);  
                System.out.println("The list size is " + list.size());  
                System.out.println("The sub thread is " + Thread.currentThread().getName());  
                System.out.println("-------------");  
                // 通知可能在等待的添加线程,现在列表不满,可以执行添加操作了  
                notFull.signal();  
            } catch (InterruptedException e) {  
                // 打印异常信息,实际开发中可能需要更复杂的错误处理逻辑  
                e.printStackTrace();  
            } finally {  
                lock.unlock(); // 释放锁,允许其他线程访问同步代码块  
            }  
        }  
    }  
  
    // main方法作为程序的入口点,创建Task对象并启动两个线程来执行add和sub方法  
    public static void main(String[] args) {  
        Task task = new Task(); // 创建Task对象,它包含共享资源和同步机制  
        // 使用Lambda表达式和方法引用启动两个线程,分别执行add和sub方法,并为它们设置名称以便区分输出中的信息来源  
        new Thread(task::add, "AddThread").start(); // 启动添加线程  
        new Thread(task::sub, "SubThread").start(); // 启动移除线程  
    }  
}

这段代码定义了一个名为Task的类,它主要实现了线程安全的列表添加和移除操作。类内部使用了java.util.concurrent.locks包下的ReentrantLock可重入锁以及相关的Condition条件变量来同步访问共享资源(即一个ArrayList)。

Task类中,有两个主要的方法:addsubadd方法用于向列表中添加元素,而sub方法用于从列表中移除元素。这两个方法在被调用时都需要获取锁,以确保同一时间只有一个线程可以访问共享资源。

当添加线程调用add方法时,它首先检查列表是否已满。如果已满,则通过调用notFull.await()使当前线程等待,直到其他线程从列表中移除元素并发出通知。一旦列表不满,添加线程就会向列表中添加一个新元素,并通过调用notEmpty.signal()通知可能在等待的移除线程。

类似地,当移除线程调用sub方法时,它首先检查列表是否为空。如果为空,则通过调用notEmpty.await()使当前线程等待,直到其他线程向列表中添加元素并发出通知。一旦列表非空,移除线程就会从列表中移除一个元素,并通过调用notFull.signal()通知可能在等待的添加线程。

这种使用锁和条件变量的方式实现了线程间的同步和通信,确保了共享资源(即列表)在任何时候都不会被多个线程同时修改,从而避免了数据竞争和不一致的问题。同时,通过条件变量的等待和通知机制,有效地协调了添加线程和移除线程的执行顺序,使得它们能够按照预期的方式交替进行添加和移除操作。

3、利用volatile

volatile修饰的变量值直接存在主内存里面,子线程对该变量的读写直接写住内存,而不是像其它变量一样在local thread里面产生一份copy。volatile能保证所修饰的变量对于多个线程可见性,即只要被修改,其它线程读到的一定是最新的值。

public class Demo2 {
    private volatile List<Integer> list =new ArrayList<>();
    public static void main(String[] args) {
        Demo2 demo =new Demo2();
        new Thread(()->{
            for (int i=0;i<10;i++){
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
            }

        }).start();

        new Thread(()->{
            for (int i=0;i<10;i++){
                    demo.list.add(i);
                    System.out.print(Thread.currentThread().getName());
                    System.out.println(demo.list);
                }
        }).start();
    }
}

4、利用AtomicInteger

和volatile类似

二、PipedInputStream、PipedOutputStream

这里用流在两个线程间通信,但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output

public class PipedDemo {

    private final PipedInputStream inputStream1;
    private final PipedOutputStream outputStream1;
    private final PipedInputStream inputStream2;
    private final PipedOutputStream outputStream2;

    public PipedDemo(){
        inputStream1 = new PipedInputStream();
        outputStream1 = new PipedOutputStream();
        inputStream2 = new PipedInputStream();
        outputStream2 = new PipedOutputStream();
        try {
            inputStream1.connect(outputStream2);
            inputStream2.connect(outputStream1);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    /**程序退出时,需要关闭stream*/
    public void shutdown() throws IOException {
        inputStream1.close();
        inputStream2.close();
        outputStream1.close();
        outputStream2.close();
    }


    public static void main(String[] args) throws IOException {
        PipedDemo demo =new PipedDemo();
        new Thread(()->{
            PipedInputStream in = demo.inputStream2;
            PipedOutputStream out = demo.outputStream2;

            for (int i = 0; i < 10; i++) {
                try {
                    byte[] inArr = new byte[2];
                    in.read(inArr);
                    System.out.print(Thread.currentThread().getName()+": "+i+" ");
                    System.out.println(new String(inArr));
                    while(true){
                        if("go".equals(new String(inArr)))
                            break;
                    }
                    out.write("ok".getBytes());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }).start();

        new Thread(()->{
            PipedInputStream in = demo.inputStream1;
            PipedOutputStream out = demo.outputStream1;

            for (int i = 0; i < 10; i++) {
                try {
                    out.write("go".getBytes());
                    byte[] inArr = new byte[2];
                    in.read(inArr);
                    System.out.print(Thread.currentThread().getName()+": "+i+" ");
                    System.out.println(new String(inArr));
                    while(true){
                        if("ok".equals(new String(inArr)))
                            break;
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }).start();
//        demo.shutdown();
    }
}

输出

Thread-0: 0 go
Thread-1: 0 ok
Thread-0: 1 go
Thread-1: 1 ok
Thread-0: 2 go
Thread-1: 2 ok
Thread-0: 3 go
Thread-1: 3 ok
Thread-0: 4 go
Thread-1: 4 ok
Thread-0: 5 go
Thread-1: 5 ok
Thread-0: 6 go
Thread-1: 6 ok
Thread-0: 7 go
Thread-1: 7 ok
Thread-0: 8 go
Thread-1: 8 ok
Thread-0: 9 go
Thread-1: 9 ok

三、利用BlockingQueue

BlockingQueue定义的常用方法如下:

BlockingQueue有四个具体的实现类:

所有BlockingQueue的使用方式类似,以下例子一个线程写入,一个线程读取,操作的是同一个Queue:

public class BlockingQueueDemo {

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        //读线程
        new Thread(() -> {
            int i =0;
            while (true) {
                try {
                    String item = queue.take();
                    System.out.print(Thread.currentThread().getName() + ": " + i + " ");
                    System.out.println(item);
                    i++;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        //写线程
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    String item = "go"+i;
                    System.out.print(Thread.currentThread().getName() + ": " + i + " ");
                    System.out.println(item);
                    queue.put(item);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

到此这篇关于Java线程间通讯的几种方法小结的文章就介绍到这了,更多相关Java线程间通讯内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家! 

您可能感兴趣的文章:
阅读全文