java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > JDK21对虚拟线程

JDK21对虚拟线程的几种用法实践指南

作者:乄bluefox

虚拟线程是Java中的一种轻量级线程,由JVM管理,特别适合于I/O密集型任务,这篇文章主要介绍了JDK21对虚拟线程的几种用法,文中通过代码介绍的非常详细,需要的朋友可以参考下

一、参考官方文档

Virtual Threads

Virtual threads are lightweight threads that reduce the effort of writing, maintaining, and debugging high-throughput concurrent applications.

二、什么是虚拟线程

目标:用少量平台线程(Platform Threads)支撑海量并发任务(如 100 万请求),提升吞吐量。

类型说明
平台线程(PlatformThread)JVM 映射到操作系统线程(OS Thread),创建成本高(默认几百 KB 栈空间),数量受限(通常几千)
虚拟线程(VirtualThread)JVM 内部轻量线程,由 Thread.ofVirtual() 创建,不直接绑定 OS 线程,可创建百万级
平台线程 = 真实员工(数量少、成本高)
虚拟线程 = 临时工(数量多、任务来了再分配真实员工干活)

适用场景:

场景说明
🟢 高并发 I/O 任务如 HTTP 请求、数据库查询、Redis 调用(阻塞多、CPU 少)
🟢 Web 服务器处理请求Tomcat、Netty、Spring WebFlux 等每请求一线程模型
🟢 批量处理任务如 10 万条数据同步、日志处理
🟢 调用多个外部服务并行调用订单、用户、库存服务并聚合结果

不适用场景:

场景说明
🔴 CPU 密集型任务如图像处理、加密解密、大数据计算 → 用平台线程池(ForkJoinPool
🔴 长时间运行的无限循环虚拟线程调度器可能“饿死”其他任务
🔴 JNI / Native 代码虚拟线程会被挂起,直到 native 方法返回(阻塞平台线程)
🔴 持有 synchronized 块太久会阻塞平台线程,降低并发能力

三、几种用法

你的需求推荐用法
学习虚拟线程Thread.ofVirtual().start()
高并发 Web 请求、批量 I/OnewVirtualThreadPerTaskExecutor()
CPU 密集型任务(如计算)❌ 不要用虚拟线程,用 ForkJoinPool

1、Thread.ofVirtual().start()

—— 最基础的创建方式

Thread vt = Thread.ofVirtual()
    .name("worker")
    .start(() -> {
        System.out.println("Hello from " + Thread.currentThread());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {}
        System.out.println("Done");
    });

vt.join(); // 等待完成

适用场景:

不适用场景:

建议:

不推荐用于生产环境。它没有资源池管理,无法限制并发,容易造成内存溢出。

2、Executors.newVirtualThreadPerTaskExecutor()

—— 最推荐的生产级用法

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 10_000; i++) {
        executor.submit(() -> {
            Thread.sleep(Duration.ofSeconds(1));
            System.out.println("Task " + Thread.currentThread());
            return "result";
        });
    }
} // 自动等待所有任务完成

参考官方:

try (ExecutorService myExecutor = Executors.newVirtualThreadPerTaskExecutor()) {
    Future<?> future = myExecutor.submit(() -> System.out.println("Running thread"));
    future.get();
    System.out.println("Task completed");
    // ...

适用场景 :

场景说明
🟢 Web 请求处理每个 HTTP 请求启动一个虚拟线程(Spring Boot 6+ 默认)
🟢 批量 I/O 操作如读取 10 万个文件、调用外部 API
🟢 消息队列消费每条消息一个虚拟线程处理
🟢 Redis / DB 批量查询配合 multiGetpipeline 使用

优势 :

用官方文档的话来说总结几点:

建议:

在生产环境中最应该使用的虚拟线程方式!尤其适合:高并发 + I/O 密集 + 短任务

3、信号量进行限制并发

其实有同学会想到,虚拟线程能不能像平台线程那样子进行池化,关于这个的官方解释,在官方文档中提到了很多次,多次提及两者并不应该是一个概念,你可以将平台线程池视为从队列中提取任务并处理它们的“工人”,将虚拟线程视为任务本身。

如果想要限制并发,和平台线程类似的线程池那样,可以使用信号量进行限制:

Semaphore sem = new Semaphore(10);
...
Result foo() {
    sem.acquire();
    try {
        return callLimitedService();
    } finally {
        sem.release();
    }
}

可能同学会认为用信号量去限制的话和线程池还是很大差距,下面官方文档也给了提示:

仅仅用信号量阻塞一些虚拟线程,可能看起来与向固定线程池提交任务有着本质的不同,但实际上并非如此。向线程池提交任务会将其排队等待稍后执行,而信号量(或任何其他用于此目的的阻塞同步构造)在内部会创建一个线程队列,这些线程因信号量而被阻塞,该队列反映了等待池线程执行的任务队列。由于虚拟线程就是任务,因此最终结构是等价的。

4、官网更多其他用法

Thread.Builder builder = Thread.ofVirtual().name("MyThread");
Runnable task = () -> {
    System.out.println("Running thread");
};
Thread t = builder.start(task);
System.out.println("Thread t name: " + t.getName());
t.join();

The following example creates and starts two virtual threads with:

Thread.Builder builder = Thread.ofVirtual().name("worker-", 0);
Runnable task = () -> {
    System.out.println("Thread ID: " + Thread.currentThread().threadId());
};

// name "worker-0"
Thread t1 = builder.start(task);   
t1.join();
System.out.println(t1.getName() + " terminated");

// name "worker-1"
Thread t2 = builder.start(task);   
t2.join();  
System.out.println(t2.getName() + " terminated");

输出:

Thread ID: 21
worker-0 terminated
Thread ID: 24
worker-1 terminated

希望同时向不同的服务发起多个出站调用:

void handle(Request request, Response response) {
    var url1 = ...
    var url2 = ...
 
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        var future1 = executor.submit(() -> fetchURL(url1));
        var future2 = executor.submit(() -> fetchURL(url2));
        response.send(future1.get() + future2.get());
    } catch (ExecutionException | InterruptedException e) {
        response.fail(e);
    }
}
 
String fetchURL(URL url) throws IOException {
    try (var in = url.openStream()) {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
}

四、监控对内存的使用情况

模拟10w虚拟线程

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadTest {

    // 方法一:查看当前 JVM 内存使用情况
    public static void printMemoryUsage(String phase) {
        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heapMemory = memoryMXBean.getHeapMemoryUsage();
        System.out.printf("[%s] Heap Memory Used: %d MB / %d MB%n",
                phase,
                heapMemory.getUsed() / (1024 * 1024),
                heapMemory.getMax() / (1024 * 1024));
    }

    // 方法二:创建虚拟线程执行任务
    public static void runVirtualThreads(int taskCount) throws InterruptedException {
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        for (int i = 0; i < taskCount; i++) {
            int finalI = i;
            executor.submit(() -> {
                System.out.printf("虚拟线程[%s] 执行任务 %d%n",
                        Thread.currentThread(),
                        finalI);
                try {
                    Thread.sleep(1000); // 模拟 IO 阻塞
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
            Thread.sleep(100);
        }
    }

    // 方法三:循环查看内存(模拟 GC 回收效果)
    public static void monitorMemory(int seconds) throws InterruptedException {
        for (int i = 0; i < seconds; i++) {
            printMemoryUsage("监控中");
            System.gc(); // 建议 GC,可能不会马上执行
            Thread.sleep(1000);
        }
    }

    // 主方法测试
    public static void main(String[] args) throws Exception {
        // 1. 初始内存
        printMemoryUsage("初始");

        // 2. 创建 1 万个虚拟线程任务
        runVirtualThreads(100_000);

        // 3. 任务执行后内存
        printMemoryUsage("任务执行后");

        // 4. 持续观察 GC 是否回收虚拟线程
        monitorMemory(10);
    }
}

可以看到快速创建10万的虚拟线程也能很快的将内存进行回收,可能大家会想,不是说虚拟线程是创建一个线程就回收内存了吗,为啥你这里统一进行释放呢,可以看到我这里进行阻塞,模拟创建10万线程大概会占用多少内存

五、兼容ThreadLocal

在我们现阶段大多数用户信息都是通过ThreadLocal进行传递过来,每个线程绑定一个用户

1、传统平台线程 + ThreadLocal 的关系

下面我们回顾一下传统的ThreadLocal和线程的关系:

关系图如下:

平台线程 T1  → 有自己的 ThreadLocalMap → 存储 BaseContext.value
平台线程 T2  → 有自己的 ThreadLocalMap → 存储 BaseContext.value
平台线程 T3  → 有自己的 ThreadLocalMap → 存储 BaseContext.value

2、虚拟线程(Virtual Thread) + ThreadLocal 的关系

关系图如下:

载体线程 C1(平台线程) 
   ├─ 虚拟线程 VT1 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
   ├─ 虚拟线程 VT2 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
   └─ 虚拟线程 VT3 → 有自己的 ThreadLocalMap → 存储 BaseContext.value

载体线程 C2
   ├─ 虚拟线程 VT10001 → 有自己的 ThreadLocalMap → 存储 BaseContext.value
   └─ 虚拟线程 VT10002 → 有自己的 ThreadLocalMap → 存储 BaseContext.value

3、两者的变化

传统的线程的话会进行线程池排队,不会频繁创建线程,从而不会导致线程爆炸。要是某个虚拟线程执行什么任务。

4、怎么兼容线程模式下ThreadLocal

// 获取当前用户信息(来自平台线程的 ThreadLocal)
String currentUser = BaseContext.getCurrentId(); // 假设这是从 ThreadLocal 拿的

// 在虚拟线程中使用时,显式传入
Thread.startVirtualThread(() -> {
    // 显式使用传入的上下文
    processUserOrder(currentUser, orderId);
});

六、一些虚拟线程相关的检查代码

1、检查自己的spingboot版本

import org.springframework.core.SpringVersion;

public class SpringVersionCheck {
    public static void main(String[] args) {
        System.out.println("Spring Framework 版本: " + SpringVersion.getVersion());
    }
}

2、检查自己是否默认开启全局的虚拟线程

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class App {
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(App.class, args);

        // 测试异步方法
        AsyncService service = ctx.getBean(AsyncService.class);
        service.asyncTask();
    }
}
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service

class AsyncService {

    @Async

    public void asyncTask() {

        Thread thread = Thread.currentThread();

        System.out.println("线程类型: " + (thread.isVirtual() ? "虚拟线程" : "平台线程"));

        System.out.println("线程名称: " + thread.getName());

    }

}

3、监控虚拟线程存活数量

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class VirtualThreadMonitor {
    public static void main(String[] args) {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();

        // 获取当前存活的线程总数(包括平台线程和虚拟线程)
        long totalThreadCount = threadBean.getThreadCount();

        // 初始化虚拟线程计数器
        long virtualThreadCount = 0;

        // 遍历当前所有线程,检查哪些是虚拟线程
        for (Thread thread : Thread.getAllStackTraces().keySet()) {
            if (thread.isVirtual()) {  // 如果线程是虚拟线程
                virtualThreadCount++;
            }
        }

        // 输出结果
        System.out.println("当前存活 虚拟线程数: " + virtualThreadCount);
        System.out.println("当前存活 总线程数: " + totalThreadCount);
    }
}

七、实战

新增conf配置类,让spring帮忙管理他的生命周期

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
@EnableAsync  // 启用 @Async 支持
public class VirtualThreadConfig implements AsyncConfigurer {

    private ExecutorService createVirtualThreadExecutor(String prefix) {
        return Executors.newThreadPerTaskExecutor(
                Thread.ofVirtual()
                        .name(prefix, 0)
                        .uncaughtExceptionHandler((thread, ex) ->
                                log.error("虚拟线程 [{}] 执行异常", thread.getName(), ex))
                        .factory()
        );
    }

    /**
     * 用于单个用户收藏数据的异步回写(实时场景)
     */
    @Bean("favoriteRedisWriterVirtualThreadExecutor")
    public Executor favoriteUserWriteVirtualThreadExecutor() {
        log.info("初始化虚拟线程池:favoriteUserWriteVirtualThreadExecutor");
        return createVirtualThreadExecutor("vt-fav-user-write-");
    }

    /**
     * 用于批量同步收藏数据到 Redis(定时任务场景)
     */
    @Bean("favoriteBatchWriteVirtualThreadExecutor")
    public Executor favoriteBatchWriteVirtualThreadExecutor() {
        log.info("初始化虚拟线程池:favoriteBatchWriteVirtualThreadExecutor");
        return createVirtualThreadExecutor("vt-fav-batch-write-");
    }

    @Override
    public Executor getAsyncExecutor() {
        return createVirtualThreadExecutor("vt-default-async-");
    }
}

在定时任务中,我们需要对收藏数据进行全量重写到 Redis,以保证缓存一致性。由于数据量较大(3200+ 条记录),采用同步方式逐条写入会导致任务执行时间过长,影响系统响应性。为了提升任务执行效率,我们引入 虚拟线程(Virtual Threads),将每条 Redis 写入操作提交到独立的虚拟线程中并发执行。这样可以在不增加平台线程负担的前提下,显著提高 I/O 密集型操作的吞吐量,缩短全量刷新时间。

至此就简练的实现异步虚拟线程的改造方案

总结

到此这篇关于JDK21对虚拟线程的几种用法实践指南的文章就介绍到这了,更多相关JDK21对虚拟线程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文