java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java .util.concurrent并发工具

Java JUC并发之.util.concurrent并发工具包使用指南

作者:阿贾克斯的黎明

JUC就是java.util .concurrent工具包的简称,这是一个处理线程的工具包,JDK 1.5开始出现的,这篇文章主要介绍了Java JUC并发之.util.concurrent并发工具包使用的相关资料,需要的朋友可以参考下

前言

JUC(Java Util Concurrent)即 Java 并发工具包,是java.util.concurrent包及其子包的简称,自 Java 5 引入,为并发编程提供了高效、安全、可靠的工具类,极大简化了多线程编程的复杂度。

JUC 主要包含以下几类组件:

线程池框架

线程池通过重用线程来减少线程创建和销毁的开销,提高系统性能。

核心接口与类

线程池示例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("任务 " + taskId + " 由线程 " + 
                                       Thread.currentThread().getName() + " 执行");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            // 等待所有任务完成
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // 超时后强制关闭
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

ThreadPoolExecutor 核心参数

手动创建线程池时,ThreadPoolExecutor的构造函数提供了最灵活的配置:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CustomThreadPool {
    public static void main(String[] args) {
        // 自定义线程池配置
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, // 核心线程数
            5, // 最大线程数
            30, // 空闲时间
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10), // 有界队列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        
        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            executor.execute(() -> {
                try {
                    System.out.println("任务 " + taskId + " 由线程 " +
                                       Thread.currentThread().getName() + " 执行");
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}

并发集合

JUC 提供了一系列线程安全的集合类,相比传统的同步集合,通常具有更好的性能。

常用并发集合

ConcurrentHashMap 示例

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentHashMapExample {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // 并发写入
        for (int i = 0; i < 1000; i++) {
            final int num = i;
            executor.submit(() -> {
                String key = "key" + (num % 10);
                // 原子操作:计算并替换
                concurrentMap.compute(key, (k, v) -> v == null ? 1 : v + 1);
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        
        // 输出结果
        concurrentMap.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}

同步工具类

JUC 提供了多种同步工具,用于协调多个线程之间的协作。

CountDownLatch

允许一个或多个线程等待其他线程完成操作。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        // 计数器为3
        CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        for (int i = 0; i < 3; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    System.out.println("任务 " + taskId + " 开始执行");
                    Thread.sleep(1000 + taskId * 500);
                    System.out.println("任务 " + taskId + " 执行完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 计数器减1
                    latch.countDown();
                }
            });
        }
        
        System.out.println("等待所有任务完成...");
        // 等待计数器变为0
        latch.await();
        System.out.println("所有任务已完成,继续执行主线程");
        
        executor.shutdown();
    }
}

CyclicBarrier

让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,所有被阻塞的线程才会继续执行。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 3个线程到达屏障后,执行Runnable任务
        CyclicBarrier barrier = new CyclicBarrier(3, () -> 
            System.out.println("所有线程已到达屏障,开始下一步操作"));
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            executor.submit(() -> {
                try {
                    System.out.println("线程 " + threadId + " 正在执行任务");
                    Thread.sleep(1000 + threadId * 500);
                    System.out.println("线程 " + threadId + " 到达屏障");
                    // 等待其他线程到达
                    barrier.await();
                    System.out.println("线程 " + threadId + " 继续执行");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}

Semaphore

信号量,用于控制同时访问特定资源的线程数量。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreExample {
    public static void main(String[] args) {
        // 允许3个线程同时访问
        Semaphore semaphore = new Semaphore(3);
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    // 获取许可
                    semaphore.acquire();
                    System.out.println("任务 " + taskId + " 获得许可,开始执行");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("任务 " + taskId + " 执行完成,释放许可");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 释放许可
                    semaphore.release();
                }
            });
        }
        
        executor.shutdown();
    }
}

原子操作类

JUC 提供了一系列原子操作类,用于在不使用锁的情况下实现线程安全的原子操作。

主要原子类包括:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
    private static AtomicInteger counter = new AtomicInteger(0);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        // 10个线程,每个线程自增1000次
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                for (int j = 0; j < 1000; j++) {
                    // 原子自增操作
                    counter.incrementAndGet();
                }
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        
        // 结果应该是10000
        System.out.println("最终计数: " + counter.get());
    }
}

锁机制

JUC 的java.util.concurrent.locks包提供了比synchronized更灵活的锁机制。

Lock 接口

Lock接口是所有锁的父接口,主要实现类有:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
    private static int count = 0;
    // 创建可重入锁
    private static Lock lock = new ReentrantLock();
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 1000; i++) {
            executor.submit(() -> {
                // 获取锁
                lock.lock();
                try {
                    count++;
                } finally {
                    // 确保锁被释放
                    lock.unlock();
                }
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        
        System.out.println("最终计数: " + count);
    }
}

读写锁

ReentrantReadWriteLock提供了读锁和写锁分离,适合读多写少的场景:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockExample {
    private Map<String, String> data = new HashMap<>();
    private ReadWriteLock lock = new ReentrantReadWriteLock();
    
    // 读操作使用读锁
    public String get(String key) {
        lock.readLock().lock();
        try {
            System.out.println("读取 key: " + key + ",线程: " + Thread.currentThread().getName());
            return data.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }
    
    // 写操作使用写锁
    public void put(String key, String value) {
        lock.writeLock().lock();
        try {
            System.out.println("写入 key: " + key + ",线程: " + Thread.currentThread().getName());
            data.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ReadWriteLockExample example = new ReadWriteLockExample();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        // 添加写操作
        executor.submit(() -> example.put("name", "Java"));
        
        // 添加多个读操作
        for (int i = 0; i < 4; i++) {
            executor.submit(() -> {
                for (int j = 0; j < 3; j++) {
                    example.get("name");
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

实战案例:生产者消费者模型

使用 JUC 的阻塞队列实现经典的生产者消费者模型:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerExample {
    // 容量为10的阻塞队列
    private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
    private static final int MAX_ITEMS = 20;
    
    // 生产者
    static class Producer implements Runnable {
        private int id;
        
        public Producer(int id) {
            this.id = id;
        }
        
        @Override
        public void run() {
            try {
                for (int i = 0; i < MAX_ITEMS; i++) {
                    int item = id * 100 + i;
                    queue.put(item); // 放入队列,如果满了会阻塞
                    System.out.println("生产者 " + id + " 生产了: " + item + 
                                       ",队列大小: " + queue.size());
                    TimeUnit.MILLISECONDS.sleep(100); // 模拟生产耗时
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // 消费者
    static class Consumer implements Runnable {
        private int id;
        
        public Consumer(int id) {
            this.id = id;
        }
        
        @Override
        public void run() {
            try {
                for (int i = 0; i < MAX_ITEMS; i++) {
                    int item = queue.take(); // 从队列取,如果空了会阻塞
                    System.out.println("消费者 " + id + " 消费了: " + item + 
                                       ",队列大小: " + queue.size());
                    TimeUnit.MILLISECONDS.sleep(150); // 模拟消费耗时
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        
        // 创建2个生产者
        executor.submit(new Producer(1));
        executor.submit(new Producer(2));
        
        // 创建2个消费者
        executor.submit(new Consumer(1));
        executor.submit(new Consumer(2));
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

总结

JUC 为 Java 并发编程提供了强大的工具支持,大大简化了多线程程序的开发难度。掌握 JUC 的使用,能够帮助开发者编写高效、安全的并发程序,应对多线程环境下的各种挑战。在实际开发中,应根据具体场景选择合适的并发工具,同时注意线程安全和性能之间的平衡。通过不断实践和深入理解这些工具的原理,您将能够构建出更健壮、更高效的并发应用程序。

到此这篇关于Java JUC并发之.util.concurrent并发工具包使用指南的文章就介绍到这了,更多相关Java .util.concurrent并发工具内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文