java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java虚拟线程(VirtualThread)

Java虚拟线程(VirtualThread)使用详解

作者:lifallen

这篇文章主要介绍了Java虚拟线程(VirtualThread)使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

VirtualThread 是 Java 实现轻量级并发(也称为纤程或协程)的关键。与传统的平台线程(直接映射到操作系统线程)不同,虚拟线程由 JVM 管理和调度,可以在少量平台线程上运行大量的虚拟线程,从而提高应用程序的吞吐量。

流程概要

核心组件:

scheduler (Executor): 每个虚拟线程都有一个调度器,它是一个 Executor 实例。

这个调度器负责安排虚拟线程在载体线程(通常是平台线程)上执行。

如果没有显式指定,它会使用默认的 ForkJoinPool (DEFAULT_SCHEDULER) 或者继承父虚拟线程的调度器。

// scheduler and continuation
private final Executor scheduler;
private final Continuation cont;
private final Runnable runContinuation;

在构造函数中,如果 schedulernull,会进行选择:

// ... existing code ...
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
    super(name, characteristics, /*bound*/ false);
    Objects.requireNonNull(task);

    // choose scheduler if not specified
    if (scheduler == null) {
        Thread parent = Thread.currentThread();
        if (parent instanceof VirtualThread vparent) {
            scheduler = vparent.scheduler;
        } else {
            scheduler = DEFAULT_SCHEDULER;
        }
    }

    this.scheduler = scheduler;
    this.cont = new VThreadContinuation(this, task);
    this.runContinuation = this::runContinuation;
}
// ... existing code ...

cont (Continuation): 这是虚拟线程的核心。Continuation 是 JDK 内部的一个机制,允许代码执行被挂起(yield)和恢复。每个虚拟线程都包装了一个 Continuation 实例。

VThreadContinuation 是一个内部类,它继承自 Continuation,并包装了用户提供的 Runnable 任务。

// ... existing code ...
/**
 * The continuation that a virtual thread executes.
 */
private static class VThreadContinuation extends Continuation {
    VThreadContinuation(VirtualThread vthread, Runnable task) {
        super(VTHREAD_SCOPE, wrap(vthread, task));
    }
    @Override
    protected void onPinned(Continuation.Pinned reason) {
    }
    private static Runnable wrap(VirtualThread vthread, Runnable task) {
        return new Runnable() {
            @Hidden
            @JvmtiHideEvents
            public void run() {
                vthread.notifyJvmtiStart(); // notify JVMTI
                try {
                    vthread.run(task);
                } finally {
                    vthread.notifyJvmtiEnd(); // notify JVMTI
                }
            }
        };
    }
}
// ... existing code ...

runContinuation (Runnable): 这是一个 Runnable,其 run() 方法(即 this::runContinuation 指向的 VirtualThread#runContinuation() 方法)负责实际执行或继续执行虚拟线程的任务。它处理虚拟线程的挂载(mount)到载体线程、运行 Continuation、以及卸载(unmount)。

状态管理: VirtualThread 内部维护了一系列状态常量(如 NEW, STARTED, RUNNING, PARKED, TERMINATED 等)和一个 volatile int state 字段来跟踪其生命周期。状态之间的转换是精心设计的,以处理各种场景,如启动、运行、暂停、阻塞和终止。

// ... existing code ...
/*
 * Virtual thread state transitions:
 *
 *      NEW -> STARTED         // Thread.start, schedule to run
 *  STARTED -> TERMINATED      // failed to start
 *  STARTED -> RUNNING         // first run
 *  RUNNING -> TERMINATED      // done
 *
 *  RUNNING -> PARKING         // Thread parking with LockSupport.park
// ... many more states ...
 *  YIELDED -> RUNNING         // continue execution after Thread.yield
 */
private static final int NEW      = 0;
private static final int STARTED  = 1;
private static final int RUNNING  = 2;     // runnable-mounted

// untimed and timed parking
private static final int PARKING       = 3;
private static final int PARKED        = 4;     // unmounted
// ... other state constants ...
private static final int TERMINATED = 99;  // final state
// ... existing code ...

carrierThread (Thread): 表示当前承载该虚拟线程执行的平台线程。当虚拟线程被挂起(unmounted)时,它不占用平台线程。当它需要运行时,调度器会将其调度到某个可用的载体线程上执行。

启动流程 (start() 方法):

// ... existing code ...
@Override
void start(ThreadContainer container) {
    if (!compareAndSetState(NEW, STARTED)) {
        throw new IllegalThreadStateException("Already started");
    }

    // bind thread to container
    assert threadContainer() == null;
    setThreadContainer(container);

    // start thread
    boolean addedToContainer = false;
    boolean started = false;
    try {
        container.add(this);  // may throw
        addedToContainer = true;

        // scoped values may be inherited
        inheritScopedValueBindings(container);

        // submit task to run thread, using externalSubmit if possible
        externalSubmitRunContinuationOrThrow();
        started = true;
    } finally {
        if (!started) {
            afterDone(addedToContainer);
        }
    }
}

@Override
public void start() {
    start(ThreadContainers.root());
}
// ... existing code ...

执行与挂起 (runContinuation()yieldContinuation()):

// ... existing code ...
@ChangesCurrentThread // allow mount/unmount to be inlined
private void runContinuation() {
    // the carrier must be a platform thread
    if (Thread.currentThread().isVirtual()) {
        throw new WrongThreadException();
    }

    // set state to RUNNING
    int initialState = state();
    if (initialState == STARTED || initialState == UNPARKED
            || initialState == UNBLOCKED || initialState == YIELDED) {
        // newly started or continue after parking/blocking/Thread.yield
        if (!compareAndSetState(initialState, RUNNING)) {
            return;
        }
        // consume permit when continuing after parking or blocking. If continue
        // after a timed-park or timed-wait then the timeout task is cancelled.
        if (initialState == UNPARKED) {
            cancelTimeoutTask();
            setParkPermit(false);
        } else if (initialState == UNBLOCKED) {
            cancelTimeoutTask();
            blockPermit = false;
        }
    } else {
        // not runnable
        return;
    }

    mount();
    try {
        cont.run();
    } finally {
        unmount();
        if (cont.isDone()) {
            afterDone();
        } else {
            afterYield();
        }
    }
}
// ... existing code ...
// ... existing code ...
@Hidden
private boolean yieldContinuation() {
    notifyJvmtiUnmount(/*hide*/true);
    try {
        return Continuation.yield(VTHREAD_SCOPE);
    } finally {
        notifyJvmtiMount(/*hide*/false);
    }
}
// ... existing code ...

runContinuation():

Parking (park()parkNanos()):

// ... existing code ...
@Override
void park() {
    assert Thread.currentThread() == this;

    // complete immediately if parking permit available or interrupted
    if (getAndSetParkPermit(false) || interrupted)
        return;

    // park the thread
    boolean yielded = false;
    setState(PARKING);
    try {
        yielded = yieldContinuation();
    } catch (OutOfMemoryError e) {
        // park on carrier
    } finally {
        assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
        if (!yielded) {
            assert state() == PARKING;
            setState(RUNNING);
        }
    }

    // park on the carrier thread when pinned
    if (!yielded) {
        parkOnCarrierThread(false, 0);
    }
}
// ... existing code ...

调度 (submitRunContinuation 系列方法):

// ... existing code ...
private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
    boolean done = false;
    while (!done) {
        try {
            // Pin the continuation to prevent the virtual thread from unmounting
            // when submitting a task. For the default scheduler this ensures that
            // the carrier doesn't change when pushing a task. For other schedulers
            // it avoids deadlock that could arise due to carriers and virtual
            // threads contending for a lock.
            if (currentThread().isVirtual()) {
                Continuation.pin();
                try {
                    scheduler.execute(runContinuation);
                } finally {
                    Continuation.unpin();
                }
            } else {
                scheduler.execute(runContinuation);
            }
            done = true;
        } catch (RejectedExecutionException ree) {
            submitFailed(ree);
            throw ree;
        } catch (OutOfMemoryError e) {
            if (retryOnOOME) {
                U.park(false, 100_000_000); // 100ms
            } else {
                throw e;
            }
        }
    }
}
// ... existing code ...

总而言之,VirtualThread 的实现巧妙地结合了 Continuation 机制和 Executor 调度框架。Continuation 提供了挂起和恢复执行流的能力,使得虚拟线程在阻塞操作时可以释放底层的平台线程。调度器则负责在虚拟线程准备好运行时,将其分配给一个可用的平台线程执行。通过这种方式,少量的平台线程就可以支持大量并发的虚拟线程,大大降低了线程创建和上下文切换的开销。状态管理确保了虚拟线程在各种并发场景下的正确行为。

调度器继承

instanceof 模式匹配语法 (parent instanceof VirtualThread vparent)

这是一种 Java 的语法特性,称为 模式匹配 (Pattern Matching) for instanceof。它从 Java 14 开始作为预览特性引入,并在 Java 16 中正式发布。

在以前的 Java 版本中,如果想检查一个对象的类型,并且如果类型匹配则将其转换为该类型并使用,你通常会这样写:

if (parent instanceof VirtualThread) {
    VirtualThread vparent = (VirtualThread) parent;
    // 然后使用 vparent
    scheduler = vparent.scheduler;
}

模式匹配 instanceof 简化了这个过程。

if (parent instanceof VirtualThread vparent) 这行代码做了两件事:

  1. 类型检查: 它检查 parent 对象是否是 VirtualThread 的一个实例。
  2. 条件声明和赋值: 如果 parent 确实是 VirtualThread 的实例,那么它会声明一个新的局部变量 vparent (类型为 VirtualThread),并将 parent 自动转换 (cast) 为 VirtualThread 类型后赋值给 vparent。这个 vparent 变量只在 if 语句块为真(即类型匹配成功)的作用域内有效。

这是一种更简洁、更安全的写法,避免了显式的类型转换和引入额外的变量声明步骤。

这里的“父子关系”并不是指操作系统层面严格的父子进程或线程关系,而是指创建者与被创建者的关系

Thread parent = Thread.currentThread();:这行代码获取的是当前正在执行 VirtualThread 构造函数的线程。这个线程就是新虚拟线程的“创建者”或“父”线程。

逻辑:

当创建一个新的 VirtualThread 时,可以显式地给它传递一个 scheduler

如果调用者没有提供 scheduler (即 scheduler == null),那么虚拟线程的构造逻辑会尝试确定一个默认的调度器。

这时,它会检查创建这个新虚拟线程的线程 (parent):

这种设计体现了一种“上下文感知”的默认行为。如果你的代码已经在某个特定的虚拟线程(它使用着特定的调度器)中运行,当你从这个虚拟线程中再创建一个新的虚拟线程时,让新的虚拟线程默认使用与创建者相同的调度器通常是合理的。这有助于:

简单来说,如果一个虚拟线程A创建了另一个虚拟线程B,并且没有为B指定调度器,那么B就会默认使用A的调度器。如果是一个平台线程创建了虚拟线程B,并且没有为B指定调度器,那么B就会使用全局默认的调度器。

Continuation 类深度分析

Continuation 类是 Java 实现轻量级线程(虚拟线程)的基石。它代表了一种一次性(one-shot)的分界延续(delimited continuation)。简单来说,它封装了一段计算(一个 Runnable 任务),这段计算可以被挂起(yield),并在之后从挂起点恢复执行。

核心能力与特性:

封装计算单元:

执行与挂起 (run()yield()):

run(): 这是启动或恢复 Continuation 执行的入口点。

yield(ContinuationScope scope) (静态方法):

栈管理 (StackChunk):

状态管理:

父子关系 (parent, child):

pinning (固定):

与 JVM 的深度集成:

Continuation 提供的核心能力: 它提供了一种机制,使得一段Java代码的执行可以在不阻塞底层平台线程的情况下被暂停,其状态(主要是调用栈)被保存起来,之后可以在相同的或不同的平台线程上从暂停点恢复执行。这是实现用户态线程(如虚拟线程)的基础。

实现多个虚拟线程的 JVM 级别调度还需要什么?

仅仅有 Continuation 是不够的,还需要一个完整的框架来管理和调度它们,这正是 java.lang.VirtualThread 所做的事情。关键组件包括:

虚拟线程的表示 (VirtualThread 类):

调度器 (Executor):

阻塞操作的适配:

与平台线程的交互(挂载/卸载):

固定 (Pinning) 的处理:

线程局部变量和作用域值:

如果自己设计虚拟线程调度应该怎么做?

这是一个非常复杂的系统工程,深度依赖于 JVM 的底层支持。但从概念上讲,可以设想以下组件:

MyContinuation:

MyVirtualThread:

MyScheduler (调度器):

调度循环:

同步原语和 I/O 适配:

锁 (MyLock): 当 MyVirtualThread 尝试获取一个已被持有的锁时,它不应阻塞平台工作线程。而是:

  1. MyVirtualThread 放入该锁的等待队列。
  2. 调用其 MyContinuation.yield()
  3. 当锁被释放时,调度器将等待队列中的一个 MyVirtualThread 移回就绪队列。

I/O: 对于非阻塞 I/O (NIO):

  1. 发起非阻塞 I/O 操作。
  2. MyVirtualThread 和一个回调(当 I/O 完成时调用)注册到 Selector
  3. 调用 MyContinuation.yield()
  4. Selector 检测到 I/O 事件完成时,执行回调,回调将对应的 MyVirtualThread 重新放入调度器的就绪队列。

Pinning 处理:

挑战:

JDK 中的 ContinuationVirtualThread 在 JVM 层面解决了这些核心挑战。自己从头设计一个类似的系统将是一项艰巨的任务,但理解其基本原理有助于更好地使用这些高级并发特性。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文