分享新思路:一个逻辑完备的线程池

讨论 未结 3 39
holmes1412
holmes1412 会员 2022年4月26日 07:47 发表
<h1>一个逻辑完备的线程池</h1> <p>开源项目 Workflow 中有一个非常重要的基础模块:<strong>代码仅 300 行的 C 语言线程池</strong>。</p> <p>观点写在前面,本文还是主要分享新思路,而线程池最基本的还是简单、易用、高效。新做法的三个特点在第 3 部分开始讲解,欢迎跳阅,或直接到 Github 主页上围观代码。</p> <p><a href="https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c" rel="nofollow">https://github.com/sogou/workflow/blob/master/src/kernel/thrdpool.c</a></p> <h2>0 - Workflow 的 thrdpool</h2> <p>Workflow 的特点是:计算通信融为一体的异步调度模式,而计算的核心:Executor 调度器,就是基于这个线程池实现的。可以说,一个通用而高效的线程池,是我们写 C/C++代码时离不开的基础模块。</p> <p><strong>thrdpool</strong>代码位置在 src/kernel/,不仅可以直接拿来使用,同时也适合阅读学习。</p> <p>而更重要的,秉承 Workflow 项目本身一贯的严谨极简的作风,这个 thrdpool 代码极致简洁,实现逻辑上亦非常完备,结构精巧,处处严谨,不得不让我惊叹:妙啊!!!🤩</p> <p>我们可能会很好奇,线程池还能写出什么别致的新思路吗?以下列出一些特点:</p> <ul> <li> <p>特点 1:创建完线程池后,<strong>无需记录任何线程 id 或对象,线程池可以通过一个等一个的方式优雅地去结束所有线程</strong>; 即:<strong>线程之间</strong>是对等的;</p> </li> <li> <p>特点 2:<strong>线程任务可以由另一个线程任务调起</strong>;甚至线程池正在被销毁时也可以提交下一个任务;(这很重要,因为线程本身很可能是不知道线程池的状态的;<br> 即:<strong>任务之间</strong>也是对等的,无论何时、由哪提交;</p> </li> <li> <p>特点 3:<strong>同理,线程任务也可以销毁这个线程池</strong>;(非常完整<br> 即:<strong>行为之间</strong>也是对等的,哪怕这个行为是 destroy ;</p> </li> </ul> <p>我真的迫不及待为大家深层解读一下,这个我愿称之为“<strong>逻辑完备”的线程池</strong>。</p> <h2>1 - 前置知识</h2> <p>第一部分我先从最基本的内容梳理一些个人理解,有基础的小伙伴可以直接跳过。如果有不准确的地方,欢迎大家指正交流~</p> <p>为什么需要线程池?(其实思路不仅对线程池,对任何有限资源的调度管理都是类似的)</p> <p>我们知道,通过系统提供的<strong>pthread</strong>或者<strong>std::thread</strong>创建线程,就可以实现多线程并发执行我们的代码。</p> <p>但是 CPU 的核数是固定的,所以真正并发执行的最大值也是固定的,过多的线程创建除了频繁产生创建的 overhead 以外,还会导致对系统资源进行争抢,这些都是不必要的浪费。</p> <p>因此我们可以管理有限个线程,循环且合理地利用它们。♻️</p> <p>那么线程池一般包含哪些内容呢?</p> <ul> <li>首先是管理若干个<del>工具人</del>线程;</li> <li>其次是管理交给线程去执行的任务,这个一般会有一个队列;</li> <li>再然后线程之间需要一些同步机制,比如 mutex 、condition 等;</li> <li>最后就是各线程池实现上自身需要的其他内容了;</li> </ul> <p>好了,接下来我们看看<strong>Workflow</strong>的<strong>thrdpool</strong>是怎么做的。</p> <h2>2 - 代码概览</h2> <p>以下共 7 步常用思路,足以让我们把代码飞快过一遍。</p> <h4>第 1 步:先看头文件,模块提供什么接口。</h4> <p>我们打开<code>thrdpool.h</code>,可以只关注三个接口:</p> <pre><code class="language-cpp">// 创建线程池 thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize); // 把任务交给线程池的入口 int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);&nbsp; // 销毁线程池 void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; thrdpool_t *pool); </code></pre> <h4>第 2 步:接口上有什么数据结构。</h4> <p>也就是,我们如何描述一个交给线程池的任务。</p> <pre><code class="language-cpp">struct thrdpool_task &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; void (*routine)(void *); // 一个函数指针 &nbsp; &nbsp; void *context; // 一个上下文 }; &nbsp; </code></pre> <h4>第 3 步:再看实现.c ,有什么内部数据结构。</h4> <pre><code class="language-cpp">struct __thrdpool { &nbsp; &nbsp; struct list_head task_queue; &nbsp; // 任务队列 &nbsp; &nbsp; size_t nthreads;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 线程个数 &nbsp; &nbsp; size_t stacksize; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 构造线程时的参数 &nbsp; &nbsp; pthread_t tid;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 运行起来之后,pool 上记录的这个是 zero 值 &nbsp; &nbsp; pthread_mutex_t mutex; &nbsp; &nbsp; pthread_cond_t cond; &nbsp; &nbsp; pthread_key_t key; &nbsp; &nbsp; pthread_cond_t *terminate; }; </code></pre> <p><strong>没有一个多余,每一个成员都很到位:</strong></p> <ol> <li><strong>tid</strong>:线程 id ,整个线程池只有一个,它不会奇怪地去记录任何一个线程的 id ,这样就不完美了,它平时运行的时候是空值,退出的时候,它是用来实现链式等待的关键。</li> <li><strong>mutex</strong> 和 <strong>cond</strong>是常见的线程间同步的工具,其中这个 cond 是用来给<strong>生产者和消费者</strong>去操作任务队列用的。</li> <li><strong>key</strong>:是线程池的 key ,然后会赋予给每个由线程池创建的线程作为他们的 thread local ,用于区分这个线程是否是线程池创建的。</li> <li>我们还看到一个<strong>pthread_cond_t *terminate</strong>,这有两个用途:不仅是退出时的标记位 ,而且还是调用退出的那个人要等待的 condition 。</li> </ol> <p>以上各个成员的用途,好像说了,又好像没说,🤔是因为<strong>几乎每一个成员都值得深挖一下</strong>,所以我们记住它们,后面看代码的时候就会豁然开朗!😃</p> <h4>第 4 步:接口都调用了什么核心函数。</h4> <pre><code class="language-cpp">thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize) { &nbsp; &nbsp; thrdpool_t *pool; &nbsp; &nbsp; ret = pthread_key_create(&amp;pool-&gt;key, NULL); &nbsp; &nbsp; if (ret == 0) &nbsp; &nbsp; { ... // 去掉了其他代码,但是注意到刚才的 tid 和 terminate 的赋值 &nbsp; &nbsp; &nbsp; &nbsp; memset(&amp;pool-&gt;tid, 0, sizeof (pthread_t)); &nbsp; &nbsp; &nbsp; &nbsp; pool-&gt;terminate = NULL; &nbsp; &nbsp; &nbsp; &nbsp; if (__thrdpool_create_threads(nthreads, pool) &gt;= 0) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return pool; &nbsp; &nbsp; &nbsp; &nbsp; ... </code></pre> <p>这里可以看到<code>__thrdpool_create_threads()</code>里边最关键的就是循环创建<strong>nthreads</strong>个线程。</p> <pre><code class="language-cpp">&nbsp; &nbsp; &nbsp; &nbsp; while (pool-&gt;nthreads &lt; nthreads)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ret = pthread_create(&amp;tid, &amp;attr, __thrdpool_routine, pool); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ... </code></pre> <h4>第 5 步:略读核心函数的功能。</h4> <p>所以我们在上一步知道了,每个线程执行的是<code>__thrdpool_routine()</code>。不难想象,它会<strong>不停从队列拿任务出来执行</strong>:</p> <pre><code class="language-cpp">static void *__thrdpool_routine(void *arg) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ...&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; { &nbsp; &nbsp; &nbsp; &nbsp; // 1. 从队列里拿一个任务出来,没有就等待 &nbsp; &nbsp; &nbsp; &nbsp; pthread_mutex_lock(&amp;pool-&gt;mutex); &nbsp; &nbsp; &nbsp; &nbsp; while (!pool-&gt;terminate &amp;&amp; list_empty(&amp;pool-&gt;task_queue)) &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; pthread_cond_wait(&amp;pool-&gt;cond, &amp;pool-&gt;mutex); &nbsp; &nbsp; &nbsp; &nbsp; if (pool-&gt;terminate) // 2. 线程池结束的标志位,记住它,先跳过 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break; &nbsp; &nbsp; &nbsp; &nbsp; // 3. 如果能走到这里,恭喜你,拿到了任务~&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; entry = list_entry(*pos, struct __thrdpool_task_entry, list); &nbsp; &nbsp; &nbsp; &nbsp; list_del(*pos); &nbsp; &nbsp; &nbsp; &nbsp; pthread_mutex_unlock(&amp;pool-&gt;mutex); // 4. 先解锁 &nbsp;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; task_routine = entry-&gt;task.routine; &nbsp; &nbsp; &nbsp; &nbsp; task_context = entry-&gt;task.context; &nbsp; &nbsp; &nbsp; &nbsp; free(entry); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; task_routine(task_context); // 5. 再执行 &nbsp; &nbsp; &nbsp; &nbsp; // 6. 这里也先记住它,意思是线程池里的线程可以销毁线程池 &nbsp; &nbsp; &nbsp; &nbsp; if (pool-&gt;nthreads == 0) &nbsp; &nbsp; &nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; /* Thread pool was destroyed by the task. */ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; free(pool); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return NULL; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } ... // 后面还有魔法,留下一章解读~~~ </code></pre> <h4>第 6 步:把函数之间的关系联系起来。</h4> <p>刚才看到的<code>__thrdpool_routine()</code>就是线程的核心函数了,它可以和谁关联起来呢?</p> <p>可以和接口<code>thrdpool_schedule()</code>关联上。</p> <p>我们说过,线程池上有个队列管理任务,</p> <ul> <li>所以,每个执行<strong>routine</strong>的线程,都是消费者;</li> <li>而每个发起<strong>schedule</strong>的线程,都是生产者;</li> </ul> <p>我们已经看过消费者了,来看看生产者的代码:</p> <pre><code class="language-cpp">inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, thrdpool_t *pool) { struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf; entry-&gt;task = *task; pthread_mutex_lock(&amp;pool-&gt;mutex); list_add_tail(&amp;entry-&gt;list, &amp;pool-&gt;task_queue); // 添加到队列里 pthread_cond_signal(&amp;pool-&gt;cond); // 叫醒在等待的线程 pthread_mutex_unlock(&amp;pool-&gt;mutex); } </code></pre> <p>说到这里,<code>特点 2</code>就非常清晰了:</p> <p>开篇说的<code>特点 2</code>是说,”<strong>线程任务可以由另一个线程任务调起</strong>”。</p> <p>只要对队列的管理做得好,显然我们在消费者所执行的函数也可以做生产者。</p> <h3>第 7 步:看其他情况的处理,对于线程池来说就是比如销毁的情况。</h3> <p>只看我们接口 thrdpool_destroy()的实现是非常简单的:</p> <pre><code class="language-cpp">void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) { ... // 1. 内部会设置 pool-&gt;terminate ,并叫醒所有等在队列拿任务的线程 __thrdpool_terminate(in_pool, pool); // 2. 把队列里还没有执行的任务都拿出来,通过 pending 返回给用户 list_for_each_safe(pos, tmp, &amp;pool-&gt;task_queue) { entry = list_entry(pos, struct __thrdpool_task_entry, list); list_del(pos); if (pending) pending(&amp;entry-&gt;task); ... // 后面就是销毁各种内存,同样有魔法~ </code></pre> <p>在退出的时候,我们那些已经提交但是还没有被执行的任务是绝对不能就这么扔掉了的,于是我们可以传入一个<code>pending()</code>函数,<strong>上层可以做自己的回收、回调、任何保证上层逻辑完备的事情</strong>。</p> <p><strong>设计的完整性,无处不在。</strong></p> <p>接下来我们就可以跟着我们的核心问题,针对性地看看每个特点都是怎么实现的。</p> <h2>3 - 特点 1: 一个等待一个的优雅退出</h2> <p>这里提出一个问题:<strong>线程池要退出,如何结束所有线程?</strong></p> <p>一般线程池的实现都是需要记录下所有的线程 id ,或者 thread 对象,以便于我们去 jion 等待它们结束。</p> <p><strong>而线性地退出,一环扣一环,长度本身不重要,让事情可以递归起来,是非常符合计算机世界的常规做法的。</strong></p> <p>我们刚才看,<strong>pool 里并没有记录所有的 tid 呀</strong>?正如开篇说的,<strong>pool 上只有一个 tid ,而且还是个空的值</strong>。</p> <p>所以<code>特点 1</code>给出了<strong>Workflow</strong>的<strong>thrdpool</strong>的答案:</p> <p><strong>无需记录所有线程,我可以让线程挨个自动退出、且一个等待一个,最终达到我调用完 thrdpool_destroy()后内存可以回收干净的目的。</strong></p> <p>这里先给一个简单的图,假设发起 destroy 的人是 main 线程,我们如何做到一个等一个退出:</p> <p>最简单的:外部线程发起 destroy👇</p> <p><img alt="thrdpool_pic_01" class="embedded_image" loading="lazy" referrerpolicy="no-referrer" rel="noreferrer" src="https://user-images.githubusercontent.com/1880011/165244341-33c47ccc-751a-42b6-a403-6ca826fa38ee.jpg"></p> <p>步骤如下:</p> <ol> <li>线程的退出,由 thrdpool_destroy()设置<strong>pool-&gt;terminate</strong>开始。</li> <li>我们每个线程,在 while(1)里会第一时间发现 terminate ,线程池要退出了,然后会 break 出这个 while 循环。</li> <li>注意这个时候,<strong>还持有着 mutex 锁</strong>,我们拿出 pool 上唯一的那个 tid ,放到我的临时变量,我会根据拿出来的值做不同的处理。且我会把我自己的 tid 放上去,然后再解 mutex 锁。</li> <li>那么很显然,第一个从 pool 上拿 tid 的人,会发现这是个 0 值,就可以直接结束了,不用负责等待任何其他人,但我在完全结束之前需要有人负责等待我的结束,所以我会把我的 id 放上去。</li> <li>而如果发现自己从 pool 里拿到的 tid 不是 0 值,<strong>说明我要负责 join 上一个人</strong>,并且把我的 tid 放上去,<strong>让下一个人负责我</strong>。</li> <li>最后的那个人,是那个发现 pool-&gt;nthreads 为 0 的人,那么我就可以通过这个 terminate (它本身是个 condition )去通知发起 destroy 的人。</li> <li>最后发起者就可以退了。🔚</li> </ol> <p>是不是非常有意思!!!非常优雅的做法!!!</p> <p>所以我们会发现,其实<strong>大家不太需要知道太多信息,只需要知道我要负责的上一个人</strong>。</p> <p>当然每一步都是非常严谨的,我们结合刚才跳过的第一段魔法🔮感受一下:</p> <pre><code class="language-cpp">static void *__thrdpool_routine(void *arg) { while (1) { pthread_mutex_lock(&amp;pool-&gt;mutex); // 1.注意这里还持有锁 ... // 等着队列拿任务出来 if (pool-&gt;terminate) // 2. 这既是标识位,也是发起销毁的那个人所等待的 condition break; ... // 执行拿到的任务 } /* One thread joins another. Don't need to keep all thread IDs. */ tid = pool-&gt;tid; // 3. 把线程池上记录的那个 tid 拿下来,我来负责上一人 pool-&gt;tid = pthread_self(); // 4. 把我自己记录到线程池上,下一个人来负责我 if (--pool-&gt;nthreads == 0) // 5. 每个人都减 1 ,最后一个人负责叫醒发起 detroy 的人 pthread_cond_signal(pool-&gt;terminate); pthread_mutex_unlock(&amp;pool-&gt;mutex); // 6. 这里可以解锁进行等待了 if (memcmp(&amp;tid, &amp;__zero_tid, sizeof (pthread_t)) != 0) // 7. 只有第一个人拿到 0 值 pthread_join(tid, NULL); // 8. 只要不 0 值,我就要负责等上一个结束才能退 return NULL; // 9. 退出,干干净净~ } </code></pre> <h2>4 - 特点 2:线程任务可以由另一个线程任务调起</h2> <p>在第二部分我们看过源码,只要队列管理得好,线程任务里提交下一个任务是完全 OK 的。</p> <p>这很合理。👌</p> <p>那么问题来了,<code>特点 1</code>又说,我们每个线程,<strong>是不太需要知道太多线程池的状态和信息的</strong>。而线程池的销毁是个过程,如果在这个过程间提交任务会怎么样呢?</p> <p>因此<code>特点 2</code>的一个重要解读是:<strong>线程池被销毁时也可以提交下一个任务</strong>,必须强调的是,是指线程任务里。而且刚才提过,还没有被执行的任务,可以通过我们传入的 pending()函数拿回来。</p> <p>简单看看销毁时的严谨做法:</p> <pre><code class="language-cpp">static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { pthread_cond_t term = PTHREAD_COND_INITIALIZER; pthread_mutex_lock(&amp;pool-&gt;mutex); // 1. 加锁设置标识位 pool-&gt;terminate = &amp;term; // 2. 之后的添加任务不会被执行,但可以 pending 拿到 pthread_cond_broadcast(&amp;pool-&gt;cond); // 3. 广播所有等待的消费者 if (in_pool) // 4. 这里的魔法等下讲&gt;_&lt;~ { /* Thread pool destroyed in a pool thread is legal. */ pthread_detach(pthread_self()); pool-&gt;nthreads--; } while (pool-&gt;nthreads &gt; 0) // 5. 如果还有线程没有退完,我会等,注意这里是 while pthread_cond_wait(&amp;term, &amp;pool-&gt;mutex); pthread_mutex_unlock(&amp;pool-&gt;mutex); if (memcmp(&amp;pool-&gt;tid, &amp;__zero_tid, sizeof (pthread_t)) != 0) pthread_join(pool-&gt;tid, NULL); // 6.同样地等待打算退出的上一个人 } </code></pre> <h2>5 - 特点 3:同样可以在线程任务里销毁这个线程池</h2> <p>既然线程任务可以做任何事情,理论上,<strong>线程任务也可以销毁线程池</strong>❓</p> <p>作为一个逻辑完备的线程池,大胆一点,我们把问号去掉。</p> <p>而且,<strong>销毁并不会结束当前任务,它会等这个任务执行完</strong>。</p> <p>想象一下,刚才的<code>__thrdpool_routine()</code>,while 里拿出来的那个任务,做的事情竟然是发起<code>thrdpool_destroy()</code>...</p> <p>我们来把上面的图改一下:</p> <p>大胆点,我们让一个 routine 来 destroy 线程池👇</p> <p><img alt="thrdpool_pic_02" class="embedded_image" loading="lazy" referrerpolicy="no-referrer" rel="noreferrer" src="https://user-images.githubusercontent.com/1880011/165244048-cd728cff-68f0-4124-8d4f-b55f31a658d3.png"></p> <p>如果发起销毁的人,是我们自己内部的线程,那么我们就不是等 n 个,而是等 n-1 ,少了一个外部线程等待我们。如何实现才能让这些逻辑都完美融合呢?我们把刚才跳过的三段魔法串起来看看。</p> <h4>第一段魔法,销毁的发起者。</h4> <p>如果发现发起销毁的人是线程池内部的线程,那么它具有较强的自我管理意识(因为前面说了,会等它这个任务执行完),而我们可以放心大胆地<strong>pthread_detach</strong>,无需任何人 join 它等待它结束。</p> <pre><code class="language-cpp">static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { …. if (in_pool) // 每个由线程池创建的线程都设置了一个 key ,由此判断是否是 in_pool { /* Thread pool destroyed in a pool thread is legal. */ pthread_detach(pthread_self()); pool-&gt;nthreads--; } </code></pre> <h4>第二段魔法:线程池谁来 free ?</h4> <p>一定是发起销毁的那个人。所以这里用<strong>in_pool</strong>来控制 main 线程的回收:</p> <pre><code class="language-cpp">void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), thrdpool_t *pool) { // 已经调用完第一段,且挨个 pending(未执行的 task)了 ... // 销毁其他内部分配的内存 if (!in_pool) // 如果不是内部线程发起的销毁,要负责回收线程池内存 free(pool); } </code></pre> <p>那现在不是 main 线程发起的销毁呢?发起的销毁的那个内部线程,怎么能保证我,这个正在 routine 里的销毁任务,可以在最后关头把所有资源回收干净、顺利 free(pool)、不会挂地功成身退呢?</p> <p>在前面阅读源码第 5 步,其实我们看过,__thrdpool_routine()里有 free 的地方。</p> <p>于是现在三段魔法终于串起来了。</p> <h4>第三段魔法:严谨的并发。</h4> <pre><code class="language-cpp">static void *__thrdpool_routine(void *arg) { while (1) { ... // 前面执行完一个任务,如果任务里做的事情,是销毁线程池... // 注意这个时候,其他内存都已经被 destroy 的那个清掉了,万万不可以再用什么 mutex 、cond &nbsp; &nbsp; &nbsp; &nbsp; if (pool-&gt;nthreads == 0) &nbsp; &nbsp; &nbsp; &nbsp; { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; /* Thread pool was destroyed by the task. */ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; free(pool); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return NULL; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } ... </code></pre> <p>非常重要的一点,<strong>由于并发,我们是不知道谁先操作的。假设我们稍微改一改这个顺序,就又是另一番逻辑</strong>。</p> <p>比如我作为一个内部线程,在 routine 里调用 destroy 期间,发现还有线程没有执行完,我就要等在我的 terminate 上,待最后看到 nthreads==0 的那个人叫醒我。然后我的代码继续执行,函数栈就会从 destroy 回到 routine ,也就是上面那几行,然后,free(pool);,这时候我已经放飞自我 detach 了,直接退出即可。</p> <p>你看,无论如何,都可以完美地销毁线程池:</p> <p><img alt="thrdpool_pic_03" class="embedded_image" loading="lazy" referrerpolicy="no-referrer" rel="noreferrer" src="https://user-images.githubusercontent.com/1880011/165245563-67791737-e6be-470a-9e2d-5c4e1896c51a.jpg"></p> <p>是不是太妙了!分析到这里已经要被并发的世界感动了!😭</p> <h2>6 - 简单的用法</h2> <p>这个线程池只有两个文件: <code>thrdpool.h</code> 和 <code>thrdpool.c</code>,而且只依赖内核的数据结构<code>list.h</code>。我们把它拿出来玩,自己写一段代码:</p> <pre><code class="language-cpp">void my_routine(void *context) // 我们要执行的函数 { printf("task-%llu start.\n", reinterpret_cast&lt;unsigned long long&gt;(context); ); } void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的任务会到这里 { printf("pending task-%llu.\n", reinterpret_cast&lt;unsigned long long&gt;(task-&gt;context);); } int main() { thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 创建 struct thrdpool_task task; unsigned long long i; for (i = 0; i &lt; 5; i++) { task.routine = &amp;my_routine; task.context = reinterpret_cast&lt;void *&gt;(i); thrdpool_schedule(&amp;task, thrd_pool); // 调用 } getchar(); // 卡住主线程,按回车继续 thrdpool_destroy(&amp;my_pending, thrd_pool); // 结束 return 0; } </code></pre> <p>我们再 printf 几行 log ,随便编译一下就可以跑起来:</p> <p><img alt="v2-20c8e3e655216139423a4f2dda37d789_1440w" class="embedded_image" loading="lazy" referrerpolicy="no-referrer" rel="noreferrer" src="https://user-images.githubusercontent.com/1880011/164912430-62ed9a96-755a-4c5b-8d28-33cf8cbe0b0d.jpg"></p> <p>使用接口如此简单,也没有复杂的数据结构。</p> <h2>7 - 并发与结构之美</h2> <p>最后谈谈感受。</p> <p>作为并发架构领域的入门选手,看完之后很后悔为什么没有早点看、早点获取知识、打开视野,并且有一种,我肯定还没有完全理解到里边的精髓,毕竟我不能<strong>深刻地理解到设计者当时对并发的构思和模型上的选择</strong>。</p> <p>只能说,没有十多年<strong>优秀的系统调用和并发编程的功底</strong>难以写出这样的代码,没有<strong>极致的审美与对品控的偏执</strong>也难以写出这样的代码。</p> <p>并发编程有很多说道,就正如退出这个这么简单的事情,想要做到退出时回收干净却很难。如果说你写业务逻辑自己管线程,退出什么的 sleep(1)都无所谓,但做框架的人如果不能把自己的框架做得完美无暇逻辑自洽,就难免让人感觉差点意思。</p> <p>而这个 thrdpool ,它作为一个线程池,是如此地逻辑完备。</p> <p><strong>再次让我深深地感到震撼:我们身边那些原始的、底层的、基础的代码,还有很多新思路,还可以写得如此美。</strong></p> <p>Workflow 项目 GiHut 源码地址:<a href="https://github.com/sogou/workflow" rel="nofollow">https://github.com/sogou/workflow</a></p>
收藏(0)  分享
相关标签: 灌水交流
注意:本文归作者所有,未经作者允许,不得转载
3个回复
  • fawdlstty
    2022年4月26日 08:18
    勘误:“通过系统提供的 pthread 或者 std::thread 创建线程,就可以实现多线程并发执行我们的代码” 首先 pthread 和 std::thread 都不是系统提供的,系统提供的函数比如 windows 平台,api 叫 CreateThread (或类似名称比如 NtCreateThread )。其次,“并发”应该改为“并行”(并发代表同时发生,并行代表同时运行)。 “但是 CPU 的核数是固定的”这句联系上下文不太准确,cpu 并行线程数不等于核心数,所以这儿应该把核数改为线程数。
    0 0
  • justou
    2022年4月26日 10:47
    UP 有没有好的流水线式处理实现思路? n_1 个生产者 -> n_2 个加工者 -> n_3 个加工者 -> ... ->n_k 个最终消费者 设计可复用的 Pipe 来组成一个 Pipeline
    0 0