C#教程

关注公众号 jb51net

关闭
首页 > 软件编程 > C#教程 > C# AsyncLocal

浅析C# AsyncLocal如何在异步间进行数据流转

作者:yi念之间

在异步编程中,处理异步操作之间的数据流转是一个比较常用的操作,C#异步编程提供了一个强大的工具来解决这个问题,那就是AsyncLocal,下面我们就来看看AsyncLocal的原理和用法吧

前言

在异步编程中,处理异步操作之间的数据流转是一个比较常用的操作。C#异步编程提供了一个强大的工具来解决这个问题,那就是AsyncLocal。它是一个线程本地存储的机制,可以在异步操作之间传递数据。它为我们提供了一种简单而可靠的方式来共享数据,而不必担心线程切换或异步上下文的变化。本文我们将探究AsyncLocal的原理和用法,并进行相关源码解析。探讨它如何在异步操作之间实现数据的流转,以及它是如何在底层工作的。

使用方式

上面我们提到了AsyncLocal可以在异步操作间传递数据,我们在之前的文章<研究c#异步操作async await状态机的总结>一文中提到过异步操作会涉及到线程切换的问题,接下来通过Task来模拟一个简单异步示例,来看一下它的工作方式是什么样的,以便加深对它的理解,先看一下示例

AsyncLocal<Person> context = new AsyncLocal<Person>();
context.Value = new Person { Id = 1, Name = "张三" };
Console.WriteLine($"Main之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
await Task.Run(() =>
{
    Console.WriteLine($"Task1之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value.Name = "李四";
    Console.WriteLine($"Task1之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
await Task.Run(() =>
{
    Console.WriteLine($"Task2之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value.Name = "王五";
    Console.WriteLine($"Task2之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine($"Main之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");

在上面的示例中,我们创建了一个AsyncLocal实例,并赋值了一个Person对象,然后我们创建了两个Task,分别执行了两个异步操作,并分别修改了AsyncLocal中的Person对象的值,分别在执行异步之前执行异步过程中和执行异步之后打印值来观察变化,执行程序输出结果如下

Main之前:张三,ThreadId=1
Task1之前:张三,ThreadId=4
Task1之后:李四,ThreadId=4
Task2之前:李四,ThreadId=6
Task2之后:王五,ThreadId=6
Main之后:王五,ThreadId=6

从输出结果来看,虽然我们在异步中修改了AsyncLocalPerson对象的值,并且也发生了线程切换。但是它可以在异步操作之间的数据共享和传递,使得我们在异步间进行的数据就和在一个线程里操作数据一样,让我们可以忽略掉其实已经发生了多次线程切换。

探究本质

通过上面的示例,我们发现AsyncLocal确实可以实现异步之间的数据共享和传递,那么它是如何实现的呢?接下来,我们通过先查看AsyncLocal涉及到的相关源码来探究一下。想弄明白它的流转问题,需要研究两个大方向,一个是AsyncLocal的本身实现,一个是AsyncLocal的流转涉及到的异步或者多线程相关这里涉及到的主要是Task线程池里的相关实现。由于异步相关涉及到了一整个体系,所以但看某一点的时候可能不太容易理解,我们先从AsyncLocal本身入手,然后从Task入手,最后从线程池入手,逐步探究AsyncLocal如何进行数据流转的。但是仍然希望能在阅读本文之前先了解一下设计到该话题的相关文章,先对整体有一个整体的把握

AsyncLocal

虽然强烈建议先看一下上面推荐的文章,但是在这里我们还是简单介绍一下AsyncLocal的实现,所以这里我们简单介绍一下,方便大家能直观的看到。其实涉及到的比较简单,就是看一下AsyncLocal里涉及到关于Value的操作即可[点击查看AsyncLocal.Value源码]

public sealed class AsyncLocal<T> : IAsyncLocal
{
    [MaybeNull]
    public T Value
    {
        get
        {
            object? value = ExecutionContext.GetLocalValue(this);
            if (typeof(T).IsValueType && value is null)
            {
                return default;
            }
            return (T)value!;
        }
        set
        {
            ExecutionContext.SetLocalValue(this, value, _valueChangedHandler is not null);
        }
    }
}

通过上面的源码可以看到AsyncLocal的Value属性的能力来自于ExecutionContext,也可理解为AsyncLocal是对ExecutionContext能力的包装。

在C#中,ExecutionContext是用于多线程和异步编程的类,用于保存和还原线程的执行状态。它的主要功能是确保在线程切换时,状态得以保留和恢复,以便线程能够在正确的上下文中继续执行。这有助于管理线程的数据、状态以及异步任务的正确执行。

所以我们可以继续简单的看一下ExecutionContext中关于GetLocalValue方法和SetLocalValue方法的大致实现,这里我们不在进行全部代码展示,只展示核心实现[点击查看ExecutionContext.LocalValue源码]

public sealed class ExecutionContext : IDisposable, ISerializable
{
    private readonly IAsyncLocalValueMap? m_localValues;
    private ExecutionContext(
        IAsyncLocalValueMap localValues,)
    {
        m_localValues = localValues;
    }
    //获取值的方法
    internal static object? GetLocalValue(IAsyncLocal local)
    {
        //捕获当前线程的执行上下文
        ExecutionContext? current = Thread.CurrentThread._executionContext;
        if (current == null)
        {
            return null;
        }
        //在执行上下文中获取值
        current.m_localValues.TryGetValue(local, out object? value);
        return value;
    }
    //设置值的方法
    internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications)
    {
        ExecutionContext? current = Thread.CurrentThread._executionContext;
        //判断设置的心值和旧值是否相同
        object? previousValue = null;
        bool hadPreviousValue = false;
        if (current != null)
        {
            hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue);
        }
        //相同的话不在进行设置直接返回
        if (previousValue == newValue)
        {
            return;
        }
        if (current != null)
        {
            //设置新值
            newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
        }
        else
        {
            //如果没有使用过先初始化在存储
            newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
        } 
        //给当前线程执行上下文赋值新值
        Thread.CurrentThread._executionContext = (!isFlowSuppressed && AsyncLocalValueMap.IsEmpty(newValues)) ?
            null : new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed);
    }
}

通过上面的代码我们可以知道GetLocalValue函数用于从当前线程的执行上下文中获取异步本地对象的值。通过检索执行状态并查找本地值字典,该函数能够获取正确的值,实现了上下文数据的提取。SetLocalValue函数用于设置异步本地对象的值。它通过比较新旧值、操作本地值字典,并根据情况创建新的执行上下文,确保了数据正确地传递和存储。而异步操作过程中无非也正是不同线程上下文之间切换的问题。

有关ExecutionContext更详细的源码可以仔细阅读一下,上面开头提到的黑洞大佬文章地址。

在异步中流转

上面我们展示了AsyncLocal相关的代码实现,知道了AsyncLocal本质是对ExecutionContext能力的封装。每个线程Thread对象都包含了_executionContext类存储ExecutionContext执行上下问信息。接下来我们就来研究一下AsyncLocal中的数据是如何在异步过程中流转的。首先我们来大致回顾一下异步编译之后形成状态机的执行过程。

IAsyncStateMachine状态机实例
  ->AsyncTaskMethodBuilder属性类型AwaitUnsafeOnCompleted方法->
    ->AsyncTaskMethodBuilder<VoidTaskResult>.AwaitUnsafeOnCompleted方法
        判断是否是以下类型
        ITaskAwaiter
        IConfiguredTaskAwaiter
        IStateMachineBoxAwareAwaiter
            ->Task是类型ITaskAwaiter类型所以调用UnsafeOnCompletedInternal方法
                ->Task.UnsafeSetContinuationForAwait
                    ->判断交由哪种执行策略执行比如TaskScheduler或ThreadPool

到了Task.UnsafeSetContinuationForAwait方法这一步会涉及到异步代码如何被调度的问题也就是会被自定义调度策略调度还是被线程池调度等等。我们来看一下这个方法的实现,这个方法的实现代码,在上面的<研究c#异步操作async await状态机的总结>一文中也有介绍,咱们简单看一下这里面的代码[点击查看Task.UnsafeSetContinuationForAwait源码]

internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
{
    //是否捕获同步上下文
    if (continueOnCapturedContext)
    {
        //在异步执行完成后通过同步上下文执行后续结果
        SynchronizationContext? syncCtx = SynchronizationContext.Current;
        if (syncCtx != null && syncCtx.GetType() != typeof(SynchronizationContext))
        {
            var tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, stateMachineBox.MoveNextAction, flowExecutionContext: false);
            if (!AddTaskContinuation(tc, addBeforeOthers: false))
            {
                tc.Run(this, canInlineContinuationTask: false);
            }
            return;
        }
        else
        {
            //选择执行默认的TaskScheduler还是自定义的Scheduler
            TaskScheduler? scheduler = TaskScheduler.InternalCurrent;
            if (scheduler != null && scheduler != TaskScheduler.Default)
            {
                var tc = new TaskSchedulerAwaitTaskContinuation(scheduler, stateMachineBox.MoveNextAction, flowExecutionContext: false);
                if (!AddTaskContinuation(tc, addBeforeOthers: false))
                {
                    tc.Run(this, canInlineContinuationTask: false);
                }
                return;
            }
        }
    }
    if (!AddTaskContinuation(stateMachineBox, addBeforeOthers: false))
    {
        //兜底的线程池策略
        ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, preferLocal: true);
    }
}

在许多情况下,特定的代码需要在特定的线程上执行,例如UI操作需要在UI线程上执行,以避免UI冲突和渲染问题。SynchronizationContext就是为了解决这样的问题而引入的。它允许您捕获和存储特定线程的上下文,并在需要时将任务切换到正确的线程。

上面的这段源码是Task执行操作的核心策略,咱们简单的分析一下这段代码涉及到的几个核心的逻辑

首先是continueOnCapturedContext判断,我们使用task.ConfigureAwait(false)这里设置的true或false设置的就是continueOnCapturedContext的值,如果为true则表示当前Task的执行需要切换到当前SynchronizationContext的线程,如果用一段代码描述默认情况下一步执行的原理可以大致理解为下面的代码。

SynchronizationContext sc = SynchronizationContext.Current;
ThreadPool.QueueUserWorkItem(_ =>
{
    try 
    { 
        DoWorker();
    }
    finally 
    { 
        sc.Post(_ => callback(), null); 
    }
});

其次是scheduler != TaskScheduler.Default判断,如果自定义了TaskScheduler则使用自定义的TaskScheduler执行,否则使用ThreadPool的线程池执行。比如经典问题Task.Factory.StartNew()方法中await前后如果不想切换线程可以只用自定义TaskScheduler的方式只用一个Thread执行所有任务,示例代码如下所示。

await Task.Factory.StartNew(async () =>
{
    while (true)
    {
        Console.WriteLine($"Task之前Current Thread:{Thread.CurrentThread.ManagedThreadId}");
        await Task.Delay(2000);
        Console.WriteLine($"Task之后Current Thread:{Thread.CurrentThread.ManagedThreadId}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new SingleThreadScheduler());
public class SingleThreadScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    public SingleThreadScheduler()
    {
        var thread = new Thread(() =>
        {
            foreach (var task in _tasks.GetConsumingEnumerable())
            {
                if (!TryExecuteTask(task))
                { 
                    _tasks.Add(task);
                }
            }
        })
        {
            IsBackground = true
        };
        thread.Start();
    }
    protected override IEnumerable<Task>? GetScheduledTasks()
    {
        return _tasks;
    }
    protected override void QueueTask(Task task)
    {
        _tasks.Add(task);
    }
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}

最后兜底的策略就是使用ThreadPool线程池去执行异步任务。

好了接下来我们把探索的重心就在线程池的里,我们知道自从有了Task之后ThreadPool就是和Task关联起来的,关联的核心逻辑就是在ThreadPoolWorkQueue的DispatchWorkItem方法中[点击查看ThreadPoolWorkQueue.DispatchWorkItem源码]

private static void DispatchWorkItem(object workItem, Thread currentThread)
{
    //判断如果线程池执行的任务是Task任务则执行Task里的ExecuteFromThreadPool方法
    if (workItem is Task task)
    {
        //传递当前的线程池里的线程
        task.ExecuteFromThreadPool(currentThread);
    }
    else
    {
        Debug.Assert(workItem is IThreadPoolWorkItem);
        Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
    }
}

通过上面的源码我们可以看到如果线程池执行的任务是Task任务则执行Task里的ExecuteFromThreadPool方法里,从这里我们也可以看到TaskThreadPool的关联性。需要注意的是这里虽然关联的Task类型但是并非是Task类的实例本身,而是实现了Task类的状态机类型AsyncStateMachineBox<TStateMachine>,通过跟踪生成的状态机代码我们可以看到,实际添加到线程池的是IAsyncStateMachineBox实例,而AsyncStateMachineBox<TStateMachine>即继承了Task也实现了IAsyncStateMachineBox接口,由于逻辑较多只粘贴咱们关注的部分[点击查看AsyncTaskMethodBuilderT.GetStateMachineBox源码]

private static IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(
            ref TStateMachine stateMachine,
            [NotNull] ref Task<TResult>? taskField)
            where TStateMachine : IAsyncStateMachine
{
    //捕获当前线程上下文
    ExecutionContext? currentContext = ExecutionContext.Capture();
    //创建AsyncStateMachineBox实例
    AsyncStateMachineBox<TStateMachine> box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ?
    CreateDebugFinalizableAsyncStateMachineBox<TStateMachine>() : new AsyncStateMachineBox<TStateMachine>();
    taskField = box; 
    box.StateMachine = stateMachine;
    //传递当前捕获的ExecutionContext执行上下文
    box.Context = currentContext;
    return box;
}

在上面的方法中我们看到在初始化AsyncStateMachineBox<TStateMachine>实例之前先使用ExecutionContext.Capture()方法捕获执行上下文传递进来,这个时候还不存在被线程池执行一说,所以捕获的肯定是初始化Task的线程,注意这个时候还没有执行Task里的任何逻辑。所以我们关注一下ExecutionContext.Capture()方法的实现[点击查看EExecutionContext.Capture源码]

public static ExecutionContext? Capture()
{
    //捕获当前线程的执行上下文
    ExecutionContext? executionContext = Thread.CurrentThread._executionContext;
    if (executionContext == null)
    {
        executionContext = Default;
    }
    //如果设置ExecutionContext.RestoreFlow()则不进行捕获
    else if (executionContext.m_isFlowSuppressed)
    {
        executionContext = null;
    }
    return executionContext;
}

通过上面的代码我们看到了ExecutionContext.Capture()就是捕获当前线程的执行上下文,如果设置了ExecutionContext.RestoreFlow()上面逻辑里的m_isFlowSuppressed值则为true这个时候则不进行上下文捕获。好了我们继续往下看,上面的GetStateMachineBox方法返回的正是AsyncStateMachineBox<TStateMachine>类实例,它是线程池线程中真正执行的Task实例,我们看一下的定义[点击查看AsyncStateMachineBox源码]

private class AsyncStateMachineBox<TStateMachine> :
            Task<TResult>, IAsyncStateMachineBox
            where TStateMachine : IAsyncStateMachine
{
}

这里我们可以看到AsyncStateMachineBox<TStateMachine>类是继承自Task类也实现了IAsyncStateMachine,所以上面的ThreadPoolWorkQueue.DispatchWorkItem方法中调用的ExecuteFromThreadPool方法,本质是调用的AsyncStateMachineBox<TStateMachine>.ExecuteFromThreadPool方法,我们看一下它的实现方式[点击查看AsyncStateMachineBox.ExecuteFromThreadPool源码]

internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread);
public void MoveNext() => MoveNext(threadPoolThread: null);
private void MoveNext(Thread? threadPoolThread)
{
    //获取之前捕获的ExecutionContext执行上下文
    ExecutionContext? context = Context;
    if (context == null)
    {
        Debug.Assert(StateMachine != null);
        StateMachine.MoveNext();
    }
    else
    {
        //判断是否是线程池代码
        if (threadPoolThread is null)
        {
            ExecutionContext.RunInternal(context, s_callback, this);
        }
        else
        {
            //默认是线程池线程,会走到这里的逻辑
            ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this);
        }
    }
}

源码中的s_callback本质是调用状态机生成的MoveNext方法,也就是在线程池线程里需要被执行的逻辑,我们看一下它的定义

private static readonly ContextCallback s_callback = ExecutionContextCallback;
private static void ExecutionContextCallback(object? s)
{
    //本质调用的状态机生成的MoveNext方法
    Unsafe.As<AsyncStateMachineBox<TStateMachine>>(s).StateMachine!.MoveNext();
}

上面的这段代码可以清楚的看到线程池线程里执行的逻辑是async await生成的状态机里的代码,完成了多线程执行状态机逻辑的关联。

咱们再继续看AsyncStateMachineBox.MoveNext方法里的执行逻辑。由于咱们是默认机制所以这段逻辑肯定是在线程池里的线程执行,所以会执行到ExecutionContext.RunFromThreadPoolDispatchLoop()方法里,我们看一下它的逻辑[点击查看ExecutionContext.RunFromThreadPoolDispatchLoop源码]

internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state)
{
    //threadPoolThread是线程池线程,executionContext是Task.CapturedContext捕获的执行上下文
    if (executionContext != null && !executionContext.m_isDefault)
    {
        //如果线程存在ExecutionContext则把捕获到的执行上下文赋值给当前线程池线程的执行上下文ExecutionContext
        RestoreChangedContextToThread(threadPoolThread, contextToRestore: executionContext, currentContext: null);
    }
    ExceptionDispatchInfo? edi = null;
    try
    {
        //执行Task里的逻辑
        callback.Invoke(state);
    }
    catch (Exception ex)
    {
        edi = ExceptionDispatchInfo.Capture(ex);
    }
    //捕获当前线程池线程
    Thread currentThread = threadPoolThread;
    //获取当前线程池里的执行上下文
    ExecutionContext? currentExecutionCtx = currentThread._executionContext;
    currentThread._synchronizationContext = null;
    if (currentExecutionCtx != null)
    {
        //将当前线程池里的执行上下文清空,方便下次在线程池里获取到当前线程处于初始化状态
        RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx);
    }
    edi?.Throw();
}
internal static void RestoreChangedContextToThread(Thread currentThread, ExecutionContext? contextToRestore, ExecutionContext? currentContext)
{
    //把捕获到的执行上下文赋值给当前线程池线程的执行上下文ExecutionContext
    currentThread._executionContext = contextToRestore;
    if ((currentContext != null && currentContext.HasChangeNotifications) ||
        (contextToRestore != null && contextToRestore.HasChangeNotifications))
    {
        OnValuesChanged(currentContext, contextToRestore);
    }
}

从上面的ExecutionContext.ExecuteFromThreadPool里的逻辑我们可以清楚的看到我们想要的结果,由于上面提供了大片的源码,看起来容易混乱,老规矩我们在这里总结一下核心逻辑的执行流程

通过上面的总结相信大家对执行上下文数据流转有个很好的理解了。先捕获当前线程执行上下文,然后把捕获的执行上下文填充到要执行任务的线程池的线程里,这样就完成了不同线程中执行上下文的流转,执行完Task任务之后把线程池里线程的执行上下文还原掉方便下次执行的时候是初始化状态。

一个常见的坑

通过上面的源码解析我们清楚的了解到了AsyncLocal在异步中是如何传递的,其实本质也就是在不同的线程里传递。那么接下来我们看一个大家在使用的过程中容易出错的地方,还是刚开始的例子,我们改造一下示例代码,如下所示

AsyncLocal<Person> context = new AsyncLocal<Person>();
context.Value = new Person { Id = 1, Name = "张三" };
Console.WriteLine($"Main之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
await Task.Run(() =>
{
    Console.WriteLine($"Task1之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value = new Person { Id = 2, Name = "李四" };
    Console.WriteLine($"Task1之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
await Task.Run(() =>
{
    Console.WriteLine($"Task2之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value = new Person { Id = 3, Name = "王五" };;
    Console.WriteLine($"Task2之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine($"Main之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");

这段代码的执行结果大家猜到了吗?不卖关子了,上面的示例代码执行结果如下所示

Main之前:张三,ThreadId=1
Task1之前:张三,ThreadId=6
Task1之后:李四,ThreadId=6
Task2之前:张三,ThreadId=8
Task2之后:王五,ThreadId=8
Main之后:张三,ThreadId=8

这里我们可以看到,虽然我们在不同的Task里改变了AsyncLocal里的Value值比如改成了李四王五这种,但是执行完Task之后仿佛值又被还原成最初初始化时候的样子也就是上面说的张三,为什么会这个样子呢?我们来分析一下

画个图简单的演示一下,首先是初始化的时候这个时候线程A.ExecutionContext线程B.ExecutionContext都指向内存区域Person { Id = 1, Name = "张三" }如下所示

线程B里重新实例化了一个新的Person实例,此时的引用指向发生了变化,如下所示

这个时候线程A.ExecutionContext线程B.ExecutionContext已经没啥关系了,所以你无论怎么操作线程B.ExecutionContext也和线程A.ExecutionContext没有任何关系了。

总结

通过本文我们探究了AsyncLocal中的数据如何在异步之间如何流转数据的,本质还是在多个线程之间流转数据。接下来我们大致的总结一下本文的核心内容

也就是先捕获当前线程执行上下文,然后把捕获的执行上下文填充到要执行任务的线程池的线程里,这样就完成了不同线程中执行上下文的流转,执行完Task任务之后把线程池里线程的执行上下文还原掉方便下次执行的时候是初始化状态。

以上就是浅析C# AsyncLocal如何在异步间进行数据流转的详细内容,更多关于C# AsyncLocal的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文