Java JUC并发之.util.concurrent并发工具包使用指南
作者:阿贾克斯的黎明
前言
JUC(Java Util Concurrent)即 Java 并发工具包,是java.util.concurrent包及其子包的简称,自 Java 5 引入,为并发编程提供了高效、安全、可靠的工具类,极大简化了多线程编程的复杂度。
JUC 主要包含以下几类组件:
- 线程池框架(Executor Framework)
- 并发集合(Concurrent Collections)
- 同步工具(Synchronizers)
- 原子操作类(Atomic Classes)
- 锁机制(Locks)
- 并发工具类(如 CountDownLatch、CyclicBarrier 等)
线程池框架
线程池通过重用线程来减少线程创建和销毁的开销,提高系统性能。
核心接口与类
Executor:最基本的线程池接口,定义了执行任务的方法ExecutorService:扩展了 Executor,提供了更丰富的线程池操作ThreadPoolExecutor:线程池的核心实现类Executors:线程池的工具类,提供了常用线程池的创建方法
线程池示例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("任务 " + taskId + " 由线程 " +
Thread.currentThread().getName() + " 执行");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
executor.shutdown();
try {
// 等待所有任务完成
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
// 超时后强制关闭
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
}
ThreadPoolExecutor 核心参数
手动创建线程池时,ThreadPoolExecutor的构造函数提供了最灵活的配置:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:核心线程数maximumPoolSize:最大线程数keepAliveTime:非核心线程的空闲超时时间workQueue:任务等待队列threadFactory:线程工厂handler:拒绝策略
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CustomThreadPool {
public static void main(String[] args) {
// 自定义线程池配置
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
30, // 空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10), // 有界队列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
executor.execute(() -> {
try {
System.out.println("任务 " + taskId + " 由线程 " +
Thread.currentThread().getName() + " 执行");
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
并发集合
JUC 提供了一系列线程安全的集合类,相比传统的同步集合,通常具有更好的性能。
常用并发集合
ConcurrentHashMap:线程安全的 HashMap 替代者CopyOnWriteArrayList:读多写少场景下的线程安全 ListCopyOnWriteArraySet:基于 CopyOnWriteArrayList 实现的 SetConcurrentLinkedQueue:高效的并发队列LinkedBlockingQueue:可阻塞的链表队列ArrayBlockingQueue:有界的数组队列PriorityBlockingQueue:支持优先级的阻塞队列
ConcurrentHashMap 示例
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
ExecutorService executor = Executors.newFixedThreadPool(4);
// 并发写入
for (int i = 0; i < 1000; i++) {
final int num = i;
executor.submit(() -> {
String key = "key" + (num % 10);
// 原子操作:计算并替换
concurrentMap.compute(key, (k, v) -> v == null ? 1 : v + 1);
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
// 输出结果
concurrentMap.forEach((k, v) -> System.out.println(k + ": " + v));
}
}
同步工具类
JUC 提供了多种同步工具,用于协调多个线程之间的协作。
CountDownLatch
允许一个或多个线程等待其他线程完成操作。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 计数器为3
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("任务 " + taskId + " 开始执行");
Thread.sleep(1000 + taskId * 500);
System.out.println("任务 " + taskId + " 执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 计数器减1
latch.countDown();
}
});
}
System.out.println("等待所有任务完成...");
// 等待计数器变为0
latch.await();
System.out.println("所有任务已完成,继续执行主线程");
executor.shutdown();
}
}
CyclicBarrier
让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障,所有被阻塞的线程才会继续执行。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 3个线程到达屏障后,执行Runnable任务
CyclicBarrier barrier = new CyclicBarrier(3, () ->
System.out.println("所有线程已到达屏障,开始下一步操作"));
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
final int threadId = i;
executor.submit(() -> {
try {
System.out.println("线程 " + threadId + " 正在执行任务");
Thread.sleep(1000 + threadId * 500);
System.out.println("线程 " + threadId + " 到达屏障");
// 等待其他线程到达
barrier.await();
System.out.println("线程 " + threadId + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
}
Semaphore
信号量,用于控制同时访问特定资源的线程数量。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample {
public static void main(String[] args) {
// 允许3个线程同时访问
Semaphore semaphore = new Semaphore(3);
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
try {
// 获取许可
semaphore.acquire();
System.out.println("任务 " + taskId + " 获得许可,开始执行");
TimeUnit.SECONDS.sleep(2);
System.out.println("任务 " + taskId + " 执行完成,释放许可");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可
semaphore.release();
}
});
}
executor.shutdown();
}
}
原子操作类
JUC 提供了一系列原子操作类,用于在不使用锁的情况下实现线程安全的原子操作。
主要原子类包括:
- 基本类型:
AtomicInteger、AtomicLong、AtomicBoolean - 数组类型:
AtomicIntegerArray、AtomicLongArray等 - 引用类型:
AtomicReference、AtomicStampedReference等 - 字段更新器:
AtomicIntegerFieldUpdater等
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 10个线程,每个线程自增1000次
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
for (int j = 0; j < 1000; j++) {
// 原子自增操作
counter.incrementAndGet();
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
// 结果应该是10000
System.out.println("最终计数: " + counter.get());
}
}
锁机制
JUC 的java.util.concurrent.locks包提供了比synchronized更灵活的锁机制。
Lock 接口
Lock接口是所有锁的父接口,主要实现类有:
ReentrantLock:可重入锁ReentrantReadWriteLock:可重入读写锁
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
private static int count = 0;
// 创建可重入锁
private static Lock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
// 获取锁
lock.lock();
try {
count++;
} finally {
// 确保锁被释放
lock.unlock();
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("最终计数: " + count);
}
}
读写锁
ReentrantReadWriteLock提供了读锁和写锁分离,适合读多写少的场景:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample {
private Map<String, String> data = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
// 读操作使用读锁
public String get(String key) {
lock.readLock().lock();
try {
System.out.println("读取 key: " + key + ",线程: " + Thread.currentThread().getName());
return data.get(key);
} finally {
lock.readLock().unlock();
}
}
// 写操作使用写锁
public void put(String key, String value) {
lock.writeLock().lock();
try {
System.out.println("写入 key: " + key + ",线程: " + Thread.currentThread().getName());
data.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ReadWriteLockExample example = new ReadWriteLockExample();
ExecutorService executor = Executors.newFixedThreadPool(5);
// 添加写操作
executor.submit(() -> example.put("name", "Java"));
// 添加多个读操作
for (int i = 0; i < 4; i++) {
executor.submit(() -> {
for (int j = 0; j < 3; j++) {
example.get("name");
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
实战案例:生产者消费者模型
使用 JUC 的阻塞队列实现经典的生产者消费者模型:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerExample {
// 容量为10的阻塞队列
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
private static final int MAX_ITEMS = 20;
// 生产者
static class Producer implements Runnable {
private int id;
public Producer(int id) {
this.id = id;
}
@Override
public void run() {
try {
for (int i = 0; i < MAX_ITEMS; i++) {
int item = id * 100 + i;
queue.put(item); // 放入队列,如果满了会阻塞
System.out.println("生产者 " + id + " 生产了: " + item +
",队列大小: " + queue.size());
TimeUnit.MILLISECONDS.sleep(100); // 模拟生产耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
static class Consumer implements Runnable {
private int id;
public Consumer(int id) {
this.id = id;
}
@Override
public void run() {
try {
for (int i = 0; i < MAX_ITEMS; i++) {
int item = queue.take(); // 从队列取,如果空了会阻塞
System.out.println("消费者 " + id + " 消费了: " + item +
",队列大小: " + queue.size());
TimeUnit.MILLISECONDS.sleep(150); // 模拟消费耗时
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(4);
// 创建2个生产者
executor.submit(new Producer(1));
executor.submit(new Producer(2));
// 创建2个消费者
executor.submit(new Consumer(1));
executor.submit(new Consumer(2));
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
总结
JUC 为 Java 并发编程提供了强大的工具支持,大大简化了多线程程序的开发难度。掌握 JUC 的使用,能够帮助开发者编写高效、安全的并发程序,应对多线程环境下的各种挑战。在实际开发中,应根据具体场景选择合适的并发工具,同时注意线程安全和性能之间的平衡。通过不断实践和深入理解这些工具的原理,您将能够构建出更健壮、更高效的并发应用程序。
到此这篇关于Java JUC并发之.util.concurrent并发工具包使用指南的文章就介绍到这了,更多相关Java .util.concurrent并发工具内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
您可能感兴趣的文章:
- java.util.concurrent.ExecutionException 问题解决方法
- 深入Synchronized和java.util.concurrent.locks.Lock的区别详解
- 出现java.util.ConcurrentModificationException 问题及解决办法
- Java 报错 java.util.ConcurrentModificationException: null 的原因及解决方案
- Java报错:java.util.concurrent.ExecutionException的解决办法
- java.util.ConcurrentModificationException 解决方法
- 浅谈java.util.concurrent包中的线程池和消息队列
- Java中JUC包(java.util.concurrent)下的常用子类
