首页
视频
资源
登录
原
.net core Task的学习理解(上)
13264
人阅读
2021/8/18 12:06
总访问:
2603543
评论:
2
收藏:
1
手机
分类:
.net后台框架
![.netcore](https://img.tnblog.net/arcimg/hb/c857299a86d84ee7b26d181a31e58234.jpg ".netcore") >#.net core Task的学习理解(上) [TOC] Thread 线程的生命周期 ------------ ![](https://img.tnblog.net/arcimg/hb/19fe6096041643069902db120116d43b.png) tn2>计算机系统在某一个时刻,当只有一个CPU工作时,它只执行一个进程中的一个线程,但是用户在使用计算机时回打开多个进行,这样就涉及到了线程的不同状态,一个进程可以创建多个线程。 ![](https://img.tnblog.net/arcimg/hb/5926f2d27a034b0fbcf10992754c5f89.png) tn2>当一个cpu从一个线程调用另一个线程的时候,需要将Thread1线程的状态保存到PCB1中,然后加载PCB2中Thread2的状态并调用Thread2的线程,当时间片用尽的时候再保存状态到PCB2中。... tn>当线程有很多的时候,CPU会不停的切换上下文,此时就会非常消耗CPU性能。所以线程池就出现了。 ThreadPool 线程池 ------------ tn2>线程池里面通过设置好线程的数量,可以将空闲的线程进行回收利用。可以通过`preferLocal:false`把任务放到线程池的(global queue)全局队列中,利用空闲的线程执行该任务;也可以通过`preferLocal:true`将放入线程中本地队列进行执行。 ![](https://img.tnblog.net/arcimg/hb/8944d003846047559446e1b7dab3c00f.png) tn2>线程池内部有两种队列,global queue和每个worker thread绑定的local queue。 Work item 的存储类型是Object,实际被 worker thread 执行的时候会判断其类型执行对应的入口方法。 work item 类型可能是Task也可能是IThreadPoolWorkItem。 ![](https://img.tnblog.net/arcimg/hb/f182bf48951c4665b7c2e11d5a8c34bb.png) tn2>当全局任务队列里面的任务也没有的时候,算法将会把其他有任务的线程中的任务,分配给这些空闲的线程去执行。其目的是每个线程都有事干。 ThreadPoolTaskScheduler ------------ tn2>我们先举个例子。Task.Run是我们常用的调用方式,这里我们通过委托的方式去获取它默认的调度器与委托,可以看到它默认的调度器是ThreadPoolTaskScheduler。 ```csharp static void Main(string[] args) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); Task.Run(() => { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); Console.WriteLine(TaskScheduler.Current.GetType()); }); Console.ReadLine(); } ``` ![](https://img.tnblog.net/arcimg/hb/bdf04a571c3c47e9946061a59e50593b.png) tn2>ThreadPoolTaskScheduler是在ThreadPool之上的一层,它确定一般模式下会将该任务(Task)对象分配给线程池(ThreadPool)的全局队列中,然后将其分配到哪个线程(Thread)的本地队列中进行执行。如果设置为LongRunning模式将不会把任务分配到线程池中,执行由下图所示。 ![](https://img.tnblog.net/arcimg/hb/f22482ca373a43e593abfab50efba7fc.png) tn>Task并不是线程,而是一个可以封装执行过程的一个对象实体。比如上一个案例中我们就是封装了一个委托。 线程与协程 ------------ >### 并行计算的协程 ![](https://img.tnblog.net/arcimg/hb/c0e3a859a5584aaa8c18fbddca7e1d1f.png) tn2>可以多个任务通过不同的线程执行进行计算。 比如下列例子:通过task1与task2不同的线程计算最后求积。 ```csharp static async Task Main(string[] args) { Task<int> task1 = Task.Run<int>(()=> { Console.WriteLine($"task1 finish [Thread]:{Thread.CurrentThread.ManagedThreadId}"); return 1; }); Task<int> task2 = Task.Run<int>(()=> { Console.WriteLine($"task2 finish [Thread]:{Thread.CurrentThread.ManagedThreadId}"); return 2; }); Text((await task1) * (await task2)); Console.ReadLine(); } static void Text(int i) { Console.WriteLine(i); } ``` ![](https://img.tnblog.net/arcimg/hb/d2f9fc9616c8475c95c51b4519f7a681.png) ```bash # 这样也可以 await Task.WhenAll(task1, task2); ``` tn2>在实际运用场景中我们可以计算工资啊,账单结算啊.... tn>如果不支持Main的异步,需要在配置文件中添加这几行。 ```bash <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <LangVersion>7.1</LangVersion> </PropertyGroup> ``` >### 协程与异步IO tn2>Task1发起请求然后自己的工作完成了,接着由IO等待线程接力,获取完所有IO后,由Task2进行处理。 ![](https://img.tnblog.net/arcimg/hb/eeeabd75919f43d487f782a059d9b466.png) tn2>我们通过请求百度之前,发现是由线程1处理该过程,IO获取完毕后是由线程8进行处理的。 ![](https://img.tnblog.net/arcimg/hb/d58668af936a4b28be2b13311cb7d379.png) ```bash static async Task Main(string[] args) { await downloadtask(); Console.ReadLine(); } static async Task downloadtask() { using (var client = new HttpClient()) { Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); var result = await client.GetStringAsync("https://www.baidu.com/"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); } } ``` tn>`await`与线程的切换没有半点关系。 >### 本地队列与全局队列 tn2>我们通过编写如下代码,将任务放到本地队列中,并且执行5次,看看执行结果。 ```csharp for (int i = 0; i < 5; i++) { int b = i; Task.Run(() => Console.WriteLine($"Task{b} ThreadId:{Thread.CurrentThread.ManagedThreadId}")) .ContinueWith(_ => Console.WriteLine($"continuationAction{b} ThreadId:{Thread.CurrentThread.ManagedThreadId}")); Task.Delay(2000); } ``` ![](https://img.tnblog.net/arcimg/hb/233cd26217d64ae4a8586d116e14a18c.png) tn2>我发现Task.Run与ContinueWith的线程Id是同一个线程,在运行Task4的时候由于线程7空闲了,直接将Task4的任务执行了。 接着我们修改部分代码看看结果。 ```csharp Task.Run(() => Console.WriteLine($"Task1 ThreadId:{Thread.CurrentThread.ManagedThreadId}")) .ContinueWith(_ => Console.WriteLine($"continuationAction1 ThreadId:{Thread.CurrentThread.ManagedThreadId}")); Task.Run(() => Console.WriteLine($"Task2 ThreadId:{Thread.CurrentThread.ManagedThreadId}")) .ContinueWith(_ => Console.WriteLine($"continuationAction2 ThreadId:{Thread.CurrentThread.ManagedThreadId}"), TaskContinuationOptions.RunContinuationsAsynchronously); Console.ReadLine(); ``` ![](https://img.tnblog.net/arcimg/hb/c61a42525b514ecaa7564b01f121f670.png) tn2>很奇怪为什么不一样?这是因为Task1刚执行完,看到Task2还在执行,线程4就没事做了就去task2后面的continuationAction2任务做了。线程6发现自己没事做然后就去task1里面的continuationAction1拿过来做了。 >### 自定义TaskScheduler tn2>并不是所有的Task运行时都会到线程池(ThreadPool)里面去,这取决于它的TaskScheduler是哪一个;要想指定哪一个TaskScheduler需要通过TaskFactory来进行指定。下面我们通过编写自定义的TaskSchedule来进行测试。 ```csharp class CustomTaskScheduler : TaskScheduler { /// <summary> /// 定义一个线程安全的Task集合 /// </summary> private BlockingCollection<Task> _queue = new BlockingCollection<Task>(); public CustomTaskScheduler() => new Thread(() => { while (true) { // 从队列中取出一个 var task = _queue.Take(); // 进行执行 TryExecuteTask(task); Console.WriteLine($"task {task.Id} Executed"); } }) { // 定义为后台 IsBackground = true }.Start(); /// <summary> /// 获取所有Task /// </summary> /// <returns></returns> protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray(); /// <summary> /// 添加Task /// </summary> /// <param name="task"></param> protected override void QueueTask(Task task) => _queue.Add(task); /// <summary> /// 确定提供的Task是否可以同步执行 /// </summary> /// <param name="task"></param> /// <param name="taskWasPreviouslyQueued"></param> /// <returns></returns> protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; } ``` ```csharp static void Main(string[] args) { var taskFactory = new TaskFactory(new CustomTaskScheduler()); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $"threadId:{Thread.CurrentThread.ManagedThreadId}")); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $"threadId:{Thread.CurrentThread.ManagedThreadId}")); Console.ReadLine(); } ``` ![](https://img.tnblog.net/arcimg/hb/d3e93218bfd1433b870ca6f38ff0e869.png) 有限元状态机(finite-state machine) ------------ tn2>有限元状态机主要解决下列几个问题: 1.原来的线程去了哪里? 2.await表达式的返回值是怎么回事? 3.await之前的变量是如何被await之后的代码获取的? 4.await之后的任务在哪执行? 5.为什么有时候线程没有发生切换? ![](https://img.tnblog.net/arcimg/hb/fc306b2b6d9e42bb84d090dd7f956959.png) tn2>FooAsync里面的`await Task.Delay(100);`会编译成如下代码。而`FooStateMachine`就是状态机。(3)关于它的变量被编译成字段。并且初始化状态机状态为`-1`,接着开始执行状态机里面的内容。而MoveNext方法里面的内容就是执行的主要部分,(4)其中需要执行的代码也在该方法里面。 ![](https://img.tnblog.net/arcimg/hb/dc09c3b0fe144f34ae8efbe90f4830bf.png) tn2>接着我们执行MoveNext里面的内容。一开始状态是`-1`所以我们执行`_state!=0`这个判断下面的内容,然后通过需要执行的任务(Task)中获取`GetAwaiter`去进行判断是否执行完成。 如果执行完成将直接到`GetResult`方法获取结果然后返回该结果,执行结束。 如果taskAwaiter没有执行完成,状态将会改为`0`,并且通过异步的方式等待完成,封装并且将再次回调MoveNext方法(想在什么地方回调就在什么地方回调)。是`AwaitUnsafeOnCompleted`的主要工作。 当第二次进行执行的时候会执行else下面的内容,(2)通过taskAwaiter去获取结果,不管返回值需不需要结果都会去调用`GetResult`方法。 最后将通过`SetResult`放上结果值进行返回。 ![](https://img.tnblog.net/arcimg/hb/0588cd26af894701b62f440ca027d19e.png) tn2>整个执行流程图 ![](https://img.tnblog.net/arcimg/hb/3e6b86d7c6d1499281f1dd22213ded82.png) Task内部结构 ------------ ![](https://img.tnblog.net/arcimg/hb/a2e6c7b620254ef188dc3bbf65feafef.png) tn2>其核心的接口有两个`INotifyCompletion`与`ICriticalNotifyCompletion`。其中OnCompleted方法用来传递执行上下文,当实现`ICriticalNotifyCompletion`接口时就会去调用UnsafeOnCompleted这个方法。 自定义Awaiter ------------ tn2>可以按照该结构去进行自定义一个Awaiter ![](https://img.tnblog.net/arcimg/hb/5d1249cd7cc24c9990c8ffbb4f819a74.png) ```csharp public class CustomAwaiter<T> : ICriticalNotifyCompletion { private Action _continuation; private T _result; private Exception _exception; public bool IsCompleted { get; private set; } public void OnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public void UnsafeOnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public T GetResult() { Console.WriteLine("CustomAwaiter.GetResult"); if (_exception != null) throw _exception; return _result; } public void SetResult(T result) { Console.WriteLine("CustomAwaiter.SetResult"); IsCompleted = true; _result = result; _continuation.Invoke(); } public void SetException(Exception exception) { Console.WriteLine("CustomAwaiter.SetException"); IsCompleted = true; _exception = exception; _continuation.Invoke(); } } ``` ```csharp public class CustomAwaitable<T> { private CustomAwaiter<T> _awaiter = new CustomAwaiter<T>(); public CustomAwaiter<T> GetAwaiter() { Console.WriteLine("CustomAwaitable.GetAwaiter"); return _awaiter; } } ``` tn2>测试执行代码 ```csharp static async Task Main(string[] args) { var foo = await Foo(); Console.WriteLine(foo); } static CustomAwaitable<string> Foo() { Console.WriteLine("Foo"); var awaitable = new CustomAwaitable<string>(); var awaiter = awaitable.GetAwaiter(); new Thread(() => { Thread.Sleep(100); awaiter.SetResult("Hello World!"); }) { IsBackground = false }.Start(); return awaitable; } ``` tn2>如果运气好,执行如下 ![](https://img.tnblog.net/arcimg/hb/4317c602579e4e84a6d6daa4b5307549.png) tn2>运气不好,就会报错。报错原因是因为`SetResult`在`UnsafeOnCompleted`回调未完成的之前执行了,`_continuation`字段的委托并没有MoveNext方法。(大佬建议不使用) ![](https://img.tnblog.net/arcimg/hb/2a1ba010a8a847c28522d5b8715bfecc.png) tn2>我做了一些修改可能会导致性能不太佳,但勉勉强强能避开这个问题。 ```csharp public class CustomAwaiter<T> : ICriticalNotifyCompletion { private Action _continuation; private T _result; private Exception _exception; AutoResetEvent autoEvent = new AutoResetEvent(false); public bool IsCompleted { get; private set; } public void OnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public void UnsafeOnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method} {Thread.CurrentThread.ManagedThreadId}"); _continuation = continuation; autoEvent.Set(); } public T GetResult() { Console.WriteLine("CustomAwaiter.GetResult"); if (_exception != null) throw _exception; return _result; } public void SetResult(T result) { Console.WriteLine($"CustomAwaiter.SetResult {Thread.CurrentThread.ManagedThreadId}"); IsCompleted = true; _result = result; if (_continuation == null) { autoEvent.WaitOne(); } _continuation.Invoke(); } public void SetException(Exception exception) { Console.WriteLine("CustomAwaiter.SetException"); IsCompleted = true; _exception = exception; _continuation.Invoke(); } } ``` tn>当我们运行的程序可能会报错时,Task的异常信息里不会有GetResult的StackFrame,这里我们自己的程序是会报错的。 因为在Task的GetResult的方法里面会添加`StackTraceHidden`特性。 ![](https://img.tnblog.net/arcimg/hb/bb23181f42b448f3818f1658ac190bbe.png) ```csharp static async Task Main(string[] args) { var foo = await Foo(); Console.WriteLine(foo); } static CustomAwaitable<string> Foo() { Console.WriteLine("Foo"); var awaitable = new CustomAwaitable<string>(); var awaiter = awaitable.GetAwaiter(); new Thread(() => { try { Thread.Sleep(100); Console.WriteLine($"child Thread {Thread.CurrentThread.ManagedThreadId}"); throw new Exception("Hello Bug!"); } catch (Exception ex) { awaiter.SetException(ex); } }) { IsBackground = false }.Start(); return awaitable; } ``` ![](https://img.tnblog.net/arcimg/hb/0701b2738c3f4702b61ef0f79e34bd51.png) async的支持 ------------ tn2>虽然实现了await的方法但是并没有实现async的支持,所以这里async并不会认识我们定义的`CustomAwaiter`,相比async Task方法的编译结果,我们少了个`AsyncTaskMethodBuilder`。 ![](https://img.tnblog.net/arcimg/hb/448cb91ac67f4453b3908af4682628a2.png) ![](https://img.tnblog.net/arcimg/hb/40e69b1782ed40538ca82952bb8cc5f7.png) AsyncTaskMethodBuilder ------------ tn2>关于AsyncMethodBuilder的结构如下图所示。 ![](https://img.tnblog.net/arcimg/hb/2f72f6ab85724ef28869e7bc489029b8.png) tn2>如果要我们去实现async的支持的话,需要我们去定义自己的`AsyncMethodBuilder`。 ```csharp public struct CustomAsyncMethodBuilder<T> { private CustomAwaiter<T> _awaiter; private CustomAwaitable<T> _awaitable; /// <summary> /// 创建CustomAsyncMethodBuilder /// </summary> /// <returns></returns> public static CustomAsyncMethodBuilder<T> Create() { Console.WriteLine("CustomAsyncMethodBuilder.Create"); var awaitable = new CustomAwaitable<T>(); var builder = new CustomAsyncMethodBuilder<T> { _awaitable = awaitable, _awaiter = awaitable.GetAwaiter() }; return builder; } /// <summary> /// CustomAsyncMethodBuilder 的 任务 /// </summary> public CustomAwaitable<T> Task { get { Console.WriteLine("CustomAsyncMethodBuilder.Task"); return _awaitable; } } public void SetException(Exception exception) { Console.WriteLine("CustomAsyncMethodBuilder.SetException"); _awaiter.SetException(exception); } public void SetResult(T result) { Console.WriteLine("CustomAsyncMethodBuilder.SetResult"); _awaiter.SetResult(result); } /// <summary> /// 调用OnCompleted方法 /// </summary> /// <typeparam name="TAwaiter"></typeparam> /// <typeparam name="TStateMachine"></typeparam> /// <param name="awaiter"></param> /// <param name="stateMachine"></param> public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter,ref TStateMachine stateMachine) where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine { Console.WriteLine("CustomAsyncMethodBuilder.AwaitOnCompleted"); _awaiter.OnCompleted(stateMachine.MoveNext); } /// <summary> /// 调用AwaitUnsafeOnCompleted方法 /// </summary> /// <typeparam name="TAwaiter"></typeparam> /// <typeparam name="TStateMachine"></typeparam> /// <param name="awaiter"></param> /// <param name="stateMachine"></param> /// 将类型或成员标识为安全关键型,并可由透明代码安全访问 [SecuritySafeCritical] public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine { Console.WriteLine("CustomAsyncMethodBuilder.AwaitUnsafeOnCompleted"); // var iscompleted = bool.Parse(awaiter.GetType().GetProperty("IsCompleted").GetValue(awaiter).ToString()); _awaiter.UnsafeOnCompleted(stateMachine.MoveNext); } public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine:IAsyncStateMachine { Console.WriteLine("CustomAsyncMethodBuilder.Start"); stateMachine.MoveNext(); } public void SetStateMachine(IAsyncStateMachine stateMachine) { Console.WriteLine("CustomAsyncMethodBuilder.SetStateMachine"); } } ``` tn2>最后还需要在`CustomAwaitable`上添加自定义的`CustomAsyncMethodBuilder`特性 ```csharp [AsyncMethodBuilder(typeof(CustomAsyncMethodBuilder<>))] public class CustomAwaitable<T> { private CustomAwaiter<T> _awaiter = new CustomAwaiter<T>(); public CustomAwaiter<T> GetAwaiter() { Console.WriteLine("CustomAwaitable.GetAwaiter"); return _awaiter; } } ``` tn2>最后`CustomAwaiter`也做了一些修改 ```csharp public class CustomAwaiter<T> : ICriticalNotifyCompletion { private Action _continuation; private T _result; private Exception _exception; // AutoResetEvent autoEvent = new AutoResetEvent(false); public bool IsCompleted { get; private set; } public void OnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public void UnsafeOnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method} {Thread.CurrentThread.ManagedThreadId}"); _continuation = continuation; _continuation.Invoke(); // autoEvent.Set(); } public T GetResult() { Console.WriteLine("CustomAwaiter.GetResult"); if (_exception != null) throw _exception; return _result; } public void SetResult(T result) { Console.WriteLine($"CustomAwaiter.SetResult {Thread.CurrentThread.ManagedThreadId}"); IsCompleted = true; _result = result; } public void SetException(Exception exception) { Console.WriteLine("CustomAwaiter.SetException"); IsCompleted = true; _exception = exception; } } ``` tn2>接着我们测试一下 ```csharp static async Task Main(string[] args) { var boo = await Boo(); Console.WriteLine(boo); Console.ReadLine(); } static async CustomAwaitable<string> Boo() { await Task.Run(() => Console.WriteLine("Finish")); string str = "bob"; return str; } ``` ![](https://img.tnblog.net/arcimg/hb/9df40436f032403fafc7b5e6fe76b939.png)
欢迎加群讨论技术,1群:677373950(满了,可以加,但通过不了),2群:656732739
👈{{preArticle.title}}
👉{{nextArticle.title}}
评价
{{titleitem}}
{{titleitem}}
{{item.content}}
{{titleitem}}
{{titleitem}}
{{item.content}}
尘叶心繁
这一世以无限游戏为使命!
博主信息
排名
6
文章
6
粉丝
16
评论
8
文章类别
.net后台框架
168篇
linux
17篇
linux中cve
1篇
windows中cve
0篇
资源分享
10篇
Win32
3篇
前端
28篇
传说中的c
4篇
Xamarin
9篇
docker
15篇
容器编排
101篇
grpc
4篇
Go
15篇
yaml模板
1篇
理论
2篇
更多
Sqlserver
4篇
云产品
39篇
git
3篇
Unity
1篇
考证
2篇
RabbitMq
23篇
Harbor
1篇
Ansible
8篇
Jenkins
17篇
Vue
1篇
Ids4
18篇
istio
1篇
架构
2篇
网络
7篇
windbg
4篇
AI
18篇
threejs
2篇
人物
1篇
嵌入式
2篇
python
13篇
HuggingFace
8篇
pytorch
9篇
opencv
6篇
最新文章
最新评价
{{item.articleTitle}}
{{item.blogName}}
:
{{item.content}}
关于我们
ICP备案 :
渝ICP备18016597号-1
网站信息:
2018-2024
TNBLOG.NET
技术交流:
群号656732739
联系我们:
contact@tnblog.net
欢迎加群
欢迎加群交流技术