C#教程

关注公众号 jb51net

关闭
首页 > 软件编程 > C#教程 > C# async await迭代器

C#语言async await之迭代器工作原理示例解析

作者:微软技术栈

这篇文章主要为大家介绍了C#语言async await之迭代器工作原理示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

C# 迭代器

接《async/await 在 C# 语言中是如何工作的?(上)》,今天我们继续介绍 C# 迭代器和 async/await under the covers。

这个解决方案的伏笔实际上是在 Task 出现的几年前,即 C# 2.0,当时它增加了对迭代器的支持。

迭代器允许你编写一个方法,然后由编译器用来实现 IEnumerable<T> 和/或 IEnumerator<T>。例如,如果我想创建一个产生斐波那契数列的枚举数,我可以这样写:

public static IEnumerable<int> Fib(){
    int prev = 0, next = 1;
    yield return prev;
    yield return next;
    while (true)
    {
        int sum = prev + next;
        yield return sum;
        prev = next;
        next = sum;
    }}

然后我可以用 foreach 枚举它:

foreach (int i in Fib()){
    if (i > 100) break;
    Console.Write($"{i} ");}

我可以通过像 System.Linq.Enumerable 上的组合器将它与其他 IEnumerable<T> 进行组合:

foreach (int i in Fib().Take(12)){
    Console.Write($"{i} ");}

或者我可以直接通过 IEnumerator<T> 来手动枚举它:

using IEnumerator<int> e = Fib().GetEnumerator();while (e.MoveNext()){
    int i = e.Current;
    if (i > 100) break;
    Console.Write($"{i} ");}

以上所有的结果是这样的输出:

0 1 1 2 3 5 8 13 21 34 55 89

真正有趣的是,为了实现上述目标,我们需要能够多次进入和退出 Fib 方法。我们调用 MoveNext,它进入方法,然后该方法执行,直到它遇到 yield return,此时对 MoveNext 的调用需要返回 true,随后对 Current 的访问需要返回 yield value。然后我们再次调用 MoveNext,我们需要能够在 Fib 中从我们上次停止的地方开始,并且保持上次调用的所有状态不变。迭代器实际上是由 C# 语言/编译器提供的协程,编译器将 Fib 迭代器扩展为一个成熟的状态机。

所有关于 Fib 的逻辑现在都在 MoveNext 方法中,但是作为跳转表的一部分,它允许实现分支到它上次离开的位置,这在枚举器类型上生成的状态字段中被跟踪。而我写的局部变量,如 prev、next 和 sum,已经被 "提升 "为枚举器上的字段,这样它们就可以在调用 MoveNext 时持续存在。

在我之前的例子中,我展示的最后一种枚举形式涉及手动使用 IEnumerator<T>。在那个层面上,我们手动调用 MoveNext(),决定何时是重新进入循环程序的适当时机。但是,如果不这样调用它,而是让 MoveNext 的下一次调用实际成为异步操作完成时执行的延续工作的一部分呢?如果我可以 yield 返回一些代表异步操作的东西,并让消耗代码将 continuation 连接到该 yield 对象,然后在该 continuation 执行 MoveNext 时会怎么样?使用这种方法,我可以编写一个辅助方法:

static Task IterateAsync(IEnumerable<Task> tasks){
    var tcs = new TaskCompletionSource();
    IEnumerator<Task> e = tasks.GetEnumerator();
    void Process()
    {
        try
        {
            if (e.MoveNext())
            {
                e.Current.ContinueWith(t => Process());
                return;
            }
        }
        catch (Exception e)
        {
            tcs.SetException(e);
            return;
        }
        tcs.SetResult();
    };
    Process();
    return tcs.Task;}

现在变得有趣了。我们得到了一个可迭代的任务列表。每次我们 MoveNext 到下一个 Task 并获得一个时,我们将该任务的 continuation 连接起来;当这个 Task 完成时,它只会回过头来调用执行 MoveNext、获取下一个 Task 的相同逻辑,以此类推。这是建立在将 Task 作为任何异步操作的单一表示的思想之上的,所以我们输入的枚举表可以是一个任何异步操作的序列。这样的序列是从哪里来的呢?当然是通过迭代器。

还记得我们之前的 CopyStreamToStream 例子吗?考虑一下这个:

static Task CopyStreamToStreamAsync(Stream source, Stream destination){
    return IterateAsync(Impl(source, destination));
    static IEnumerable<Task> Impl(Stream source, Stream destination)
    {
        var buffer = new byte[0x1000];
        while (true)
        {
            Task<int> read = source.ReadAsync(buffer, 0, buffer.Length);
            yield return read;
            int numRead = read.Result;
            if (numRead <= 0)
            {
                break;
            }
            Task write = destination.WriteAsync(buffer, 0, numRead);
            yield return write;
            write.Wait();
        }
    }}

我们正在调用那个 IterateAsync 助手,而我们提供给它的枚举表是由一个处理所有控制流的迭代器产生的。它调用 Stream.ReadAsync 然后 yield 返回 Task;yield task 在调用 MoveNext 之后会被传递给 IterateAsync,而 IterateAsync 会将一个 continuation 挂接到那个 task 上,当它完成时,它会回调 MoveNext 并在 yield 之后回到这个迭代器。此时,Impl 逻辑获得方法的结果,调用 WriteAsync,并再次生成它生成的 Task。以此类推。

这就是 C# 和 .NET 中 async/await 的开始。

在 C# 编译器中支持迭代器和 async/await 的逻辑中,大约有95%左右的逻辑是共享的。不同的语法,不同的类型,但本质上是相同的转换。

事实上,在 async/await 出现之前,一些开发人员就以这种方式使用迭代器进行异步编程。在实验性的 Axum 编程语言中也有类似的转换原型,这是 C# 支持异步的关键灵感来源。Axum 提供了一个可以放在方法上的 async 关键字,就像 C# 中的 async 一样。

Task 还不普遍,所以在异步方法中,Axum 编译器启发式地将同步方法调用与 APM 对应的方法相匹配,例如,如果它看到你调用 stream.Read,它会找到并利用相应的 stream.BeginRead 和 stream.EndRead 方法,合成适当的委托传递给 Begin 方法,同时还为定义为可组合的 async 方法生成完整的 APM 实现。它甚至还集成了 SynchronizationContext!虽然 Axum 最终被搁置,但它为 C# 中的 async/await 提供了一个很棒的原型。

async/await under the covers

现在我们知道了我们是如何做到这一点的,让我们深入研究它实际上是如何工作的。作为参考,下面是我们的同步方法示例:

public void CopyStreamToStream(Stream source, Stream destination){
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, numRead);
    }}

下面是 async/await 对应的方法:

public async Task CopyStreamToStreamAsync(Stream source, Stream destination){
    var buffer = new byte[0x1000];
    int numRead;
    while ((numRead = await source.ReadAsync(buffer, 0, buffer.Length)) != 0)
    {
        await destination.WriteAsync(buffer, 0, numRead);
    }}

签名从 void 变成了 async Task,我们分别调用了 ReadAsync 和 WriteAsync,而不是 Read 和 Write,这两个操作都带 await 前缀。编译器和核心库接管了其余部分,从根本上改变了代码实际执行的方式。让我们深入了解一下是如何做到的。

编译器转换

我们已经看到,和迭代器一样,编译器基于状态机重写了 async 方法。我们仍然有一个与开发人员写的签名相同的方法(public Task CopyStreamToStreamAsync(Stream source, Stream destination)),但该方法的主体完全不同:

[AsyncStateMachine(typeof(<CopyStreamToStreamAsync>d__0))]public Task CopyStreamToStreamAsync(Stream source, Stream destination){
    <CopyStreamToStreamAsync>d__0 stateMachine = default;
    stateMachine.<>t__builder = AsyncTaskMethodBuilder.Create();
    stateMachine.source = source;
    stateMachine.destination = destination;
    stateMachine.<>1__state = -1;
    stateMachine.<>t__builder.Start(ref stateMachine);
    return stateMachine.<>t__builder.Task;}
private struct <CopyStreamToStreamAsync>d__0 : IAsyncStateMachine{
    public int <>1__state;
    public AsyncTaskMethodBuilder <>t__builder;
    public Stream source;
    public Stream destination;
    private byte[] <buffer>5__2;
    private TaskAwaiter <>u__1;
    private TaskAwaiter<int> <>u__2;
    ...}

注意,与开发人员所写的签名的唯一区别是缺少 async 关键字本身。Async 实际上不是方法签名的一部分;就像 unsafe 一样,当你把它放在方法签名中,你是在表达方法的实现细节,而不是作为契约的一部分实际公开出来的东西。使用 async/await 实现 task -return 方法是实现细节。

编译器已经生成了一个名为 <CopyStreamToStreamAsync>d__0 的结构体,并且它在堆栈上对该结构体的实例进行了零初始化。重要的是,如果异步方法同步完成,该状态机将永远不会离开堆栈。这意味着没有与状态机相关的分配,除非该方法需要异步完成,也就是说它需要等待一些尚未完成的任务。稍后会有更多关于这方面的内容。

该结构体是方法的状态机,不仅包含开发人员编写的所有转换逻辑,还包含用于跟踪该方法中当前位置的字段,以及编译器从方法中提取的所有“本地”状态,这些状态需要在 MoveNext 调用之间生存。它在逻辑上等价于迭代器中的 IEnumerable<T>/IEnumerator<T> 实现。(请注意,我展示的代码来自发布版本;在调试构建中,C# 编译器将实际生成这些状态机类型作为类,因为这样做可以帮助某些调试工作)。

在初始化状态机之后,我们看到对 AsyncTaskMethodBuilder.Create() 的调用。虽然我们目前关注的是 Tasks,但 C# 语言和编译器允许从异步方法返回任意类型(“task-like”类型),例如,我可以编写一个方法 public async MyTask CopyStreamToStreamAsync,只要我们以适当的方式扩展我们前面定义的 MyTask,它就能顺利编译。这种适当性包括声明一个相关的“builder”类型,并通过 AsyncMethodBuilder 属性将其与该类型关联起来:

[AsyncMethodBuilder(typeof(MyTaskMethodBuilder))]public class MyTask{
    ...}
public struct MyTaskMethodBuilder{
    public static MyTaskMethodBuilder Create() { ... }
    public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { ... }
    public void SetStateMachine(IAsyncStateMachine stateMachine) { ... }
    public void SetResult() { ... }
    public void SetException(Exception exception) { ... }
    public void AwaitOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : INotifyCompletion
        where TStateMachine : IAsyncStateMachine { ... }
    public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(
        ref TAwaiter awaiter, ref TStateMachine stateMachine)
        where TAwaiter : ICriticalNotifyCompletion
        where TStateMachine : IAsyncStateMachine { ... }
    public MyTask Task { get { ... } }}

在这种情况下,这样的“builder”知道如何创建该类型的实例(Task 属性),如何成功完成并在适当的情况下有结果(SetResult)或有异常(SetException),以及如何处理连接等待尚未完成的事务的延续(AwaitOnCompleted/AwaitUnsafeOnCompleted)。在 System.Threading.Tasks.Task 的情况下,它默认与 AsyncTaskMethodBuilder 相关联。通常情况下,这种关联是通过应用在类型上的 [AsyncMethodBuilder(…)] 属性提供的,但在 C# 中,Task 是已知的,因此实际上没有该属性。因此,编译器已经让构建器使用这个异步方法,并使用模式中的 Create 方法构建它的实例。请注意,与状态机一样,AsyncTaskMethodBuilder 也是一个结构体,因此这里也没有内存分配。

然后用这个入口点方法的参数填充状态机。这些参数需要能够被移动到 MoveNext 中的方法体访问,因此这些参数需要存储在状态机中,以便后续调用 MoveNext 时代码可以引用它们。该状态机也被初始化为初始-1状态。如果 MoveNext 被调用且状态为-1,那么逻辑上我们将从方法的开始处开始。

现在是最不显眼但最重要的一行:调用构建器的 Start 方法。这是模式的另一部分,必须在 async 方法的返回位置所使用的类型上公开,它用于在状态机上执行初始的 MoveNext。构建器的 Start 方法实际上是这样的:

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine{
    stateMachine.MoveNext();}

例如,调用 stateMachine.<>t__builder.Start(ref stateMachine); 实际上只是调用 stateMachine.MoveNext()。在这种情况下,为什么编译器不直接发出这个信号呢?为什么还要有 Start 呢?答案是,Start 的内容比我所说的要多一点。但为此,我们需要简单地了解一下 ExecutionContext。

❖ ExecutionContext

我们都熟悉在方法之间传递状态。调用一个方法,如果该方法指定了形参,就使用实参调用该方法,以便将该数据传递给被调用方。这是显式传递数据。但还有其他更隐蔽的方法。例如,方法可以是无参数的,但可以指定在调用方法之前填充某些特定的静态字段,然后从那里获取状态。这个方法的签名中没有任何东西表明它接收参数,因为它确实没有:只是调用者和被调用者之间有一个隐含的约定,即调用者可能填充某些内存位置,而被调用者可能读取这些内存位置。被调用者和调用者甚至可能没有意识到它的发生,如果他们是中介,方法 A 可能填充静态信息,然后调用 B, B 调用 C, C 调用 D,最终调用 E,读取这些静态信息的值。这通常被称为“环境”数据:它不是通过参数传递给你的,而是挂在那里,如果需要的话,你可以使用。

我们可以更进一步,使用线程局部状态。线程局部状态,在 .NET 中是通过属性为 [ThreadStatic] 的静态字段或通过 ThreadLocal<T> 类型实现的,可以以相同的方式使用,但数据仅限于当前执行的线程,每个线程都能够拥有这些字段的自己的隔离副本。这样,您就可以填充线程静态,进行方法调用,然后在方法完成后将更改还原到线程静态,从而启用这种隐式传递数据的完全隔离形式。

如果我们进行异步方法调用,而异步方法中的逻辑想要访问环境数据,它会怎么做?如果数据存储在常规静态中,异步方法将能够访问它,但一次只能有一个这样的方法在运行,因为多个调用者在写入这些共享静态字段时可能会覆盖彼此的状态。如果数据存储在线程静态信息中,异步方法将能够访问它,但只有在调用线程停止同步运行之前;如果它将 continuation 连接到它发起的某个操作,并且该 continuation 最终在某个其他线程上运行,那么它将不再能够访问线程静态信息。即使它碰巧运行在同一个线程上,无论是偶然的还是由于调度器的强制,在它这样做的时候,数据可能已经被该线程发起的其他操作删除和/或覆盖。对于异步,我们需要一种机制,允许任意环境数据在这些异步点上流动,这样在 async 方法的整个逻辑中,无论何时何地运行,它都可以访问相同的数据。

输入 ExecutionContext。ExecutionContext 类型是异步操作和异步操作之间传递环境数据的媒介。它存在于一个 [ThreadStatic] 中,但是当某些异步操作启动时,它被“捕获”(从该线程静态中读取副本的一种奇特的方式),存储,然后当该异步操作的延续被运行时,ExecutionContext 首先被恢复到即将运行该操作的线程中的 [ThreadStatic] 中。ExecutionContext 是实现 AsyncLocal<T> 的机制(事实上,在 .NET Core 中,ExecutionContext 完全是关于 AsyncLocal<T> 的,仅此而已),例如,如果你将一个值存储到 AsyncLocal<T> 中,然后例如队列一个工作项在 ThreadPool 上运行,该值将在该 AsyncLocal<T> 中可见,在该工作项上运行:

var number = new AsyncLocal<int>();
number.Value = 42;ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(number.Value));
number.Value = 0;
Console.ReadLine();

这段代码每次运行时都会打印42。在我们对委托进行排队之后,我们将 AsyncLocal<int> 的值重置为0,这无关紧要,因为 ExecutionContext 是作为 QueueUserWorkItem 调用的一部分被捕获的,而该捕获包含了当时 AsyncLocal<int> 的状态。

❖ Back To Start

当我在写 AsyncTaskMethodBuilder.Start 的实现时,我们绕道讨论了 ExecutionContext,我说这是有效的:

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine{
    stateMachine.MoveNext();}

然后建议我简化一下。这种简化忽略了一个事实,即该方法实际上需要将 ExecutionContext 考虑在内,因此更像是这样:

public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine{
    ExecutionContext previous = Thread.CurrentThread._executionContext; // [ThreadStatic] field
    try
    {
        stateMachine.MoveNext();
    }
    finally
    {
        ExecutionContext.Restore(previous); // internal helper
    }}

这里不像我之前建议的那样只调用 statemmachine .MoveNext(),而是在这里做了一个动作:获取当前的 ExecutionContext,再调用 MoveNext,然后在它完成时将当前上下文重置为调用 MoveNext 之前的状态。

这样做的原因是为了防止异步方法将环境数据泄露给调用者。一个示例方法说明了为什么这很重要:

async Task ElevateAsAdminAndRunAsync(){
    using (WindowsIdentity identity = LoginAdmin())
    {
        using (WindowsImpersonationContext impersonatedUser = identity.Impersonate())
        {
            await DoSensitiveWorkAsync();
        }
    }}

“冒充”是将当前用户的环境信息改为其他人的;这让代码可以代表其他人,使用他们的特权和访问权限。在 .NET 中,这种模拟跨异步操作流动,这意味着它是 ExecutionContext 的一部分。现在想象一下,如果 Start 没有恢复之前的上下文,考虑下面的代码:

Task t = ElevateAsAdminAndRunAsync();PrintUser();await t;

这段代码可以发现,ElevateAsAdminAndRunAsync 中修改的 ExecutionContext 在 ElevateAsAdminAndRunAsync 返回到它的同步调用者之后仍然存在(这发生在该方法第一次等待尚未完成的内容时)。这是因为在调用 Impersonate 之后,我们调用了 DoSensitiveWorkAsync 并等待它返回的任务。假设任务没有完成,它将导致对 ElevateAsAdminAndRunAsync 的调用 yield 并返回到调用者,模拟仍然在当前线程上有效。这不是我们想要的。因此,Start 设置了这个保护机制,以确保对 ExecutionContext 的任何修改都不会从同步方法调用中流出,而只会随着方法执行的任何后续工作一起流出。

❖ MoveNext

因此,调用了入口点方法,初始化了状态机结构体,调用了 Start,然后调用了 MoveNext。什么是 MoveNext?这个方法包含了开发者方法中所有的原始逻辑,但做了一大堆修改。让我们先看看这个方法的脚手架。下面是编译器为我们的方法生成的反编译版本,但删除了生成的 try 块中的所有内容:

private void MoveNext(){
    try
    {
        ... // all of the code from the CopyStreamToStreamAsync method body, but not exactly as it was written
    }
    catch (Exception exception)
    {
        <>1__state = -2;
        <buffer>5__2 = null;
        <>t__builder.SetException(exception);
        return;
    }
    <>1__state = -2;
    <buffer>5__2 = null;
    <>t__builder.SetResult();}

无论 MoveNext 执行什么其他工作,当所有工作完成后,它都有责任完成 async Task 方法返回的任务。如果 try 代码块的主体抛出了未处理的异常,那么任务就会抛出该异常。如果 async 方法成功到达它的终点(相当于同步方法返回),它将成功完成返回的任务。在任何一种情况下,它都将设置状态机的状态以表示完成。(我有时听到开发人员从理论上说,当涉及到异常时,在第一个 await 之前抛出的异常和在第一个 await 之后抛出的异常是有区别的……基于上述,应该清楚情况并非如此。任何未在 async 方法中处理的异常,不管它在方法的什么位置,也不管方法是否产生了结果,都会在上面的 catch 块中结束,然后被捕获的异常会存储在 async 方法返回的任务中。)

还要注意,这个完成过程是通过构建器完成的,使用它的 SetException 和 SetResult 方法,这是编译器预期的构建器模式的一部分。如果 async 方法之前已经挂起了,那么构建器将不得不再挂起处理中创建一个 Task (稍后我们会看到如何以及在哪里执行),在这种情况下,调用 SetException/SetResult 将完成该任务。然而,如果 async 方法之前没有挂起,那么我们还没有创建任务或向调用者返回任何东西,因此构建器在生成任务时有更大的灵活性。如果你还记得之前在入口点方法中,它做的最后一件事是将任务返回给调用者,它通过访问构建器的 Task 属性返回结果:

public Task CopyStreamToStreamAsync(Stream source, Stream destination)
{
    ...
    return stateMachine.<>t__builder.Task;
}

构建器知道该方法是否挂起过,如果挂起了,它就会返回已经创建的任务。如果方法从未挂起,而且构建器还没有任务,那么它可以在这里创建一个完成的任务。在成功完成的情况下,它可以直接使用 Task.CompletedTask 而不是分配一个新的任务,避免任何分配。如果是一般的任务 <TResult>,构建者可以直接使用 Task.FromResult<TResult>(TResult result)。

构建器还可以对它创建的对象进行任何它认为合适的转换。例如,Task 实际上有三种可能的最终状态:成功、失败和取消。AsyncTaskMethodBuilder 的 SetException 方法处理特殊情况 OperationCanceledException,将任务转换为 TaskStatus。如果提供的异常是 OperationCanceledException 或源自 OperationCanceledException,则将任务转换为 TaskStatus.Canceled 最终状态;否则,任务以 TaskStatus.Faulted 结束;这种区别在使用代码时往往不明显;因为无论异常被标记为取消还是故障,都会被存储到 Task 中,等待该任务的代码将无法观察到状态之间的区别(无论哪种情况,原始异常都会被传播)...... 它只影响与任务直接交互的代码,例如通过 ContinueWith,它具有重载,允许仅为完成状态的子集调用 continuation。

现在我们了解了生命周期方面的内容,下面是在 MoveNext 的 try 块内填写的所有内容:

private void MoveNext()
{
    try
    {
        int num = <>1__state;
        TaskAwaiter<int> awaiter;
        if (num != 0)
        {
            if (num != 1)
            {
                <buffer>5__2 = new byte[4096];
                goto IL_008b;
            }
            awaiter = <>u__2;
            <>u__2 = default(TaskAwaiter<int>);
            num = (<>1__state = -1);
            goto IL_00f0;
        }
        TaskAwaiter awaiter2 = <>u__1;
        <>u__1 = default(TaskAwaiter);
        num = (<>1__state = -1);
        IL_0084:
        awaiter2.GetResult();
        IL_008b:
        awaiter = source.ReadAsync(<buffer>5__2, 0, <buffer>5__2.Length).GetAwaiter();
        if (!awaiter.IsCompleted)
        {
            num = (<>1__state = 1);
            <>u__2 = awaiter;
            <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
            return;
        }
        IL_00f0:
        int result;
        if ((result = awaiter.GetResult()) != 0)
        {
            awaiter2 = destination.WriteAsync(<buffer>5__2, 0, result).GetAwaiter();
            if (!awaiter2.IsCompleted)
            {
                num = (<>1__state = 0);
                <>u__1 = awaiter2;
                <>t__builder.AwaitUnsafeOnCompleted(ref awaiter2, ref this);
                return;
            }
            goto IL_0084;
        }
    }
    catch (Exception exception)
    {
        <>1__state = -2;
        <buffer>5__2 = null;
        <>t__builder.SetException(exception);
        return;
    }
    <>1__state = -2;
    <buffer>5__2 = null;
    <>t__builder.SetResult();
}

这种复杂的情况可能感觉有点熟悉。还记得我们基于 APM 手动实现的 BeginCopyStreamToStream 有多复杂吗?这没有那么复杂,但也更好,因为编译器为我们做了这些工作,以延续传递的形式重写了方法,同时确保为这些延续保留了所有必要的状态。即便如此,我们也可以眯着眼睛跟着走。请记住,状态在入口点被初始化为-1。然后我们进入 MoveNext,发现这个状态(现在存储在本地 num 中)既不是0也不是1,因此执行创建临时缓冲区的代码,然后跳转到标签 IL_008b,在这里调用 stream.ReadAsync。注意,在这一点上,我们仍然从调用 MoveNext 同步运行,因此从开始到入口点都同步运行,这意味着开发者的代码调用了 CopyStreamToStreamAsync,它仍然在同步执行,还没有返回一个 Task 来表示这个方法的最终完成。

我们调用 Stream.ReadAsync,从中得到一个 Task<int>。读取可能是同步完成的,也可能是异步完成的,但速度快到现在已经完成,也可能还没有完成。不管怎么说,我们有一个表示最终完成的 Task<int>,编译器发出的代码会检查该 Task<int> 以决定如何继续:如果该 Task<int> 确实已经完成(不管它是同步完成还是只是在我们检查时完成),那么这个方法的代码就可以继续同步运行......当我们可以在这里继续运行时,没有必要花不必要的开销排队处理该方法执行的剩余部分。但是为了处理 Task<int> 还没有完成的情况,编译器需要发出代码来为 Task 挂上一个延续。因此,它需要发出代码,询问任务 "你完成了吗?" 它是否是直接与任务对话来问这个问题?

如果你在 C# 中唯一可以等待的东西是 System.Threading.Tasks.Task,这将是一种限制。同样地,如果 C# 编译器必须知道每一种可能被等待的类型,那也是一种限制。相反,C# 在这种情况下通常会做的是:它采用了一种 api 模式。代码可以等待任何公开适当模式(“awaiter”模式)的东西(就像您可以等待任何提供适当的“可枚举”模式的东西一样)。例如,我们可以增强前面写的 MyTask 类型来实现 awaiter 模式:

class MyTask
{
    ...
    public MyTaskAwaiter GetAwaiter() => new MyTaskAwaiter { _task = this };
    public struct MyTaskAwaiter : ICriticalNotifyCompletion
    {
        internal MyTask _task;
        public bool IsCompleted => _task._completed;
        public void OnCompleted(Action continuation) => _task.ContinueWith(_ => continuation());
        public void UnsafeOnCompleted(Action continuation) => _task.ContinueWith(_ => continuation());
        public void GetResult() => _task.Wait();
    }
}

如果一个类型公开了 getwaiter() 方法,就可以等待它,Task 就是这样做的。这个方法需要返回一些内容,而这些内容又公开了几个成员,包括一个 IsCompleted 属性,用于在调用 IsCompleted 时检查操作是否已经完成。你可以看到正在发生的事情:在 IL_008b,从 ReadAsync 返回的任务已经调用了 getwaiter,然后在 struct awaiter 实例上完成访问。如果 IsCompleted 返回 true,那么最终会执行到 IL_00f0,在这里代码会调用 awaiter 的另一个成员:GetResult()。如果操作失败,GetResult() 负责抛出异常,以便将其传播到 async 方法中的 await 之外;否则,GetResult() 负责返回操作的结果。在 ReadAsync 的例子中,如果结果为0,那么我们跳出读写循环,到方法的末尾调用 SetResult,就完成了。

不过,回过头来看一下,真正有趣的部分是,如果 IsCompleted 检查实际上返回 false,会发生什么。如果它返回 true,我们就继续处理循环,类似于在 APM 模式中 completedsynchronized 返回 true,Begin 方法的调用者负责继续执行,而不是回调函数。但是如果 IsCompleted 返回 false,我们需要暂停 async 方法的执行,直到 await 操作完成。这意味着从 MoveNext 中返回,因为这是 Start 的一部分,我们仍然在入口点方法中,这意味着将任务返回给调用者。但在发生任何事情之前,我们需要将 continuation 连接到正在等待的任务(注意,为了避免像在 APM 情况中那样的 stack dives,如果异步操作在 IsCompleted 返回 false 后完成,但在我们连接 continuation 之前,continuation 仍然需要从调用线程异步调用,因此它将进入队列)。由于我们可以等待任何东西,我们不能直接与任务实例对话;相反,我们需要通过一些基于模式的方法来执行此操作。

Awaiter 公开了一个方法来连接 continuation。编译器可以直接使用它,除了一个非常关键的问题:continuation 到底应该是什么?更重要的是,它应该与什么对象相关联?请记住,状态机结构体在栈上,我们当前运行的 MoveNext 调用是对该实例的方法调用。我们需要保存状态机,以便在恢复时我们拥有所有正确的状态,这意味着状态机不能一直存在于栈中;它需要被复制到堆上的某个地方,因为栈最终将被用于该线程执行的其他后续的、无关的工作。然后,延续需要在堆上的状态机副本上调用 MoveNext 方法。

此外,ExecutionContext 也与此相关。状态机需要确保存储在 ExecutionContext 中的任何环境数据在暂停时被捕获,然后在恢复时被应用,这意味着延续也需要合并该 ExecutionContext。因此,仅仅在状态机上创建一个指向 MoveNext 的委托是不够的。这也是我们不想要的开销。如果当我们挂起时,我们在状态机上创建了一个指向 MoveNext 的委托,那么每次这样做我们都要对状态机结构进行装箱(即使它已经作为其他对象的一部分在堆上)并分配一个额外的委托(委托的这个对象引用将是该结构体的一个新装箱的副本)。因此,我们需要做一个复杂的动作,即确保我们只在方法第一次暂停执行时将该结构从堆栈中提升到堆中,而在其他时候都使用相同的堆对象作为 MoveNext 的目标,并在这个过程中确保我们捕获了正确的上下文,并在恢复时确保我们使用捕获的上下文来调用该操作。

你可以在 C# 编译器生成的代码中看到,当我们需要挂起时就会发生:

if (!awaiter.IsCompleted) // we need to suspend when IsCompleted is false
{
    <>1__state = 1;
    <>u__2 = awaiter;
    <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
    return;
}

我们将状态 id 存储到 state 字段中,该 id 表示当方法恢复时应该跳转到的位置。然后,我们将 awaiter 本身持久化到一个字段中,以便在恢复后可以使用它来调用 GetResult。然后在返回 MoveNext 调用之前,我们要做的最后一件事是调用 <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this),要求构建器为这个状态机连接一个 continuation 到 awaiter。

(注意,它调用构建器的 AwaitUnsafeOnCompleted 而不是构建器的 AwaitOnCompleted,因为 awaiter 实现了 iccriticalnotifycompletion;状态机处理流动的 ExecutionContext,所以我们不需要 awaiter,正如前面提到的,这样做只会带来重复和不必要的开销。)

AwaitUnsafeOnCompleted 方法的实现太复杂了,不能在这里详述,所以我将总结它在 .NET Framework 上的作用:

1.它使用 ExecutionContext.Capture() 来获取当前上下文。

2.然后它分配一个 MoveNextRunner 对象来包装捕获的上下文和装箱的状态机(如果这是该方法第一次挂起,我们还没有状态机,所以我们只使用 null 作为占位符)。

3.然后,它创建一个操作委托给该 MoveNextRunner 上的 Run 方法;这就是它如何能够获得一个委托,该委托将在捕获的 ExecutionContext 的上下文中调用状态机的 MoveNext。

4.如果这是该方法第一次挂起,我们还没有装箱的状态机,所以此时它会将其装箱,通过将实例存储到本地类型的 IAsyncStateMachine 接口中,在堆上创建一个副本。然后,这个盒子会被存储到已分配的 MoveNextRunner 中。

5.现在到了一个有些令人费解的步骤。如果您查看状态机结构体的定义,它包含构建器,public AsyncTaskMethodBuilder <>t__builder;,如果你查看构建器的定义,它包含内部的 IAsyncStateMachine m_stateMachine;。

构建器需要引用装箱的状态机,以便在后续的挂起中它可以看到它已经装箱了状态机,并且不需要再次这样做。但是我们只是装箱了状态机,并且该状态机包含一个 m_stateMachine 字段为 null 的构建器。我们需要改变装箱状态机的构建器的 m_stateMachine 指向它的父容器。

为了实现这一点,编译器生成的状态机结构体实现了 IAsyncStateMachine 接口,其中包括一个 void SetStateMachine(IAsyncStateMachine stateMachine) ;方法,该状态机结构体包含了该接口方法的实现:

private void SetStateMachine(IAsyncStateMachine stateMachine) =>
<>t__builder.SetStateMachine(stateMachine);

因此,构建器对状态机进行装箱,然后将装箱传递给装箱的 SetStateMachine 方法,该方法会调用构建器的 SetStateMachine 方法,将装箱存储到字段中。

6.最后,我们有一个表示 continuation 的 Action,它被传递给 awaiter 的 UnsafeOnCompleted 方法。在 TaskAwaiter 的情况下,任务将将该操作存储到任务的 continuation 列表中,这样当任务完成时,它将调用该操作,通过 MoveNextRunner.Run 回调,通过 ExecutionContext.Run 回调,最后调用状态机的 MoveNext 方法重新进入状态机,并从它停止的地方继续运行。

这就是在 .NET Framework 中发生的事情,你可以在分析器中看到结果,例如通过运行分配分析器来查看每个 await 上的分配情况。让我们看看这个愚蠢的程序,我写这个程序只是为了强调其中涉及的分配成本:

using System.Threading;
using System.Threading.Tasks;
class Program
{
    static async Task Main()
    {
        var al = new AsyncLocal<int>() { Value = 42 };
        for (int i = 0; i < 1000; i++)
        {
            await SomeMethodAsync();
        }
    }
    static async Task SomeMethodAsync()
    {
        for (int i = 0; i < 1000; i++)
        {
            await Task.Yield();
        }
    }
}

这个程序创建了一个 AsyncLocal<int>,让值42通过所有后续的异步操作。然后它调用 SomeMethodAsync 1000次,每次暂停/恢复1000次。在 Visual Studio 中,我使用  .NET Object Allocation Tracking profiler 运行它,结果如下:

那是很多的分配!让我们来研究一下它们的来源。

ExecutionContext。有超过一百万个这样的内容被分配。为什么?因为在 .NET Framework 中,ExecutionContext 是一个可变的数据结构。由于我们希望流转一个异步操作被 fork 时的数据,并且我们不希望它在 fork 之后看到执行的变更,我们需要复制 ExecutionContext。每个单独的 fork 操作都需要这样的副本,因此有1000次对 SomeMethodAsync 的调用,每个调用都会暂停/恢复1000次,我们有100万个 ExecutionContext 实例。

Action。类似地,每次我们等待尚未完成的任务时(我们的百万个 await Task.Yield()s就是这种情况),我们最终分配一个新的操作委托来传递给 awaiter 的 UnsafeOnCompleted 方法。

MoveNextRunner。同样的,有一百万个这样的例子,因为在前面的步骤大纲中,每次我们暂停时,我们都要分配一个新的 MoveNextRunner 来存储 Action和 ExecutionContext,以便使用后者来执行前者。

LogicalCallContext。这些是 .NET Framework 上 AsyncLocal<T> 的实现细节;AsyncLocal<T> 将其数据存储到 ExecutionContext 的“逻辑调用上下文”中,这是表示与 ExecutionContext 一起流动的一般状态的一种奇特方式。如果我们要复制一百万个 ExecutionContext,我们也会复制一百万个 LogicalCallContext。

QueueUserWorkItemCallback。每个 Task.Yield() 都将一个工作项排队到线程池中,导致分配了100万个工作项对象用于表示这100万个操作。

Task< VoidResult >。这里有一千个这样的,所以至少我们脱离了"百万"俱乐部。每个异步完成的异步任务调用都需要分配一个新的 Task 实例来表示该调用的最终完成。

< SomeMethodAsync > d__1。这是编译器生成的状态机结构的盒子。1000个方法挂起,1000个盒子出现。

QueueSegment / IThreadPoolWorkItem[]。有几千个这样的方法,从技术上讲,它们与具体的异步方法无关,而是与线程池中的队列工作有关。在 .NET 框架中,线程池的队列是一个非循环段的链表。这些段不会被重用;对于长度为 N 的段,一旦 N 个工作项被加入到该段的队列中并从该段中退出,该段就会被丢弃并当作垃圾回收。

这就是 .NET Framework。这是 .NET Core:

对于 .NET Framework 上的这个示例,有超过500万次分配,总共分配了大约145MB的内存。对于 .NET Core 上的相同示例,只有大约1000个内存分配,总共只有大约109KB。为什么这么少?

ExecutionContext。在 .NET Core 中,ExecutionContext 现在是不可变的。这样做的缺点是,对上下文的每次更改,例如将值设置为 AsyncLocal<T>,都需要分配一个新的 ExecutionContext。然而,好处是,流动的上下文比改变它更常见,而且由于 ExecutionContext 现在是不可变的,我们不再需要作为流动的一部分进行克隆。“捕获”上下文实际上就是从字段中读取它,而不是读取它并复制其内容。因此,流动不仅比变化更常见,而且更便宜。

LogicalCallContext。这在 .NET Core 中已经不存在了。在 .NET Core 中,ExecutionContext 唯一存在的东西是 AsyncLocal<T> 的存储。其他在 ExecutionContext 中有自己特殊位置的东西都是以 AsyncLocal<T> 为模型的。例如,在 .NET Framework 中,模拟将作为 SecurityContext 的一部分流动,而SecurityContext 是 ExecutionContext 的一部分;在 .NET Core 中,模拟通过 AsyncLocal<SafeAccessTokenHandle> 流动,它使用 valueChangedHandler 来对当前线程进行适当的更改。

QueueSegment / IThreadPoolWorkItem[]。在 .NET Core 中,ThreadPool 的全局队列现在被实现为 ConcurrentQueue<T>,而 ConcurrentQueue<T> 已经被重写为一个由非固定大小的循环段组成的链表。一旦段的长度大到永远不会被填满因为稳态的出队列能够跟上稳态的入队列,就不需要再分配额外的段,相同的足够大的段就会被无休止地使用。

那么其他的分配呢,比如 Action、MoveNextRunner 和 <SomeMethodAsync>d__1? 要理解剩余的分配是如何被移除的,需要深入了解它在 .NET Core 上是如何工作的。

让我们回到讨论挂起时发生的事情:

if (!awaiter.IsCompleted) // we need to suspend when IsCompleted is false
{
    <>1__state = 1;
    <>u__2 = awaiter;
    <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
    return;
}

不管目标是哪个平台,这里发出的代码都是相同的,所以不管是 .NET Framework 还是,为这个挂起生成的 IL 都是相同的。但是,改变的是 AwaitUnsafeOnCompleted 方法的实现,在 .NET Core 中有很大的不同:

public struct AsyncTaskMethodBuilder
{
    private Task<VoidTaskResult>? m_task;
    ...
}

在捕获 ExecutionContext 之后,它检查 m_task 字段是否包含一个 AsyncStateMachineBox<TStateMachine> 的实例,其中 TStateMachine 是编译器生成的状态机结构体的类型。AsyncStateMachineBox<TStateMachine> 类型定义如下:

private class AsyncStateMachineBox<TStateMachine> :
    Task<TResult>, IAsyncStateMachineBox
    where TStateMachine : IAsyncStateMachine
{
    private Action? _moveNextAction;
    public TStateMachine? StateMachine;
    public ExecutionContext? Context;
    ...
}

与其说这是一个单独的 Task,不如说这是一个任务(注意其基本类型)。该结构并没有对状态机进行装箱,而是作为该任务的强类型字段存在。我们不需要用单独的 MoveNextRunner 来存储 Action 和 ExecutionContext,它们只是这个类型的字段,而且由于这是存储在构建器的 m_task 字段中的实例,我们可以直接访问它,不需要在每次暂停时重新分配。如果 ExecutionContext 发生变化,我们可以用新的上下文覆盖该字段,而不需要分配其他东西;我们的任何 Action 仍然指向正确的地方。所以,在捕获了 ExecutionContext 之后,如果我们已经有了这个 AsyncStateMachineBox<TStateMachine> 的实例,这就不是这个方法第一次挂起了,我们可以直接把新捕获的 ExecutionContext 存储到其中。如果我们还没有一个AsyncStateMachineBox<TStateMachine> 的实例,那么我们需要分配它:

var box = new AsyncStateMachineBox<TStateMachine>();
taskField = box; // important: this must be done before storing stateMachine into box.StateMachine!
box.StateMachine = stateMachine;
box.Context = currentContext;

请注意源注释为“重要”的那一行。这取代了 .NET Framework 中复杂的 SetStateMachine,使得 SetStateMachine 在 .NET Core 中根本没有使用。你看到的 taskField 有一个指向 AsyncTaskMethodBuilder 的 m_task 字段的 ref。我们分配 AsyncStateMachineBox< tstatemachinebox >,然后通过 taskField 将对象存储到构建器的 m_task 中(这是在栈上的状态机结构中的构建器),然后将基于堆栈的状态机(现在已经包含对盒子的引用)复制到基于堆的 AsyncStateMachineBox< tstatemachinebox > 中,这样 AsyncStateMachineBox<TStateMachine> 适当地并递归地结束引用自己。这仍然是令人费解的,但却是一种更有效的费解。

它解释了为什么剩下的大部分分配都没有了:<SomeMethodAsync>d__1 没有被装箱,而是作为任务本身的一个字段存在,MoveNextRunner 不再需要,因为它的存在只是为了存储 Action 和 ExecutionContext。但是,根据这个解释,我们仍然应该看到1000个操作分配,每个方法调用一个,但我们没有。为什么?还有那些 QueueUserWorkItemCallback 对象呢?我们仍然在 Task.Yield() 中进行排队,为什么它们没有出现呢?

正如我所提到的,将实现细节推入核心库的好处之一是,它可以随着时间的推移改进实现,我们已经看到了它是如何从 .NET Framework 发展到 .NET Core 的。它在最初为 .NET Core 重写的基础上进一步发展,增加了额外的优化,这得益于对系统关键组件的内部访问。特别是,异步基础设施知道 Task 和 TaskAwaiter 等核心类型。而且因为它知道它们并具有内部访问权限,所以它不必遵循公开定义的规则。C# 语言遵循的 awaiter 模式要求 awaiter 具有 AwaitOnCompleted 或 AwaitUnsafeOnCompleted 方法,这两个方法都将 continuation 作为一个操作,这意味着基础结构需要能够创建一个操作来表示 continuation,以便与基础结构不知道的任意 awaiter 一起工作。但是,如果基础设施遇到它知道的 awaiter,它没有义务采取相同的代码路径。对于 System.Private 中定义的所有核心 awaiter。因此,CoreLib 的基础设施可以遵循更简洁的路径,完全不需要操作。这些 awaiter 都知道 IAsyncStateMachineBoxes,并且能够将 box 对象本身作为 continuation。例如,Task 返回的 YieldAwaitable.Yield 能够将 IAsyncStateMachineBox 本身作为工作项直接放入 ThreadPool 中,而等待任务时使用的 TaskAwaiter 能够将 IAsyncStateMachineBox 本身直接存储到任务的延续列表中。不需要操作,也不需要 QueueUserWorkItemCallback。

因此,在非常常见的情况下,async 方法只等待 System.Private.CoreLib (Task, Task<TResult>, ValueTask, ValueTask<TResult>,YieldAwaitable,以及它们的ConfigureAwait 变体),最坏的情况下,只有一次开销分配与 async 方法的整个生命周期相关:如果这个方法挂起了,它会分配一个单一的 Task-derived 类型来存储所有其他需要的状态,如果这个方法从来没有挂起,就不会产生额外的分配。

如果愿意,我们也可以去掉最后一个分配,至少以平摊的方式。如所示,有一个默认构建器与 Task(AsyncTaskMethodBuilder) 相关联,类似地,有一个默认构建器与任务 <TResult> (AsyncTaskMethodBuilder<TResult>) 和 ValueTask 和ValueTask<TResult> (AsyncValueTaskMethodBuilder 和 AsyncValueTaskMethodBuilder<TResult>,分别)相关联。对于 ValueTask/ValueTask<TResult>,构造器实际上相当简单,因为它们本身只处理同步且成功完成的情况,在这种情况下,异步方法完成而不挂起,构建器可以只返回一个 ValueTask.Completed 或者一个包含结果值的 ValueTask<TResult>。对于其他所有事情,它们只是委托给 AsyncTaskMethodBuilder/AsyncTaskMethodBuilder<TResult>,因为 ValueTask/ValueTask<TResult> 会被返回包装一个 Task,它可以共享所有相同的逻辑。但是 .NET 6 and C# 10 引入了一个方法可以覆盖逐个方法使用的构建器的能力,并为 ValueTask/ValueTask<TResult> 引入了几个专门的构建器,它们能够池化 IValueTaskSource/IValueTaskSource<TResult> 对象来表示最终的完成,而不是使用 Tasks。

我们可以在我们的样本中看到这一点的影响。稍微调整一下之前分析的 SomeMethodAsync 函数,让它返回 ValueTask 而不是 Task:

static async ValueTask SomeMethodAsync()
{
    for (int i = 0; i < 1000; i++)
    {
        await Task.Yield();
    }
}

这将生成以下入口点:

[AsyncStateMachine(typeof(<SomeMethodAsync>d__1))]
private static ValueTask SomeMethodAsync()
{
    <SomeMethodAsync>d__1 stateMachine = default;
    stateMachine.<>t__builder = AsyncValueTaskMethodBuilder.Create();
    stateMachine.<>1__state = -1;
    stateMachine.<>t__builder.Start(ref stateMachine);
    return stateMachine.<>t__builder.Task;
}

现在,我们添加 [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] 到 SomeMethodAsync 的声明中:

[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
static async ValueTask SomeMethodAsync()
{
    for (int i = 0; i < 1000; i++)
    {
        await Task.Yield();
    }
}

编译器输出如下:

[AsyncStateMachine(typeof(<SomeMethodAsync>d__1))]
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
private static ValueTask SomeMethodAsync()
{
    <SomeMethodAsync>d__1 stateMachine = default;
    stateMachine.<>t__builder = PoolingAsyncValueTaskMethodBuilder.Create();
    stateMachine.<>1__state = -1;
    stateMachine.<>t__builder.Start(ref stateMachine);
    return stateMachine.<>t__builder.Task;
}

整个实现的实际 C# 代码生成,包括整个状态机(没有显示),几乎是相同的;唯一的区别是创建和存储的构建器的类型,因此在我们之前看到的任何引用构建器的地方都可以使用。如果你看一下 PoolingAsyncValueTaskMethodBuilder 的代码,你会看到它的结构几乎与 AsyncTaskMethodBuilder 相同,包括使用一些完全相同的共享例程来做一些事情,如特殊套管已知的 awaiter 类型。

关键的区别是,当方法第一次挂起时,它不是执行新的 AsyncStateMachineBox<TStateMachine>(),而是执行 StateMachineBox<TStateMachine>. rentfromcache(),并且在 async 方法 (SomeMethodAsync) 完成并等待返回的 ValueTask 完成时,租用的盒子会被返回到缓存中。这意味着(平摊)零分配:

这个缓存本身有点意思。对象池可能是一个好主意,也可能是一个坏主意。创建一个对象的成本越高,共享它们的价值就越大;因此,例如,对非常大的数组进行池化比对非常小的数组进行池化更有价值,因为更大的数组不仅需要更多的 CPU 周期和内存访问为零,它们还会给垃圾收集器带来更大的压力,使其更频繁地收集垃圾。然而,对于非常小的对象,将它们池化可能会带来负面影响。池只是内存分配器,GC 也是,所以当您使用池时,您是在权衡与一个分配器相关的成本与另一个分配器相关的成本,并且 GC 在处理大量微小的、生存期短的对象方面非常高效。如果你在对象的构造函数中做了很多工作,避免这些工作可以使分配器本身的开销相形见绌,从而使池变得有价值。但是,如果您在对象的构造函数中几乎没有做任何工作,并且将其进行池化,则您将打赌您的分配器(您的池)就所采用的访问模式而言比 GC 更有效,而这通常是一个糟糕的赌注。还涉及其他成本,在某些情况下,您可能最终会有效地对抗 GC 的启发式方法;例如,垃圾回收是基于一个前提进行优化的,即从较高代(如gen2)对象到较低代(如gen0)对象的引用相对较少,但池化对象可以使这些前提失效。

我们今天为大家介绍了 C# 迭代器和 async/await under the covers,下期文章,我们将继续介绍 SynchronizationContext 和 ConfigureAwait,更多关于C# async await迭代器的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文