实用技巧

关注公众号 jb51net

关闭
首页 > 网络编程 > ASP.NET > 实用技巧 > .NET多线程任务实现

.NET中多线程任务实现的几种方法小结

作者:百锦再@新空间

这篇文章主要介绍了.NET平台下多线程任务实现的几种主要方法,并深入探讨线程等待机制,帮助开发人员构建高效且可靠的并发应用程序,希望对大家有所帮助

1. 引言

在现代软件开发中,多线程编程已成为提高应用程序性能和响应能力的关键技术。.NET框架提供了丰富的多线程编程模型和API,使开发人员能够根据不同的场景需求选择最合适的实现方式。本文将全面分析.NET平台下多线程任务实现的几种主要方法,并深入探讨线程等待机制,帮助开发人员构建高效、可靠的并发应用程序。

多线程编程虽然强大,但也带来了复杂性,如竞态条件、死锁、线程安全等问题。理解.NET提供的各种多线程实现方式及其适用场景,掌握线程同步与等待的正确方法,对于编写健壮的并发代码至关重要。本文将从基础概念出发,逐步深入,涵盖从传统的Thread类到现代的async/await模式等各种技术,并提供实际代码示例和最佳实践建议。

2.NET多线程编程基础

2.1 线程概念回顾

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的资源,但各自拥有独立的执行路径和调用栈。

在.NET中,线程分为两种主要类型:

多线程编程的主要优势包括:

然而,多线程编程也带来了一些挑战:

2.2 .NET线程模型概述

.NET框架提供了多层次的线程抽象,从低级的Thread类到高级的Task Parallel Library (TPL)和async/await模式,开发者可以根据需求选择不同层次的抽象。

.NET线程模型的关键组件:

3. 多线程任务实现方法

3.1 Thread类实现

System.Threading.Thread类是.NET中最基础的线程创建和控制方式。它提供了对线程生命周期的直接控制,包括创建、启动、暂停、恢复和终止线程。

创建和启动线程:

using System;
using System.Threading;

class Program
{
    static void Main()
    {
        // 创建新线程
        Thread thread = new Thread(new ThreadStart(WorkerMethod));
        
        // 设置为后台线程(可选)
        thread.IsBackground = true;
        
        // 启动线程
        thread.Start();
        
        // 主线程继续执行其他工作
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"主线程: {i}");
            Thread.Sleep(100);
        }
        
        // 等待工作线程完成(可选)
        thread.Join();
        Console.WriteLine("工作线程完成");
    }
    
    static void WorkerMethod()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine($"工作线程: {i}");
            Thread.Sleep(200);
        }
    }
}

Thread类的关键特性:

1.线程控制:

2.线程属性:

3.线程数据:

4.优点:

5.缺点:

3.2 ThreadPool实现

System.Threading.ThreadPool类提供了一个共享的线程池,用于执行短期的后台任务。线程池管理一组工作线程,根据需要创建新线程或重用现有线程,减少了线程创建和销毁的开销。

使用ThreadPool执行任务:

using System;
using System.Threading;

class Program
{
    static void Main()
    {
        // 将工作项排队到线程池
        ThreadPool.QueueUserWorkItem(WorkerMethod, "参数1");
        ThreadPool.QueueUserWorkItem(WorkerMethod, "参数2");
        
        // 主线程继续执行其他工作
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"主线程: {i}");
            Thread.Sleep(100);
        }
        
        // 注意:ThreadPool没有直接的等待机制
        Console.ReadLine(); // 防止程序退出
    }
    
    static void WorkerMethod(object state)
    {
        string param = (string)state;
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine($"{param}: {i}");
            Thread.Sleep(200);
        }
    }
}

ThreadPool的关键特性:

1.自动管理:

2.配置选项:

3.适用场景:

4.优点:

5.缺点:

3.3 Task Parallel Library (TPL)

Task Parallel Library (TPL)是.NET Framework 4.0引入的一组API,简化了并行和异步编程。TPL的核心是System.Threading.Tasks.Task类,它代表一个异步操作。

使用Task执行工作:

using System;
using System.Threading.Tasks;

​​​​​​​class Program
{
    static void Main()
    {
        // 创建并启动任务
        Task task1 = Task.Run(() => WorkerMethod("任务1"));
        Task task2 = Task.Run(() => WorkerMethod("任务2"));
        
        // 主线程继续执行其他工作
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"主线程: {i}");
            Task.Delay(100).Wait();
        }
        
        // 等待所有任务完成
        Task.WaitAll(task1, task2);
        Console.WriteLine("所有任务完成");
    }
    
    static void WorkerMethod(string taskName)
    {
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine($"{taskName}: {i}");
            Task.Delay(200).Wait();
        }
    }
}

TPL的关键特性:

1.任务创建:

2.任务控制:

3.任务返回结果:

4.异常处理:

5.优点:

6.缺点:

3.4 Parallel类

System.Threading.Tasks.Parallel类是TPL的一部分,提供了简单的数据并行和任务并行方法。它特别适合对集合进行并行操作。

使用Parallel类:

using System;
using System.Threading.Tasks;

​​​​​​​class Program
{
    static void Main()
    {
        // Parallel.For - 数据并行
        Parallel.For(0, 10, i => 
        {
            Console.WriteLine($"For迭代 {i}, 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(100);
        });
        
        // Parallel.ForEach - 数据并行
        var data = new[] { "A", "B", "C", "D", "E" };
        Parallel.ForEach(data, item => 
        {
            Console.WriteLine($"处理 {item}, 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(200);
        });
        
        // Parallel.Invoke - 任务并行
        Parallel.Invoke(
            () => WorkerMethod("任务1"),
            () => WorkerMethod("任务2"),
            () => WorkerMethod("任务3")
        );
    }
    
    static void WorkerMethod(string taskName)
    {
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine($"{taskName}: {i}");
            Thread.Sleep(200);
        }
    }
}

Parallel类的关键特性:

1.自动并行化:

2.并行方法:

3.配置选项:

4.优点:

5.缺点:

3.5 BackgroundWorker组件

System.ComponentModel.BackgroundWorker是一个基于事件的组件,主要用于在Windows Forms和WPF应用程序中简化后台操作与UI更新的交互。

使用BackgroundWorker:

using System;
using System.ComponentModel;
using System.Threading;
using System.Windows.Forms;

​​​​​​​class Program
{
    static BackgroundWorker worker;
    
    static void Main()
    {
        worker = new BackgroundWorker();
        
        // 设置支持取消和进度报告
        worker.WorkerReportsProgress = true;
        worker.WorkerSupportsCancellation = true;
        
        // 事件处理
        worker.DoWork += Worker_DoWork;
        worker.ProgressChanged += Worker_ProgressChanged;
        worker.RunWorkerCompleted += Worker_RunWorkerCompleted;
        
        // 模拟UI线程
        Console.WriteLine("按S开始工作,C取消工作");
        while (true)
        {
            var key = Console.ReadKey(true);
            if (key.Key == ConsoleKey.S && !worker.IsBusy)
            {
                worker.RunWorkerAsync("参数");
                Console.WriteLine("工作已开始");
            }
            else if (key.Key == ConsoleKey.C && worker.IsBusy)
            {
                worker.CancelAsync();
                Console.WriteLine("取消请求已发送");
            }
        }
    }
    
    static void Worker_DoWork(object sender, DoWorkEventArgs e)
    {
        BackgroundWorker worker = (BackgroundWorker)sender;
        string param = (string)e.Argument;
        
        for (int i = 0; i <= 100; i++)
        {
            if (worker.CancellationPending)
            {
                e.Cancel = true;
                return;
            }
            
            // 模拟工作
            Thread.Sleep(50);
            
            // 报告进度
            worker.ReportProgress(i, $"处理 {param} - {i}%");
        }
        
        e.Result = $"完成 {param}";
    }
    
    static void Worker_ProgressChanged(object sender, ProgressChangedEventArgs e)
    {
        Console.WriteLine($"进度: {e.ProgressPercentage}%, 状态: {e.UserState}");
    }
    
    static void Worker_RunWorkerCompleted(object sender, RunWorkerCompletedEventArgs e)
    {
        if (e.Cancelled)
        {
            Console.WriteLine("工作被取消");
        }
        else if (e.Error != null)
        {
            Console.WriteLine($"错误: {e.Error.Message}");
        }
        else
        {
            Console.WriteLine($"结果: {e.Result}");
        }
    }
}

BackgroundWorker的关键特性:

1.事件驱动模型:

2.UI集成:

3.功能支持:

4.优点:

5.缺点:

3.6 Async/Await模式

C# 5.0引入的async/await模式是.NET异步编程的重大改进,它允许以近乎同步的方式编写异步代码,大大简化了异步编程的复杂性。

使用async/await:

using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        Console.WriteLine("主线程开始");
        
        // 启动异步任务
        Task<string> task1 = DoWorkAsync("任务1");
        Task<string> task2 = DoWorkAsync("任务2");
        
        // 主线程可以继续做其他工作
        for (int i = 0; i < 3; i++)
        {
            Console.WriteLine($"主线程工作 {i}");
            await Task.Delay(100);
        }
        
        // 等待任务完成并获取结果
        string result1 = await task1;
        string result2 = await task2;
        
        Console.WriteLine($"结果1: {result1}");
        Console.WriteLine($"结果2: {result2}");
        Console.WriteLine("所有工作完成");
    }
    
    static async Task<string> DoWorkAsync(string name)
    {
        Console.WriteLine($"{name} 开始");
        
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine($"{name} 步骤 {i}");
            await Task.Delay(200); // 模拟异步工作
        }
        
        return $"{name} 完成";
    }
}

async/await的关键特性:

1.语法简化:

2.返回类型:

3.异常处理:

4.上下文捕获:

5.优点:

6.缺点:

3.7 各种方法的比较与选择

方法适用场景优点缺点推荐使用场景
Thread需要精细控制线程的场景完全控制线程生命周期开销大,管理复杂长期运行的高优先级任务
ThreadPool短期后台任务自动管理,开销小控制有限大量短生命周期任务
TPL (Task)大多数并行/异步场景高级抽象,功能丰富轻微开销现代.NET应用的主要选择
Parallel数据并行操作简化并行循环灵活性有限集合的并行处理
BackgroundWorkerUI应用的后台任务简化UI更新仅限UI场景传统WinForms/WPF应用
async/awaitI/O密集型异步操作代码简洁,可读性好需要理解机制现代异步编程首选

选择指南:

4. 线程等待机制详解

在多线程编程中,线程等待和同步是核心概念。.NET提供了多种机制来实现线程间的协调和同步。

4.1 基本等待方法

Thread.Join()

Thread.Join()方法阻塞调用线程,直到指定的线程终止。

Thread workerThread = new Thread(WorkerMethod);
workerThread.Start();

// 主线程等待workerThread完成
workerThread.Join();
Console.WriteLine("工作线程已完成");

Task.Wait() / Task.WaitAll() / Task.WaitAny()

Task task1 = Task.Run(() => WorkerMethod("任务1"));
Task task2 = Task.Run(() => WorkerMethod("任务2"));

// 等待单个任务
task1.Wait();

// 等待所有任务
Task.WaitAll(task1, task2);

// 等待任意一个任务
Task.WaitAny(task1, task2);

async/await

async Task Main()
{
    Task task1 = DoWorkAsync("任务1");
    Task task2 = DoWorkAsync("任务2");
    
    // 异步等待
    await task1;
    await task2;
    
    // 或者同时等待多个任务
    await Task.WhenAll(task1, task2);
    // 或 await Task.WhenAny(task1, task2);
}

4.2 同步原语

.NET提供了多种同步原语来处理线程同步:

1.lock语句(Monitor):

private static readonly object _lock = new object();

void CriticalSection()
{
    lock (_lock)
    {
        // 临界区代码
    }
}

2.Mutex:

using var mutex = new Mutex(false, "Global\\MyMutex");

try
{
    mutex.WaitOne();
    // 临界区代码
}
finally
{
    mutex.ReleaseMutex();
}

3.Semaphore/SemaphoreSlim:

private static SemaphoreSlim _semaphore = new SemaphoreSlim(3); // 允许3个线程同时进入

async Task AccessResource()
{
    await _semaphore.WaitAsync();
    try
    {
        // 受保护的代码
    }
    finally
    {
        _semaphore.Release();
    }
}

4.ManualResetEvent/AutoResetEvent:

private static ManualResetEvent _mre = new ManualResetEvent(false);

​​​​​​​void Worker()
{
    Console.WriteLine("工作线程等待信号...");
    _mre.WaitOne();
    Console.WriteLine("工作线程收到信号");
}

void SetSignal()
{
    Thread.Sleep(2000);
    _mre.Set(); // 发送信号
}

5.Barrier:

Barrier barrier = new Barrier(3); // 3个参与者

void Worker(object name)
{
    Console.WriteLine($"{name} 到达阶段1");
    barrier.SignalAndWait();
    
    Console.WriteLine($"{name} 到达阶段2");
    barrier.SignalAndWait();
}

// 启动3个线程
new Thread(Worker).Start("线程1");
new Thread(Worker).Start("线程2");
new Thread(Worker).Start("线程3");

6.CountdownEvent:

CountdownEvent cde = new CountdownEvent(3); // 初始计数3

void Worker(object name)
{
    Thread.Sleep(1000);
    Console.WriteLine($"{name} 完成工作");
    cde.Signal(); // 计数减1
}

// 启动3个线程
new Thread(Worker).Start("线程1");
new Thread(Worker).Start("线程2");
new Thread(Worker).Start("线程3");

​​​​​​​cde.Wait(); // 等待计数归零
Console.WriteLine("所有工作完成");

7.ReaderWriterLockSlim:

允许多个读取器或单个写入器

提高读取密集型资源的性能

ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();

void ReadData()
{
    rwLock.EnterReadLock();
    try
    {
        // 读取操作
    }
    finally
    {
        rwLock.ExitReadLock();
    }
}

​​​​​​​void WriteData()
{
    rwLock.EnterWriteLock();
    try
    {
        // 写入操作
    }
    finally
    {
        rwLock.ExitWriteLock();
    }
}

4.3 异步等待

在现代.NET应用中,异步等待(async/await)是处理并发和异步操作的首选方式。

关键异步等待方法:

1.Task.Delay:

异步版本的Thread.Sleep

不会阻塞线程

async Task ProcessAsync()
{
    Console.WriteLine("开始处理");
    await Task.Delay(1000); // 异步等待1秒
    Console.WriteLine("处理完成");
}

2.Task.WhenAll:

等待多个任务全部完成

async Task ProcessMultipleAsync()
{
    Task task1 = DoWorkAsync("任务1");
    Task task2 = DoWorkAsync("任务2");
    Task task3 = DoWorkAsync("任务3");
    
    await Task.WhenAll(task1, task2, task3);
    Console.WriteLine("所有任务完成");
}

3.Task.WhenAny:

等待任意一个任务完成

async Task ProcessFirstCompletedAsync()
{
    Task<int> task1 = DoWorkWithResultAsync("任务1", 2000);
    Task<int> task2 = DoWorkWithResultAsync("任务2", 1000);
    
    Task<int> completedTask = await Task.WhenAny(task1, task2);
    int result = await completedTask;
    Console.WriteLine($"第一个完成的任务结果: {result}");
}

4.CancellationToken:

支持异步取消操作

async Task LongRunningOperationAsync(CancellationToken cancellationToken)
{
    for (int i = 0; i < 10; i++)
    {
        cancellationToken.ThrowIfCancellationRequested();
        
        Console.WriteLine($"工作进度: {i * 10}%");
        await Task.Delay(500, cancellationToken);
    }
}

​​​​​​​// 使用示例
var cts = new CancellationTokenSource();
try
{
    // 3秒后取消
    cts.CancelAfter(3000);
    await LongRunningOperationAsync(cts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("操作被取消");
}

4.4 超时处理

在多线程编程中,处理超时是防止死锁和响应迟缓的重要手段。

Thread.Join超时:

Thread workerThread = new Thread(WorkerMethod);
workerThread.Start();

if (!workerThread.Join(TimeSpan.FromSeconds(3)))
{
    Console.WriteLine("工作线程未在3秒内完成");
}

Task.Wait超时:

Task longRunningTask = Task.Run(() => Thread.Sleep(5000));

try
{
    if (!longRunningTask.Wait(TimeSpan.FromSeconds(3)))
    {
        Console.WriteLine("任务未在3秒内完成");
    }
}
catch (AggregateException ae)
{
    // 处理异常
}

异步等待超时:

async Task ProcessWithTimeoutAsync()
{
    Task longTask = LongRunningOperationAsync();
    Task timeoutTask = Task.Delay(3000);
    
    Task completedTask = await Task.WhenAny(longTask, timeoutTask);
    if (completedTask == timeoutTask)
    {
        Console.WriteLine("操作超时");
        // 可能的取消逻辑
    }
    else
    {
        Console.WriteLine("操作按时完成");
    }
}

同步原语超时:

// 使用Monitor.TryEnter
if (Monitor.TryEnter(_lock, TimeSpan.FromSeconds(1)))
{
    try
    {
        // 临界区代码
    }
    finally
    {
        Monitor.Exit(_lock);
    }
}
else
{
    Console.WriteLine("获取锁超时");
}

​​​​​​​// 使用SemaphoreSlim.WaitAsync
if (await _semaphore.WaitAsync(TimeSpan.FromSeconds(1)))
{
    try
    {
        // 受保护的代码
    }
    finally
    {
        _semaphore.Release();
    }
}
else
{
    Console.WriteLine("获取信号量超时");
}

5. 高级主题与最佳实践

5.1 线程安全与同步

线程安全的基本原则:

常见线程安全问题:

1.竞态条件:当多个线程访问共享数据并尝试同时修改它时发生。

解决方案:使用适当的同步机制。

2.死锁:两个或多个线程互相等待对方释放资源。

解决方案:按固定顺序获取锁,使用超时,或避免嵌套锁。

3.活锁:线程不断改变状态以响应其他线程,但无法取得进展。

解决方案:引入随机性,或使用退避策略。

4.内存可见性:一个线程的修改对其他线程不可见。

解决方案:使用volatile关键字或适当的同步机制。

线程安全集合:

.NET提供了多种线程安全的集合类:

1.ConcurrentQueue<T>:线程安全队列

2.ConcurrentStack<T>:线程安全栈

3.ConcurrentDictionary<TKey, TValue>:线程安全字典

4.ConcurrentBag<T>:无序集合

5.BlockingCollection<T>:支持边界和阻塞的集合

var concurrentQueue = new ConcurrentQueue<int>();

// 多线程安全入队
Parallel.For(0, 100, i => concurrentQueue.Enqueue(i));

// 多线程安全出队
Parallel.For(0, 100, i => 
{
    if (concurrentQueue.TryDequeue(out int item))
    {
        Console.WriteLine($"出队: {item}");
    }
});

5.2 死锁预防

死锁的四个必要条件(Coffman条件):

预防死锁的策略:

锁顺序:确保所有线程以相同的顺序获取锁。

例如,总是先获取锁A再获取锁B。

锁超时:使用Monitor.TryEnter或类似机制设置超时。

if (Monitor.TryEnter(_lock, TimeSpan.FromSeconds(1)))
{
    try { /* 临界区 */ }
    finally { Monitor.Exit(_lock); }
}

避免嵌套锁:尽量减少锁的嵌套层次。

使用更高级的抽象:如Concurrent集合或不可变数据结构。

死锁检测:在复杂系统中实现死锁检测机制。

死锁示例与解决:

// 死锁示例
object lock1 = new object();
object lock2 = new object();

void Thread1()
{
    lock (lock1)
    {
        Thread.Sleep(100);
        lock (lock2) { /* ... */ }
    }
}

void Thread2()
{
    lock (lock2)
    {
        Thread.Sleep(100);
        lock (lock1) { /* ... */ }
    }
}

// 解决方案:固定锁顺序
void SafeThread1()
{
    lock (lock1)
    {
        Thread.Sleep(100);
        lock (lock2) { /* ... */ }
    }
}

​​​​​​​void SafeThread2()
{
    lock (lock1) // 与Thread1相同的顺序
    {
        Thread.Sleep(100);
        lock (lock2) { /* ... */ }
    }
}

5.3 性能考量

多线程性能优化的关键点:

减少锁竞争:

避免虚假共享:

合理设置并行度:

异步I/O优于线程池:

对于I/O密集型操作,使用真正的异步API而非线程池

选择适当的集合类型:

性能测量工具:

Stopwatch:测量代码执行时间

var sw = Stopwatch.StartNew();
// 被测代码
sw.Stop();
Console.WriteLine($"耗时: {sw.ElapsedMilliseconds}ms");

性能分析器:

并发可视化工具:

Visual Studio并发可视化工具

5.4 调试多线程应用

多线程调试的挑战:

调试技巧:

命名线程:

Thread worker = new Thread(WorkerMethod);
worker.Name = "Worker Thread";

使用调试位置标记:

Debug.WriteLine($"线程 {Thread.CurrentThread.Name} 进入方法X");

Visual Studio调试功能:

日志记录:

简化重现:

常见调试场景:

死锁检测:

竞态条件:

内存泄漏:

6. 实际案例分析

案例1:高性能日志处理器

需求:开发一个高性能日志处理器,能够并发处理大量日志消息,并写入文件系统,同时不影响主应用程序性能。

解决方案:

public class AsyncLogger : IDisposable
{
    private readonly BlockingCollection<string> _logQueue = new BlockingCollection<string>(10000);
    private readonly Task _processingTask;
    private readonly StreamWriter _writer;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public AsyncLogger(string filePath)
    {
        _writer = new StreamWriter(filePath, append: true);
        _processingTask = Task.Run(ProcessLogs);
    }

    public void Log(string message)
    {
        if (!_logQueue.TryAdd($"{DateTime.UtcNow:o}: {message}"))
        {
            // 队列已满,可选择丢弃或等待
            _logQueue.Add(message); // 阻塞直到有空间
        }
    }

    private async Task ProcessLogs()
    {
        try
        {
            foreach (var message in _logQueue.GetConsumingEnumerable(_cts.Token))
            {
                try
                {
                    await _writer.WriteLineAsync(message);
                    
                    // 定期刷新以提高性能
                    if (_logQueue.Count == 0)
                    {
                        await _writer.FlushAsync();
                    }
                }
                catch (Exception ex)
                {
                    Debug.WriteLine($"日志写入失败: {ex.Message}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // 正常退出
        }
        
        // 最终刷新
        await _writer.FlushAsync();
    }

    public void Dispose()
    {
        _logQueue.CompleteAdding();
        _cts.Cancel();
        _processingTask.Wait();
        _writer.Dispose();
        _cts.Dispose();
    }
}

// 使用示例
using var logger = new AsyncLogger("app.log");

// 多线程记录日志
Parallel.For(0, 100, i => 
{
    logger.Log($"消息 {i} 来自线程 {Thread.CurrentThread.ManagedThreadId}");
});

设计要点:

案例2:并行数据处理管道

需求:处理大量数据,需要经过多个处理阶段,每个阶段可以并行化。

解决方案:

public class DataProcessingPipeline
{
    public async Task ProcessDataAsync(IEnumerable<InputData> inputData)
    {
        // 阶段1: 并行数据加载和初步处理
        var stage1Results = inputData
            .AsParallel()
            .WithDegreeOfParallelism(Environment.ProcessorCount)
            .Select(LoadAndPreprocessData)
            .ToList();
            
        // 阶段2: 并行复杂计算
        var stage2Tasks = stage1Results
            .Select(data => Task.Run(() => ComputeComplexFeatures(data)))
            .ToArray();
            
        var stage2Results = await Task.WhenAll(stage2Tasks);
        
        // 阶段3: 并行验证和过滤
        var stage3Results = new ConcurrentBag<ResultData>();
        Parallel.ForEach(stage2Results, data =>
        {
            if (ValidateData(data))
            {
                var transformed = TransformData(data);
                stage3Results.Add(transformed);
            }
        });
        
        // 阶段4: 批量存储
        await BatchStoreResultsAsync(stage3Results);
    }
    
    private InputData LoadAndPreprocessData(RawData raw)
    {
        // 模拟耗时操作
        Thread.Sleep(10);
        return new InputData();
    }
    
    private ComplexData ComputeComplexFeatures(InputData input)
    {
        // 模拟CPU密集型操作
        Thread.Sleep(100);
        return new ComplexData();
    }
    
    private bool ValidateData(ComplexData data)
    {
        // 简单验证
        return true;
    }
    
    private ResultData TransformData(ComplexData data)
    {
        return new ResultData();
    }
    
    private async Task BatchStoreResultsAsync(IEnumerable<ResultData> results)
    {
        // 模拟批量存储
        await Task.Delay(100);
    }
}

设计要点:

案例3:实时数据仪表板

需求:构建一个实时数据仪表板,从多个数据源获取数据,更新UI,并确保UI保持响应。

解决方案(WPF示例):

public class DashboardViewModel : INotifyPropertyChanged
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly ObservableCollection<DataItem> _items = new ObservableCollection<DataItem>();
    private readonly object _syncLock = new object();
    private double _averageValue;
    
    public event PropertyChangedEventHandler PropertyChanged;
    
    public ObservableCollection<DataItem> Items => _items;
    
    public double AverageValue
    {
        get => _averageValue;
        private set
        {
            if (_averageValue != value)
            {
                _averageValue = value;
                PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(AverageValue)));
            }
        }
    }
    
    public DashboardViewModel()
    {
        // 启动数据收集任务
        Task.Run(() => CollectDataAsync(_cts.Token));
    }
    
    private async Task CollectDataAsync(CancellationToken ct)
    {
        var dataSources = new IDataSource[]
        {
            new NetworkDataSource(),
            new FileDataSource(),
            new DatabaseDataSource()
        };
        
        while (!ct.IsCancellationRequested)
        {
            try
            {
                // 并行从所有数据源获取数据
                var tasks = dataSources
                    .Select(ds => ds.GetDataAsync(ct))
                    .ToArray();
                
                // 等待所有数据源响应或超时
                var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5), ct);
                var completedTask = await Task.WhenAny(
                    Task.WhenAll(tasks),
                    timeoutTask);
                
                if (completedTask == timeoutTask)
                {
                    Debug.WriteLine("数据获取超时");
                    continue;
                }
                
                // 处理接收到的数据
                var allData = tasks.Select(t => t.Result).ToList();
                var newItems = ProcessData(allData);
                
                // 更新UI线程上的集合
                await Application.Current.Dispatcher.InvokeAsync(() =>
                {
                    foreach (var item in newItems)
                    {
                        _items.Add(item);
                    }
                    
                    // 保持合理数量的项目
                    while (_items.Count > 1000)
                    {
                        _items.RemoveAt(0);
                    }
                    
                    // 更新平均值
                    AverageValue = _items.Average(i => i.Value);
                }, System.Windows.Threading.DispatcherPriority.Background);
                
                // 短暂延迟
                await Task.Delay(TimeSpan.FromSeconds(1), ct);
            }
            catch (OperationCanceledException)
            {
                // 正常退出
                break;
            }
            catch (Exception ex)
            {
                Debug.WriteLine($"数据收集错误: {ex.Message}");
                await Task.Delay(TimeSpan.FromSeconds(5), ct);
            }
        }
    }
    
    private List<DataItem> ProcessData(List<RawData[]> allData)
    {
        // 模拟数据处理
        var result = new List<DataItem>();
        foreach (var dataSet in allData)
        {
            foreach (var raw in dataSet)
            {
                result.Add(new DataItem
                {
                    Timestamp = DateTime.Now,
                    Value = raw.Value * 1.2,
                    Source = raw.SourceName
                });
            }
        }
        return result;
    }
    
    public void Dispose()
    {
        _cts.Cancel();
        _cts.Dispose();
    }
}

设计要点:

7. 结论

.NET平台提供了丰富的多线程编程模型和API,从低级的Thread类到高级的async/await模式,涵盖了各种并发编程场景的需求。通过本文的全面分析,我们可以得出以下关键结论:

技术选择应根据具体需求:

线程同步至关重要:

现代异步模式优势明显:

性能与正确性平衡:

调试与测试挑战:

随着.NET平台的持续发展,多线程和异步编程模型也在不断演进。开发人员应当:

通过合理选择和组合本文介绍的各种多线程实现方法和同步技术,.NET开发人员可以构建出高性能、高响应性且可靠的并发应用程序。

以上就是.NET中多线程任务实现的几种方法小结的详细内容,更多关于.NET多线程任务实现的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文