java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java 多线程与并发编程

Java 多线程与并发编程全面实战指南

作者:vvilkin

本文系统讲解Java多线程与并发编程,涵盖线程创建、同步机制(synchronized/Lock)、通信工具、并发集合、线程池及CompletableFuture等现代技术,强调正确使用多线程的重要性,提供避免死锁、内存可见性等常见问题的实践指南,感兴趣的朋友跟随小编一起看看吧

引言:为什么需要多线程?

在当今这个多核处理器普及的时代,单线程程序已经难以充分利用现代计算机硬件的计算能力。Java 作为一门成熟的企业级编程语言,提供了丰富的多线程与并发编程支持,使开发者能够构建高性能、高并发的应用程序。

多线程编程可以带来诸多好处:

然而,多线程编程也带来了复杂性,包括线程安全、死锁、竞态条件等问题。本文将全面介绍 Java 多线程与并发编程的各个方面。

一、Java 线程基础

1.1 线程创建方式

Java 提供了三种基本的线程创建方式:

1.1.1 继承 Thread 类

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread running: " + Thread.currentThread().getName());
    }
}
// 使用
MyThread thread = new MyThread();
thread.start(); // 注意:调用 run() 是普通方法调用,start() 才是启动新线程

1.1.2 实现 Runnable 接口

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable running: " + Thread.currentThread().getName());
    }
}
// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();

1.1.3 实现 Callable 接口

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Result from " + Thread.currentThread().getName();
    }
}
// 使用
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(new MyCallable());
System.out.println(future.get()); // 获取返回值
executor.shutdown();

最佳实践:推荐实现 Runnable 或 Callable 接口,因为 Java 不支持多重继承,且这种方式更符合面向对象的设计原则。

1.2 线程生命周期

Java 线程有以下几种状态:

Thread thread = new Thread(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
});
System.out.println(thread.getState()); // NEW
thread.start();
System.out.println(thread.getState()); // RUNNABLE
Thread.sleep(500);
System.out.println(thread.getState()); // TIMED_WAITING
Thread.sleep(1000);
System.out.println(thread.getState()); // TERMINATED

二、线程同步与线程安全

2.1 同步机制

2.1.1 synchronized 关键字

// 同步方法
public synchronized void syncMethod() {
    // 临界区代码
}
// 同步代码块
public void someMethod() {
    synchronized(this) { // 可以使用任何对象作为锁
        // 临界区代码
    }
}

2.1.2 Lock 接口

Lock lock = new ReentrantLock();
public void someMethod() {
    lock.lock();
    try {
        // 临界区代码
    } finally {
        lock.unlock(); // 确保在finally中释放锁
    }
}

比较

2.2 volatile 关键字

volatile 保证变量的可见性,但不保证原子性:

private volatile boolean running = true;
public void stop() {
    running = false;
}
public void run() {
    while(running) {
        // 执行任务
    }
}

2.3 原子类

java.util.concurrent.atomic 包提供了一系列原子类:

AtomicInteger counter = new AtomicInteger(0);
// 线程安全的自增
counter.incrementAndGet();
// CAS 操作
boolean updated = counter.compareAndSet(expect, update);

三、线程通信

3.1 wait/notify 机制

class SharedResource {
    private boolean ready = false;
    public synchronized void waitForReady() throws InterruptedException {
        while(!ready) {
            wait(); // 释放锁并等待
        }
        // 条件满足,继续执行
    }
    public synchronized void setReady() {
        ready = true;
        notifyAll(); // 通知所有等待线程
    }
}

3.2 Condition 接口

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();
    final Object[] items = new Object[100];
    int putptr, takeptr, count;
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

四、并发工具类

4.1 CountDownLatch

CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) {
    new Thread(new Worker(startSignal, doneSignal)).start();
}
doSomethingElse(); // 主线程准备工作
startSignal.countDown(); // 让所有worker开始工作
doSomethingElse();
doneSignal.await(); // 等待所有worker完成

4.2 CyclicBarrier

class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;
    class Worker implements Runnable {
        int myRow;
        Worker(int row) { myRow = row; }
        public void run() {
            while (!done()) {
                processRow(myRow);
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }
    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        barrier = new CyclicBarrier(N, () -> {
            mergeRows();
        });
        for (int i = 0; i < N; ++i)
            new Thread(new Worker(i)).start();
    }
}

4.3 Semaphore

class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }
}

五、线程池

5.1 线程池创建

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

5.2 ThreadPoolExecutor

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, // 核心线程数
    10, // 最大线程数
    60, // 空闲线程存活时间
    TimeUnit.SECONDS, // 时间单位
    new ArrayBlockingQueue<>(100), // 工作队列
    new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);

5.3 拒绝策略

六、并发集合

6.1 ConcurrentHashMap

ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 1);
map.compute("key", (k, v) -> v + 1); // 原子更新

6.2 CopyOnWriteArrayList

CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item");
// 适合读多写少的场景

6.3 BlockingQueue

BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 生产者
queue.put("item");
// 消费者
String item = queue.take();

七、现代并发编程

7.1 CompletableFuture

CompletableFuture.supplyAsync(() -> fetchData())
    .thenApply(data -> processData(data))
    .thenAccept(result -> displayResult(result))
    .exceptionally(ex -> {
        System.err.println("Error: " + ex.getMessage());
        return null;
    });

7.2 并行流

List<String> results = dataList.parallelStream()
    .filter(item -> item.startsWith("A"))
    .map(String::toUpperCase)
    .collect(Collectors.toList());

八、最佳实践与常见问题

8.1 最佳实践

8.2 常见问题

结语

Java 多线程与并发编程是一个既强大又复杂的主题。掌握这些技术可以帮助开发者构建高性能、高并发的应用程序,但也需要谨慎处理线程安全和性能问题。随着 Java 版本的更新,并发编程的 API 也在不断改进和简化(如 CompletableFuture、并行流等),开发者应当持续学习这些新特性。

记住,多线程编程的第一原则是:如果可以不使用多线程,就不要使用多线程。只有在真正需要并行处理或异步操作时,才考虑引入多线程,并且要确保正确处理所有并发问题。

到此这篇关于Java 多线程与并发编程全面指南的文章就介绍到这了,更多相关Java 多线程与并发编程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文