C# 中的多线程实现方式

  • Post author:
  • Post category:其他




前言:

线程 被定义为程序的执行路径。每个线程都定义了一个独特的控制流。如果您的应用程序涉及到复杂的和耗时的操作,那么设置不同的线程执行路径往往是有益的,每个线程执行特定的工作。

下面的

DoSomething



Callback

方法将贯穿全文。

    private string DoSomething(string name, int millisecondsTimeout)
    {
    	Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId:00}执行");
        Thread.Sleep(millisecondsTimeout);
        return name;
    }

    private void Callback() { Console.WriteLine("执行回调。"); }



一、使用委托开启多线程

直接使用委托进行异步,

.NET Core 已经不支持该方法,使用时将直接抛出错误

    Func<string, int, string> action = DoSomething;
    var asyncResult = action.BeginInvoke("JOY", 0, null, null);
    string result = action.EndInvoke(asyncResult);



二、使用Thread开启多线程

通过扩展 Thread 类创建的线程

    var thread = new Thread(() => DoSomething("JOY",0));
    thread.Start();

使用 Thread 实现回调

    private void ThreadWithCallBack(ThreadStart threadStart, Action actionCallBack)
    {
        var method = new ThreadStart(() =>
        {
            threadStart.Invoke();
            actionCallBack.Invoke();
        });
    
        new Thread(method).Start();
    }
	// 调用时
	ThreadWithCallBack(() => DoSomething("JOY", 1000), () => Callback());

实现类型 async/await 的功能,异步,非阻塞,并获取计算结果

    private Func<T> ThreadWithReturn<T>(Func<T> func)
    {
        T result = default;
        var method = new ThreadStart(() =>
        {
            result = func.Invoke();
        });
    
        var thread = new Thread(method);
        thread.Start();
    
        return new Func<T>(() =>
        {
            thread.Join();
            return result;
        });
    }
	// 调用时
    var func = ThreadWithReturn(() => DoSomething("JOY", 1000));
    var result = func.Invoke();



三、使用ThreadPool开启多线程

将方法假如线程池队列以便执行

	ThreadPool.QueueUserWorkItem(o => DoSomething("JOY", 20));

设置线程池数量

设置线程池数量是全局的

委托异步调用 Task Parrallel async/await 全部都是线程池的线程

直接使用 Thread 不受这个数量限制,(但是会占用线程池数量)

    ThreadPool.SetMaxThreads(6, 6);
    ThreadPool.SetMinThreads(2, 2);
    
    ThreadPool.GetMaxThreads(out int workerThreads, out int completionPortThreads);
    Console.WriteLine($"当前电脑最大workerThreads={workerThreads},最大completionPortThreads={completionPortThreads}。");
    
    ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
    Console.WriteLine($"当前电脑最小workerThreads={workerThreads},最小completionPortThreads={completionPortThreads}。");

使用

ManualResetEvent

等待线程池任务完成

	// 初始设置为 false 指示任务是否终止
	var mre = new ManualResetEvent(false);
    ThreadPool.QueueUserWorkItem(o =>
    {
        DoSomething("JOY", 20);
        mre.Set(); // 设置为任务已终止
    });
	
	mre.WaitOne();



四、使用Task开启多线程

使用Task开启子线程执行任务

    var task_1 = new Task(() => DoSomething("JOY", 2000));
    task_1.Start();

	var task_2 = Task.Run(() => DoSomething("JOY", 2000));

	var taskFactory = Task.Factory;
    Task task_3 = taskFactory.StartNew(() => DoSomething("JOY", 2000));

使用 Delay 方法延迟 Task

	Thread.Sleep(2000);
	// 与 Sleep(2000) 不同的时,Delay(2000) 不会阻塞当前线程,
	// 可以理解为在子线程内部加了一个 Sleep(2000).
	var task = Task.Delay(2000).ContinueWith(t => DoSomething("JOY", 20));

使用 ContinueWith 执行回调

	Task.Run(() => DoSomething("JOY", 2000)).ContinueWith(task => Callback());

获取 Task 的返回值,会阻塞

    var task = Task.Run<string>(() => DoSomething("JOY", 1000));
    string result = task.Result; // 获取返回值,会阻塞

使用 lock 实现线程安全

	// 在数据需要大量的情况下,可以使用数据分拆,安全又高效
    var formLock = new object();
    var numAsync = 0;
    for (int i = 0; i < 10000; i++)
    {
        Task.Run(() =>
        {
            // 任意时刻只有一个线程能进入方法块
            lock (formLock) { numAsync++; }
        });
    }

使用Task等待所有任务完成,会阻塞

    var taskFactory = Task.Factory;
    var taskList = new List<Task>();
    taskList.Add(taskFactory.StartNew(() => DoSomething("JOY_1", 2000)));
    taskList.Add(taskFactory.StartNew(() => DoSomething("JOY_2", 2000)));
    taskList.Add(taskFactory.StartNew(() => DoSomething("JOY_3", 2000)));
    // 阻塞,等待所有任务都完成
    Task.WaitAll(taskList.ToArray());

等待任务完成后的回调

    var taskFactory = Task.Factory;
    var taskList = new List<Task>();
    taskList.Add(taskFactory.StartNew(() => DoSomething("JOY_1", 2000)));
    taskList.Add(taskFactory.StartNew(() => DoSomething("JOY_2", 2000)));
    taskList.Add(taskFactory.StartNew(() => DoSomething("JOY_3", 2000)));
    
    // 只要有一个任务完成就进行的回调, 这里的 task 就是第一个完成的 Task
    taskFactory.ContinueWhenAny(taskList.ToArray(), task => Callback());
    
    // 等待所有任务都完成后进的回调,这里的 tasks 就是任务集
    taskFactory.ContinueWhenAll(taskList.ToArray(), (tasks) => Callback());

控制 Task 的并发数量

	// 不建议用 ThreadPool.SetMaxThreads 和 ThreadPool.SetMinThreads 直接控制
    ThreadPool.SetMaxThreads(6, 6);
    ThreadPool.SetMinThreads(2, 2);
	
	// 建议使用Task状态控制
    var tasks = new List<Task>();
    for (int i = 0; i < 1000; i++)
    {
        var j = i;
        if (tasks.Count(task => task.Status != TaskStatus.RanToCompletion) >= 5)
        {
            Task.WaitAny(tasks.ToArray());
            tasks = tasks.Where(task => task.Status != TaskStatus.RanToCompletion).ToList();
        }
    
        tasks.Add(Task.Run(() => DoSomething($"JOY_{j + 1}.", 2000)));
    }

使用 Task.WaitAll 方法 可以捕获多线程异常

    try
    {
        var tasks = new List<Task>();
        for (int i = 0; i < 9; i++)
        {
            var name = $"JOY_{i}";
            tasks.Add(Task.Run(() =>
            {
                if (name.Equals("JOY_0"))
                    throw new Exception("JOY_0异常");
                else if (name.Equals("JOY_1"))
                    throw new Exception("JOY_1异常");

                Console.WriteLine($"name: {name}, 成功。");
            }));
        }
        Task.WaitAll(tasks.ToArray());
    }
    catch (AggregateException aex)
    {
        Console.WriteLine($"异常数量:{aex.InnerExceptions.Count}");
    }
    catch (Exception ex)
    {
    
    }

使用 CancellationToken 取消其他Task或者自生

    var cts = new CancellationTokenSource();
    for (int i = 0; i < 20; i++)
    {
        var name = $"JOY_{i}";
        Task.Run(() =>
        {
            Console.WriteLine($"任务 {name} 开始。");
            try
            {
                if (name.Equals("JOY_6"))
                    throw new Exception("JOY_6异常");
    
                if (cts.IsCancellationRequested)
                    DoSomething(name, 2000);
                else
                    Console.WriteLine($"任务 {name} 取消。");
            }
            catch (Exception)
            {
                cts.Cancel();
            }
        // 将CancellationToken作为Task.Run的第二个参数,
        // 使得如果线程还没有启动,被取消就不执行任务
        }, cts.Token);
    }



五、使用Parallel开启多线程

Parallel 并发执行多个Action,主线程也会参与计算,会阻塞界面

等于 TaskWaitAll + 主线程计算

    Parallel.Invoke(
        () => DoSomething("JOY_1", 2000),
        () => DoSomething("JOY_2", 2000),
        () => DoSomething("JOY_3", 2000)
    );

Parallel 的 For 循环

同样主线程也会参与计算,会阻塞界面

	Parallel.For(0, 5, i => DoSomething($"JOY_{i}", 2000));

Parallel 的 ForEach 循环

同样主线程也会参与计算,会阻塞界面

	Parallel.ForEach(new int[] { 1, 2, 3, 4, 5 }, i => DoSomething($"JOY_{i}", 2000));

Parallel 控制并发数量

    Parallel.Invoke(
        options,
        () => DoSomething("JOY_1", 2000),
        () => DoSomething("JOY_2", 2000),
        () => DoSomething("JOY_3", 2000),
        () => DoSomething("JOY_4", 2000),
        () => DoSomething("JOY_5", 2000),
        () => DoSomething("JOY_6", 2000)
    );
    Parallel.For(0, 9, options, i => DoSomething($"JOY_{i}", 1000));
    Parallel.ForEach(
        new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 },
        options,
        i => DoSomething($"JOY_{i}", 2000)
        );



版权声明:本文为Upgrader原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。