java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java 虚拟线程

深入浅析Java 虚拟线程

作者:曾彪彪

Java21引入了虚拟线程,这是一种轻量级线程,适用于IO密集型的应用,可以极大提高应用的性能和吞吐量,虚拟线程是由传统线程执行的,由JVM控制上下文切换,创建和销毁的开销小,适用于高并发场景,本文介绍Java 虚拟线程的相关知识,感兴趣的朋友跟随小编一起看看吧

在Java 21中,引入了虚拟线程,这是一个非常非常重要的特性,之前一直苦苦寻找的Java协程,终于问世了。在高并发以及IO密集型的应用中,虚拟线程能极大的提高应用的性能和吞吐量。

什么是虚拟线程

先来看一下虚拟线程的概念。

虚拟线程概念

DK 21 引入了虚拟线程的支持,这是为了改善 Java 应用程序在高并发场景下的性能。虚拟线程是一种轻量级线程,具有较小的内存占用,能够更高效地进行上下文切换,适用于 I/O 密集型的应用程序

虚拟线程的工作原理

当应用程序启动一个虚拟线程时,JVM会将这个虚拟线程交给JVM底层的线程池去执行,这个底层的线程池是一个传统线程池,并且真正执行虚拟线程中任务的线程,也是传统线程(操作系统线程)。当虚拟线程遇到阻塞时,JVM会立刻将虚拟线程挂起,让其它虚拟线程执行。也就是说,开启一个虚拟线程,并不需要启用一个传统线程,一般一个传统线程,可以执行多个虚拟线程的任务。在执行过程中,可以把虚拟线程理解成任务task。

这里举一个列子,假设用户创建了1000个虚拟线程,JVM的执行虚拟线程的线程池线程数是10,那么当第一个虚拟线程V1需要执行时,JVM会将V1调度到传统线程T1上,以此类推,虚拟线程V2会被调度到传统线程T2上,那么V3->T3,V4->T4,… V10->T10。当执行到V11时,这里有三种情况:

对于被阻塞的线程,如V3,当IO结束后,操作系统会通过事件,如epoll通知JVM,V3的IO操作已结束,此时JVM重新唤醒V3,选择可用的传统线程,来执行V3的任务。

这里需要注意两点:

虚拟线程的调度

一般来说,程序员不需要对虚拟线程的调度进行管理,在JDK 21中,JVM默认启用了虚拟线程,并且会使用默认的ForkJoinPool线程池来执行虚拟线程,并且线程池的大小,也会根据虚拟线程的数量,进行动态调整。如果需要手动管理执行虚拟线程的线程池大小,那么需要自定义线程池,并将虚拟线程交给自定义的线程池来执行,这样虽然可行,通常没有必要。

虚拟线程与传统线程区别

虚拟线程与传统线程的区别主要在于:

虚拟线程类似于task,传统系统与操作系统线程对应,一个传统线程可以执行多个虚拟线程。虚拟线程与task的区别是,当传统线程执行虚拟线程时,遇到阻塞会挂起虚拟线程,当传统线程执行task时,遇到阻塞就真的阻塞了。当然传统中的task继承自runnable,虚拟线程继承自Thread,他们属于不同的类,可调用的方法也不一样。

JDK也提供了虚拟线程池,可以通过下面方式得到一个虚拟线程池。

import java.util.concurrent.*;
public class VirtualThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个虚拟线程池
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        // 提交多个任务到线程池
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " running in " + Thread.currentThread());
            });
        }
        // 关闭线程池
        executor.shutdown();
    }
}

上面代码中,提交给线程池的任务,JVM都会为其创建一个虚拟线程,然后以虚拟线程的方式执行。

与传统的线程池相比,虚拟线程池无法设置核心线程数、最大线程数、线程池大小、任务队列等参数,也不需要设置这些参数。

虚拟线程与传统线程的相同之处:

虚拟线程与协程的区别

协程是python中的异步编程技术,对于IO密集型应用,协程可以发挥很大的优势。协程的异步工作原理与虚拟线程相似,也是遇到IO就阻塞,让主线程继续执行其它任务,当IO完成时,操作系统通过事件机制,如epoll,通知python进程,产生一个事件,放到event loop队列中,最后由主线程执行。

虚拟线程与协程的主要区别在于:

区别虚拟线程协程
并发/并行虚拟线程是并行的,多个虚拟线程可以同时在多个CPU上运行,同一时刻,可以运行多个虚拟线程。从这个角度将,虚拟线程能支持更高的并发。协程不是并行的,因为只有一个主线程执行任务事件,同一时刻,只有一个任务被处理。
资源争夺虚拟线程中,存在资源争夺问题,以及状态同步问题,在编写代码时,需要考虑并发控制。甚至需要做合理的并发设计。因为只有一个主线程在执行任务事件,没有并发问题,编程时也不需要考虑并发问题。
框架支持虚拟线程是JDK 21的新特性,不需要任何框架支持。需要框架支持,写异步代码和同步代码,使用的是两个完全不同的框架,另外学习异步编程,增加了学习成本。并且异步编程有些难度,debug也变得复杂些。

怎样使用虚拟线程

在JDK 21中,使用虚拟线程有两种方式:

public class VirtualThreadExample {
    public static void main(String[] args) {
        Thread virtualThread = Thread.ofVirtual().start(() -> {
            System.out.println("Hello virtual thread ");
        });
        try {
            virtualThread.join();  // 等待虚拟线程完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
import java.util.concurrent.*;
public class VirtualThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个虚拟线程池
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        // 提交多个任务到线程池
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Task " + taskId + " running in " + Thread.currentThread());
            });
        }
        // 关闭线程池
        executor.shutdown();
    }
}

通过线程池执行任务时,无法对并发实现控制,容易造成OOM,或耗尽服务方资源,可以自定义以下虚拟线程池,实现资源控制:

package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.lang.NonNull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/*****
 * 虚拟线程池,支持配置任务队列数和最大并发任务数
 */
public class VirtualThreadExecutorService extends AbstractExecutorService {
    private volatile boolean shouldStop = false;
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private final Semaphore semaphore;
    private final BlockingQueue<Runnable> taskQueue;
    /******
     * 构造函数
     * @param taskQueueSize,任务队列大小,任务队列是一个阻塞队列,如果任务队列满了,那么调用execute方法会阻塞
     * @param concurrencySize,并发任务大小,同时执行的IO任务个数,防止并发过重,或者资源不够
     */
    public VirtualThreadExecutorService(int taskQueueSize, int concurrencySize) {
        this.semaphore = new Semaphore(concurrencySize);
        taskQueue = new LinkedBlockingQueue<>(taskQueueSize);
        this.loopEvent();
    }
    private void loopEvent() {
        Thread.ofVirtual().name("VirtualThreadExecutor").start(() -> {
            while (!shouldStop) {
                try {
                    Runnable task = taskQueue.take();
                    semaphore.acquire();
                    executor.execute(() -> {
                        try {
                            try {
                                task.run();
                            } finally {
                                semaphore.release();
                            }
                        } catch (Exception e) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    if (shouldStop) break;
                }
            }
        });
    }
    @Override
    public void shutdown() {
        shouldStop = true;
        executor.shutdown();
    }
    /**
     * @return The task not executed
     */
    @Override
    public List<Runnable> shutdownNow() {
        shouldStop = true;
        List<Runnable> remainingTasks = new ArrayList<>(taskQueue);
        taskQueue.clear();
        executor.shutdownNow();
        return remainingTasks;
    }
    @Override
    public boolean isShutdown() {
        return shouldStop;
    }
    @Override
    public boolean isTerminated() {
        return shouldStop && executor.isTerminated();
    }
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return executor.awaitTermination(timeout, unit);
    }
    @Override
    public void execute(Runnable command) {
        try {
            taskQueue.put(command); // 阻塞直到队列有空间
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Task submission interrupted.", e);
        }
    }
}

测试代码如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.apache.tomcat.util.threads.VirtualThreadExecutor;
public class VirtualThreadExecutorServiceDemo {
    public static void main(String[] args) throws InterruptedException {
        VirtualThreadExecutorService executorService = new VirtualThreadExecutorService(10, 2);
        for (int i = 0; i < 100000; i++) {
            final String threadName = "thread-" + i;
            System.out.println(Thread.currentThread() + ": try to create task " + threadName);
            executorService.submit(() -> {
                System.out.println(Thread.currentThread() + ": " + threadName + " created!");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(Thread.currentThread() + ": " + threadName + " finished!");
            });
        }
        Thread.sleep(5000000);
    }
}

哪些场景下可以应用虚拟线程

虚拟线程在IO密集型的高并发应用中能发挥出巨大的威力,在所有IO密集型应用中,具体来说,下列场景中,使用虚拟线程是比较合适的:

那么哪些场景下不合适使用虚拟线程呢?

虚拟线程实际应用场景举例

在一个spring boot项目中,有时候因为异步事件处理不过来,造成吞吐量下降,在JDK 21中,可以将事件改成虚拟线程来执行,代码如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@Configuration
@EnableAsync
public class AsyncConfig {
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        // 最大并行任务数
        Semaphore semaphore = new Semaphore(100);
        ExecutorService virtualThreadPool = Executors.newVirtualThreadPerTaskExecutor();
        return runnable -> {
            try {
                // 控制并行任务数
                semaphore.acquire();
                virtualThreadPool.submit(() -> {
                    try {
                        runnable.run();
                    } finally {
                        semaphore.release();
                    }
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Task submission interrupted", e);
            }
        };
    }
}

事件发送和处理代码如下:

package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/home")
public class HomeController {
    private final ApplicationEventPublisher eventPublisher;
    public HomeController(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    @GetMapping("/index")
    public String index() {
        for (int i = 0; i < 1000; i++) {
            eventPublisher.publishEvent("event " + i);
        }
        return "success";
    }
    @EventListener
    @Async
    public void handleEvent(String event) {
        System.out.println(Thread.currentThread() + ": " + event);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

输出结果如下:

VirtualThread[#2031]/runnable@ForkJoinPool-1-worker-4: event 976
VirtualThread[#2039]/runnable@ForkJoinPool-1-worker-1: event 980
VirtualThread[#1064]/runnable@ForkJoinPool-1-worker-1: event 983
VirtualThread[#2047]/runnable@ForkJoinPool-1-worker-2: event 984
VirtualThread[#2049]/runnable@ForkJoinPool-1-worker-9: event 985
VirtualThread[#2057]/runnable@ForkJoinPool-1-worker-2: event 989
VirtualThread[#2059]/runnable@ForkJoinPool-1-worker-3: event 990
VirtualThread[#2061]/runnable@ForkJoinPool-1-worker-6: event 991
VirtualThread[#2063]/runnable@ForkJoinPool-1-worker-10: event 992
VirtualThread[#2065]/runnable@ForkJoinPool-1-worker-10: event 993
VirtualThread[#2071]/runnable@ForkJoinPool-1-worker-3: event 996
VirtualThread[#2069]/runnable@ForkJoinPool-1-worker-2: event 995
VirtualThread[#2075]/runnable@ForkJoinPool-1-worker-7: event 998
VirtualThread[#2077]/runnable@ForkJoinPool-1-worker-10: event 999

上面输出结果中,每次并发执行100个任务,当虚拟线程池任务达到100之后,执行eventPublisher.publishEvent("event " + i)代码时,代码阻塞,过100ms之后,100个任务执行完成,下一批任务被执行。

虚拟线程使用注意事项

经过验证,虚拟线程在遇到IO时,确实会让步,并且不消耗太多资源,核心特点是,让异步编程变得简单,并且不需要框架支持。但是容易因大的并发,造成OOM,或者对目标系统造成冲击,追求高并发可用,但一定要做测试和验证。对于需要做状态同步,如需要加锁,或需要使用synchronize关键字的代码,需要优化设计,如果无法规避,那么,使用虚拟线程,和使用线程池,效果差不多。

虚拟线程存在的问题:

Java Virtual Threads — some early gotchas to look out for

Two Pitfalls by moving to Java Virtual Threads

Java 21 Virtual Threads - Dude, Where’s My Lock?

Pitfalls to avoid when switching to Virtual threads

Do Java 21 virtual threads address the main reason to switch to reactive single-thread frameworks?

Pinning: A pitfall to avoid when using virtual threads in Java

Taming the Virtual Threads: Embracing Concurrency With Pitfall Avoidance

Pitfalls you encounter with virtual threads

示例代码在Gitee上同步

到此这篇关于Java 虚拟线程 探索的文章就介绍到这了,更多相关Java 虚拟线程内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文