java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java21虚拟线程

Java21之虚拟线程用法实践指南及常见问题

作者:hay_lee

虚拟线程是Java19中作为预览功能提出,21中正式完成的轻量级线程,旨在减少编写易于观察的高吞吐量的并发程序的工作量,这篇文章主要介绍了Java21之虚拟线程用法的相关资料,需要的朋友可以参考下

前言

虚拟线程是 Java 21(LTS)中正式引入的重大特性,属于 Project Loom 的核心成果。它彻底改变了 Java 并发编程的范式。

1. 虚拟线程概述

1.1 什么是虚拟线程?

虚拟线程(Virtual Threads)是 JDK 实现的轻量级线程,由 JVM 而非操作系统管理。它们与传统的平台线程(Platform Threads,即操作系统线程)形成对比。

1.2 核心特点

1.3 与平台线程的对比

特性平台线程虚拟线程

创建成本

高(MB 级内存)

极低(KB 级内存)

最大数量

几千个

数百万个

调度者

操作系统

JVM

阻塞行为

阻塞整个 OS 线程

自动挂起,释放载体线程

适用场景

CPU 密集型任务

I/O 密集型任务

2. 虚拟线程的创建方法

2.1 使用 Thread.ofVirtual()

// 方法1:直接创建并启动
Thread virtualThread1 = Thread.ofVirtual().start(() -> {
    System.out.println("Hello from virtual thread!");
});

// 方法2:构建 Thread 对象后启动
Thread virtualThread2 = Thread.ofVirtual().name("my-virtual-thread").unstarted(() -> {
    System.out.println("Named virtual thread");
});
virtualThread2.start();

// 方法3:设置守护线程
Thread virtualThread3 = Thread.ofVirtual().daemon(true).unstarted(() -> {
    System.out.println("Daemon virtual thread");
});

2.2 使用 Thread.Builder

// 获取虚拟线程构建器
Thread.Builder builder = Thread.ofVirtual();

// 链式调用设置属性
Thread virtualThread = builder
    .name("custom-virtual-thread")
    .daemon(false)
    .unstarted(() -> {
        System.out.println("Custom virtual thread running");
    });

virtualThread.start();

2.3 使用 Executors.newVirtualThreadPerTaskExecutor()

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 创建虚拟线程执行器
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // 提交任务,每个任务在一个新的虚拟线程中执行
    for (int i = 0; i < 10; i++) {
        final int taskId = i;
        executor.submit(() -> {
            System.out.println("Task " + taskId + " running on " + Thread.currentThread());
            return "Result " + taskId;
        });
    }
} // 自动关闭执行器

2.4 从现有线程工厂创建

// 创建虚拟线程工厂
ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();

// 使用工厂创建线程
Thread thread = virtualThreadFactory.newThread(() -> {
    System.out.println("Created via factory");
});
thread.start();

3. 虚拟线程的核心使用场景

3.1 高并发 I/O 操作

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HighConcurrencyIO {
    private static final HttpClient httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
    
    public static void main(String[] args) throws InterruptedException {
        // 模拟 10,000 个并发 HTTP 请求
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<?>[] futures = new CompletableFuture[10000];
            
            for (int i = 0; i < 10000; i++) {
                final int requestId = i;
                futures[i] = CompletableFuture.runAsync(() -> {
                    try {
                        HttpRequest request = HttpRequest.newBuilder()
                                .uri(URI.create("https://httpbin.org/delay/1"))
                                .timeout(Duration.ofSeconds(5))
                                .build();
                        
                        HttpResponse<String> response = httpClient.send(request, 
                                HttpResponse.BodyHandlers.ofString());
                        
                        System.out.println("Request " + requestId + " completed with status: " 
                                + response.statusCode());
                    } catch (Exception e) {
                        System.err.println("Request " + requestId + " failed: " + e.getMessage());
                    }
                }, executor);
            }
            
            // 等待所有请求完成
            CompletableFuture.allOf(futures).join();
        }
        
        System.out.println("All requests completed!");
    }
}

3.2 数据库连接池优化

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DatabaseVirtualThreads {
    private static final String DB_URL = "jdbc:postgresql://localhost:5432/test";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    
    public static void main(String[] args) throws Exception {
        // 传统方式需要大量数据库连接
        // 虚拟线程方式:少量连接 + 大量虚拟线程
        
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // 模拟 1000 个并发数据库查询
            CompletableFuture<?>[] futures = new CompletableFuture[1000];
            
            for (int i = 0; i < 1000; i++) {
                final int queryId = i;
                futures[i] = CompletableFuture.runAsync(() -> {
                    Connection conn = null;
                    try {
                        // 从连接池获取连接(这里简化为直接创建)
                        conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
                        PreparedStatement stmt = conn.prepareStatement(
                            "SELECT * FROM users WHERE id = ?");
                        stmt.setInt(1, queryId);
                        
                        ResultSet rs = stmt.executeQuery();
                        while (rs.next()) {
                            System.out.println("Query " + queryId + ": " + rs.getString("name"));
                        }
                        
                    } catch (Exception e) {
                        System.err.println("Query " + queryId + " failed: " + e.getMessage());
                    } finally {
                        if (conn != null) {
                            try {
                                conn.close(); // 返回到连接池
                            } catch (Exception e) {
                                // ignore
                            }
                        }
                    }
                }, executor);
            }
            
            CompletableFuture.allOf(futures).join();
        }
    }
}

3.3 Web 服务器处理

import com.sun.net.httpserver.HttpServer;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpExchange;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;

public class VirtualThreadWebServer {
    public static void main(String[] args) throws IOException {
        // 创建 HTTP 服务器
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        
        // 设置虚拟线程执行器处理每个请求
        Executor virtualThreadExecutor = (runnable) -> {
            Thread.ofVirtual().start(runnable);
        };
        server.setExecutor(virtualThreadExecutor);
        
        // 添加处理器
        server.createContext("/hello", new HttpHandler() {
            @Override
            public void handle(HttpExchange exchange) throws IOException {
                // 模拟 I/O 操作(如数据库查询、外部 API 调用)
                try {
                    Thread.sleep(100); // 模拟延迟
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                String response = "Hello from virtual thread! Thread: " + 
                    Thread.currentThread();
                exchange.sendResponseHeaders(200, response.length());
                OutputStream os = exchange.getResponseBody();
                os.write(response.getBytes());
                os.close();
            }
        });
        
        server.start();
        System.out.println("Server started on http://localhost:8080/hello");
    }
}

4. 虚拟线程的高级特性

4.1 线程局部变量(ThreadLocal)

虚拟线程完全支持 ThreadLocal:

public class VirtualThreadLocalExample {
    private static final ThreadLocal<String> context = new ThreadLocal<>();
    
    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            String threadName = Thread.currentThread().getName();
            context.set("Context for " + threadName);
            
            System.out.println("Thread: " + threadName + 
                ", Context: " + context.get());
            
            // 清理 ThreadLocal(最佳实践)
            context.remove();
        };
        
        // 在虚拟线程中使用 ThreadLocal
        Thread vt1 = Thread.ofVirtual().name("VT-1").unstarted(task);
        Thread vt2 = Thread.ofVirtual().name("VT-2").unstarted(task);
        
        vt1.start();
        vt2.start();
        
        vt1.join();
        vt2.join();
    }
}

4.2 线程中断

虚拟线程支持标准的中断机制:

public class VirtualThreadInterruptExample {
    public static void main(String[] args) throws InterruptedException {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            try {
                System.out.println("Virtual thread started");
                Thread.sleep(5000); // 可中断的阻塞操作
                System.out.println("Virtual thread finished normally");
            } catch (InterruptedException e) {
                System.out.println("Virtual thread was interrupted");
                Thread.currentThread().interrupt(); // 保持中断状态
            }
        });
        
        virtualThread.start();
        
        // 1秒后中断虚拟线程
        Thread.sleep(1000);
        virtualThread.interrupt();
        
        virtualThread.join();
    }
}

4.3 线程栈跟踪

虚拟线程提供完整的栈跟踪信息:

public class VirtualThreadStackTrace {
    public static void main(String[] args) {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            methodA();
        });
        
        virtualThread.start();
        
        try {
            virtualThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private static void methodA() {
        methodB();
    }
    
    private static void methodB() {
        // 打印当前线程的栈跟踪
        Thread.dumpStack();
        
        // 或者获取栈跟踪
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        System.out.println("Stack trace size: " + stackTrace.length);
        for (StackTraceElement element : stackTrace) {
            System.out.println(element);
        }
    }
}

5. 虚拟线程的最佳实践

5.1 何时使用虚拟线程

适合使用虚拟线程的场景

不适合使用虚拟线程的场景

5.2 性能调优建议

// 1. 正确使用 try-with-resources 管理执行器
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // 提交任务
    executor.submit(task);
} // 自动关闭,等待所有任务完成

// 2. 避免在虚拟线程中进行 CPU 密集型计算
Runnable cpuIntensiveTask = () -> {
    // 错误:在虚拟线程中进行大量计算
    // long result = heavyComputation();
    
    // 正确:将 CPU 密集型任务提交到平台线程池
    ExecutorService cpuPool = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors());
    CompletableFuture.supplyAsync(() -> heavyComputation(), cpuPool)
        .thenAccept(result -> {
            // 处理结果(回到虚拟线程)
            System.out.println("Result: " + result);
        });
};

// 3. 合理管理资源
public class ResourceManagementExample {
    private static final ExecutorService VIRTUAL_EXECUTOR = 
        Executors.newVirtualThreadPerTaskExecutor();
    private static final ExecutorService CPU_EXECUTOR = 
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
    public static void shutdown() {
        VIRTUAL_EXECUTOR.close();
        CPU_EXECUTOR.shutdown();
    }
}

5.3 错误处理和监控

public class VirtualThreadErrorHandling {
    public static void main(String[] args) {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            try {
                // 可能抛出异常的操作
                riskyOperation();
            } catch (Exception e) {
                // 处理异常
                System.err.println("Exception in virtual thread: " + e.getMessage());
            }
        });
        
        // 设置未捕获异常处理器
        virtualThread.setUncaughtExceptionHandler((thread, exception) -> {
            System.err.println("Uncaught exception in " + thread.getName() + 
                ": " + exception.getMessage());
        });
        
        virtualThread.start();
    }
    
    private static void riskyOperation() {
        throw new RuntimeException("Something went wrong!");
    }
}

6. 虚拟线程的内部机制

6.1 载体线程(Carrier Threads)

public class CarrierThreadExample {
    public static void main(String[] args) throws InterruptedException {
        Thread virtualThread = Thread.ofVirtual().unstarted(() -> {
            System.out.println("Virtual thread: " + Thread.currentThread());
            System.out.println("Carrier thread: " + Thread.currentThread().carrierThread());
            
            // 模拟 I/O 阻塞
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            
            System.out.println("After sleep - Virtual: " + Thread.currentThread());
            System.out.println("After sleep - Carrier: " + Thread.currentThread().carrierThread());
        });
        
        virtualThread.start();
        virtualThread.join();
    }
}

6.2 挂起和恢复机制

当虚拟线程遇到阻塞操作时:

  1. JVM 捕获阻塞操作
  2. 虚拟线程被挂起
  3. 载体线程被释放用于执行其他虚拟线程
  4. 阻塞操作完成后,虚拟线程被调度到某个载体线程上恢复执行

7. 与现有并发工具的集成

7.1 CompletableFuture

public class CompletableFutureWithVirtualThreads {
    public static void main(String[] args) throws Exception {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletableFuture<String> future1 = CompletableFuture
                .supplyAsync(() -> fetchDataFromAPI1(), executor);
            
            CompletableFuture<String> future2 = CompletableFuture
                .supplyAsync(() -> fetchDataFromAPI2(), executor);
            
            CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2);
            combined.join();
            
            System.out.println("Data 1: " + future1.get());
            System.out.println("Data 2: " + future2.get());
        }
    }
    
    private static String fetchDataFromAPI1() {
        // 模拟 API 调用
        try { Thread.sleep(1000); } catch (InterruptedException e) {}
        return "API1 Result";
    }
    
    private static String fetchDataFromAPI2() {
        // 模拟 API 调用
        try { Thread.sleep(1500); } catch (InterruptedException e) {}
        return "API2 Result";
    }
}

7.2 Stream API

public class StreamWithVirtualThreads {
    public static void main(String[] args) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            // 并行流通常使用 ForkJoinPool,但可以自定义
            var results = IntStream.range(0, 1000)
                .boxed()
                .parallel()
                .map(i -> processItem(i, executor))
                .collect(Collectors.toList());
            
            System.out.println("Processed " + results.size() + " items");
        }
    }
    
    private static String processItem(int item, ExecutorService executor) {
        // 在虚拟线程中处理每个项目
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                // 模拟 I/O 操作
                try { Thread.sleep(10); } catch (InterruptedException e) {}
                return "Processed " + item;
            }, executor);
        
        return future.join();
    }
}

8. 调试和监控虚拟线程

8.1 JVM 参数

# 启用虚拟线程相关的调试信息
java -Djdk.virtualThreadScheduler.parallelism=8 YourApp

# 监控虚拟线程统计信息
java -XX:+UnlockDiagnosticVMOptions -XX:+LogVMOutput YourApp

8.2 JMX 监控

import java.lang.management.ManagementFactory;
import com.sun.management.ThreadMXBean;

public class VirtualThreadMonitoring {
    public static void main(String[] args) {
        ThreadMXBean threadBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();
        
        // 获取线程信息
        long[] threadIds = threadBean.getAllThreadIds();
        System.out.println("Total threads: " + threadIds.length);
        
        // 虚拟线程的线程 ID 通常是负数
        for (long id : threadIds) {
            if (id < 0) {
                System.out.println("Virtual thread ID: " + id);
            }
        }
    }
}

8.3 日志记录

public class VirtualThreadLogging {
    private static final Logger logger = LoggerFactory.getLogger(VirtualThreadLogging.class);
    
    public static void main(String[] args) {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    // 日志中包含线程信息
                    logger.info("Processing task {} on thread {}", 
                        taskId, Thread.currentThread().getName());
                    
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    
                    logger.info("Completed task {}", taskId);
                });
            }
        }
    }
}

9. 常见问题和解决方案

9.1 内存泄漏问题

// 问题:ThreadLocal 未清理
private static final ThreadLocal<ExpensiveResource> resource = new ThreadLocal<>();

public void badExample() {
    Thread.ofVirtual().start(() -> {
        resource.set(new ExpensiveResource()); // 未清理!
        // ... do work
    });
}

// 解决方案:确保清理 ThreadLocal
public void goodExample() {
    Thread.ofVirtual().start(() -> {
        try {
            resource.set(new ExpensiveResource());
            // ... do work
        } finally {
            resource.remove(); // 确保清理
        }
    });
}

9.2 死锁检测

虚拟线程的死锁检测与平台线程相同:

public class DeadlockExample {
    private static final Object lock1 = new Object();
    private static final Object lock2 = new Object();
    
    public static void main(String[] args) {
        Thread t1 = Thread.ofVirtual().unstarted(() -> {
            synchronized (lock1) {
                System.out.println("T1 got lock1");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                synchronized (lock2) {
                    System.out.println("T1 got lock2");
                }
            }
        });
        
        Thread t2 = Thread.ofVirtual().unstarted(() -> {
            synchronized (lock2) {
                System.out.println("T2 got lock2");
                try { Thread.sleep(100); } catch (InterruptedException e) {}
                synchronized (lock1) {
                    System.out.println("T2 got lock1");
                }
            }
        });
        
        t1.start();
        t2.start();
        
        // JVM 仍然可以检测到死锁
    }
}

10. 总结

10.1 虚拟线程的优势

10.2 使用建议

  1. I/O 密集型任务:优先使用虚拟线程
  2. CPU 密集型任务:继续使用平台线程池
  3. 资源管理:注意 ThreadLocal 清理和资源释放
  4. 监控调试:利用现有的 JVM 工具进行监控

虚拟线程是 Java 并发编程的重大突破,它让开发者能够以同步的方式编写高并发代码,同时获得异步编程的性能优势。掌握虚拟线程的使用方法,将极大地提升 Java 应用的并发处理能力。

到此这篇关于Java21之虚拟线程用法实践指南及常见问题的文章就介绍到这了,更多相关Java21虚拟线程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文