成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

C++協(xié)程項目實戰(zhàn):協(xié)程函數(shù)與線程切換

開發(fā) 前端
協(xié)程宛如一顆璀璨新星,照亮了高并發(fā)編程的新路徑。協(xié)程,這一輕量級的并發(fā)編程模型,可在單線程內(nèi)實現(xiàn)多任務(wù)的高效協(xié)作。它就像一位訓(xùn)練有素的舞者,在舞臺上優(yōu)雅地暫停、恢復(fù),巧妙地避開資源爭奪,使得程序在處理 I/O 密集型任務(wù)時,效率得到質(zhì)的飛躍。

在 C/C++ 編程的廣袤天地中,高并發(fā)一直是開發(fā)者們追求的圣杯。傳統(tǒng)的多線程、多進(jìn)程編程模式,在帶來強(qiáng)大并發(fā)能力的同時,也伴隨著高昂的資源開銷與復(fù)雜的同步問題。想象一下,一個龐大的服務(wù)器程序,需要同時處理成千上萬的客戶端請求,若采用傳統(tǒng)方式,光是線程的頻繁切換與資源競爭,就可能讓程序陷入性能瓶頸的泥沼。

此時,協(xié)程宛如一顆璀璨新星,照亮了高并發(fā)編程的新路徑。協(xié)程,這一輕量級的并發(fā)編程模型,可在單線程內(nèi)實現(xiàn)多任務(wù)的高效協(xié)作。它就像一位訓(xùn)練有素的舞者,在舞臺上優(yōu)雅地暫停、恢復(fù),巧妙地避開資源爭奪,使得程序在處理 I/O 密集型任務(wù)時,效率得到質(zhì)的飛躍。

在接下來的內(nèi)容中,讓我們一起從 0 到 1 吃透 C/C++ 協(xié)程。我們將深入剖析協(xié)程的底層原理,手把手教你如何在代碼中巧妙運(yùn)用協(xié)程,解鎖高并發(fā)編程的新姿勢,讓你的程序性能更上一層樓。

一、協(xié)程(Coroutine)簡介

協(xié)程,又稱微線程,纖程。英文名Coroutine。

協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應(yīng)用。

子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。所以子程序調(diào)用是通過棧實現(xiàn)的,一個線程就是執(zhí)行一個子程序。

子程序調(diào)用總是一個入口,一次返回,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同,協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r候再返回來接著執(zhí)行(注意,在一個子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點類似CPU的中斷)。

比如子程序A、B:def A():

print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'

假設(shè)由協(xié)程執(zhí)行,在執(zhí)行A的過程中,可以隨時中斷,去執(zhí)行B,B也可能在執(zhí)行過程中中斷再去執(zhí)行A,結(jié)果可能是:

1
2
x
y
3
z

但是在A中是沒有調(diào)用B的,所以協(xié)程的調(diào)用比函數(shù)調(diào)用理解起來要難一些。

看起來A、B的執(zhí)行有點像多線程,但協(xié)程的特點在于是一個線程執(zhí)行,那和多線程比,協(xié)程有何優(yōu)勢?

最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。

第二大優(yōu)勢就是不需要多線程的鎖機(jī)制,因為只有一個線程,也不存在同時寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

因為協(xié)程是一個線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進(jìn)程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

Python對協(xié)程的支持還非常有限,用在generator中的yield可以一定程度上實現(xiàn)協(xié)程。雖然支持不完全,但已經(jīng)可以發(fā)揮相當(dāng)大的威力了。

來看例子:

傳統(tǒng)的生產(chǎn)者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機(jī)制控制隊列和等待,但一不小心就可能死鎖。

如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield跳轉(zhuǎn)到消費者開始執(zhí)行,待消費者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:import time

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)

執(zhí)行結(jié)果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK=

注意到consumer函數(shù)是一個generator(生成器),把一個consumer傳入produce后:

  • 首先調(diào)用c.next()啟動生成器;
  • 然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;
  • consumer通過yield拿到消息,處理,又通過yield把結(jié)果傳回;
  • produce拿到consumer處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;
  • produce決定不生產(chǎn)了,通過c.close()關(guān)閉consumer,整個過程結(jié)束。

整個流程無鎖,由一個線程執(zhí)行,produce和consumer協(xié)作完成任務(wù),所以稱為“協(xié)程”,而非線程的搶占式多任務(wù)。

二、C/C++ 協(xié)程

c++作為一個相對古老的語言,曾經(jīng)是步履蹣跚,直到c++11才奮起直追,但是對新技術(shù)的整體演進(jìn),其實c++仍然是保守的。現(xiàn)在c++20的標(biāo)準(zhǔn)雖然已經(jīng)實現(xiàn)了協(xié)程,但目前能比較好支持c++20的編譯器幾乎都和整體的環(huán)境不太兼容。換句話說,還需要繼續(xù)等待整個c++的迭代版本,可能到了c++23,整體的環(huán)境就會跟上去,協(xié)程才會真正的飛入程序員的“尋常百姓家”。

正如前面提到的,協(xié)程一般來說是不需要鎖的,但是如果協(xié)程的底層操作是跨越線程動態(tài)操作,仍然是需要鎖的存在的。這也是為什么要求盡量把協(xié)和的調(diào)度放到一個線程中去的原因。

首先需要聲明的是,這里不打算花時間來介紹什么是協(xié)程,以及協(xié)程和線程有什么不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協(xié)程的。現(xiàn)有的 C++ 協(xié)程庫均基于兩種方案:利用匯編代碼控制協(xié)程上下文的切換,以及利用操作系統(tǒng)提供的 API 來實現(xiàn)協(xié)程上下文切換。

典型的例如:

  1. libco,Boost.context:基于匯編代碼的上下文切換
  2. phxrpc:基于 ucontext/Boost.context 的上下文切換
  3. libmill:基于 setjump/longjump 的協(xié)程切換

一般而言,基于匯編的上下文切換要比采用系統(tǒng)調(diào)用的切換更加高效,這也是為什么 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的原因。關(guān)于 phxrpc 和 libmill 具體的協(xié)程實現(xiàn)方式,以后有時間再詳細(xì)介紹。

2.1協(xié)程和線程之間區(qū)別

在了解了協(xié)程的基本概念之后,很多人可能會將它與線程混淆,畢竟它們都和程序的并發(fā)執(zhí)行有關(guān)。那么,協(xié)程和線程到底有什么區(qū)別呢?接下來,我們就來深入探討一下。

(1)線程:操作系統(tǒng)的寵兒

線程,是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實際運(yùn)作單位。打個比方,如果把進(jìn)程看作是一個工廠,那么線程就是工廠里的工人,每個工人都可以獨立執(zhí)行任務(wù),但又共享著工廠的資源。

線程的創(chuàng)建、調(diào)度和切換都由操作系統(tǒng)內(nèi)核負(fù)責(zé)。當(dāng)我們在程序中創(chuàng)建一個線程時,操作系統(tǒng)會為它分配一系列的資源,包括獨立的棧空間、程序計數(shù)器(PC)等。線程的調(diào)度采用的是搶占式調(diào)度策略,也就是說,操作系統(tǒng)會根據(jù)一定的算法,在適當(dāng)?shù)臅r候剝奪當(dāng)前正在執(zhí)行的線程的 CPU 使用權(quán),將其放入就緒隊列,然后從就緒隊列中選擇一個新的線程來執(zhí)行。這種調(diào)度方式可以保證多個線程能夠公平地競爭 CPU 資源,實現(xiàn)并發(fā)執(zhí)行。

(2)協(xié)程:輕量級的后起之秀

協(xié)程則是一種用戶態(tài)的輕量級線程,它的調(diào)度完全由用戶控制,而不是操作系統(tǒng)。這就好比一個小組里的成員,他們可以自行決定任務(wù)的執(zhí)行順序和時間,不需要外部的強(qiáng)制干預(yù)。

在 C/C++ 中,協(xié)程通常通過函數(shù)庫或者語言特性來實現(xiàn)。創(chuàng)建協(xié)程時,系統(tǒng)只會為其分配少量的資源,比如一個很小的棧空間,用于保存協(xié)程的執(zhí)行狀態(tài)和局部變量。協(xié)程的調(diào)度是協(xié)作式的,也就是說,只有當(dāng)一個協(xié)程主動讓出執(zhí)行權(quán)時,其他協(xié)程才有機(jī)會執(zhí)行。比如,當(dāng)一個協(xié)程遇到 I/O 操作、調(diào)用特定的掛起函數(shù)或者執(zhí)行時間過長時,它可以主動暫停自己的執(zhí)行,將執(zhí)行權(quán)交給其他協(xié)程。

(3)二者之間的區(qū)別

調(diào)度方式:線程由操作系統(tǒng)內(nèi)核調(diào)度,采用搶占式調(diào)度策略;而協(xié)程由用戶控制調(diào)度,采用協(xié)作式調(diào)度策略。這就導(dǎo)致了線程的調(diào)度是被動的,而協(xié)程的調(diào)度是主動的。

上下文切換:線程的上下文切換涉及到用戶態(tài)和內(nèi)核態(tài)的切換,需要保存和恢復(fù)寄存器、棧指針等大量的狀態(tài)信息,開銷較大;而協(xié)程的上下文切換只在用戶態(tài)進(jìn)行,只需要保存和恢復(fù)少量的寄存器和棧信息,開銷非常小,甚至可以忽略不計。

資源占用:線程的創(chuàng)建和銷毀需要操作系統(tǒng)分配和回收大量的資源,每個線程都有自己獨立的棧空間,通常棧空間較大,所以線程占用的資源較多;而協(xié)程的創(chuàng)建和銷毀開銷小,占用的資源也很少,一個線程中可以創(chuàng)建成百上千個協(xié)程。

適用場景:線程適用于需要充分利用多核 CPU 資源、處理計算密集型任務(wù)的場景;而協(xié)程適用于 I/O 密集型任務(wù),比如網(wǎng)絡(luò)請求、文件讀寫等,因為在這些場景中,大量的時間都花費在等待 I/O 操作完成上,協(xié)程可以在等待時主動讓出執(zhí)行權(quán),提高程序的整體效率 。

為了更直觀地感受線程和協(xié)程的區(qū)別,我們來看下面這個表格:

比較項

線程

協(xié)程

調(diào)度者

操作系統(tǒng)內(nèi)核

用戶程序

上下文切換開銷

資源占用

適用場景

計算密集型任務(wù)

I/O 密集型任務(wù)

2.2協(xié)程的原理

既然協(xié)程如此厲害,那么它實現(xiàn)的原理到底是什么呢?協(xié)程最重要的應(yīng)用方式就是把線程在內(nèi)核上的開銷轉(zhuǎn)到了應(yīng)用層的開銷,避開或者屏蔽(對應(yīng)用者)線程操作的難度。那多線程操作的復(fù)雜性在哪兒呢?線程切換的隨機(jī)性和線程Context的跟隨,出入棧的保存和恢復(fù),相關(guān)數(shù)據(jù)的鎖和讀寫控制。這才是多線程的復(fù)雜性,如果再加異步引起的數(shù)據(jù)的非連續(xù)性和事件的非必然性操作,就更加增強(qiáng)了多線程遇到問題的判別和斷點的準(zhǔn)確。

好,既然是這樣,那么上框架,封裝不就得了。

協(xié)程和線程一樣,同樣需要做好兩個重點:第一個是協(xié)程的調(diào)度;第二是上下文的切換。而這兩點在OS的相關(guān)書籍中的介紹海了去了,這里就不再贅述,原理基本都是一樣的。

如果以協(xié)程的關(guān)系來區(qū)分,協(xié)程也可以劃分為對稱和非對稱協(xié)程兩種。協(xié)程間是平等關(guān)系的,就是對稱的;反之為非對稱的。名字越起越多,但事兒還是那么兩下子,大家自己體會即可。

只要能保證上面所說的對上下文數(shù)據(jù)的安全性保證又能夠?qū)崿F(xiàn)協(xié)程在具體線程上的操作(某一個線程上執(zhí)行的所有協(xié)程是串行的),那么鎖的操作,從理論上講是不需要的(但實際開發(fā)中,因為協(xié)程的應(yīng)用還是少,所以還需要具體的問題具體分析)。協(xié)程的動作集中在應(yīng)用層,而把復(fù)雜的內(nèi)核調(diào)度的線程屏蔽在下層框架上(或者以后會不會出現(xiàn)OS進(jìn)行封裝),從而大幅的降低了編程的難度,但卻擁有了線程快速異步調(diào)用的效果。

2.3協(xié)程實現(xiàn)機(jī)制

協(xié)程的實現(xiàn)有以下幾種機(jī)制:

①基于匯編的實現(xiàn):這個對匯編編程得要求有兩下子,這個網(wǎng)上也有不少例子,就不再這里搬門弄斧了。

②基于switch-case來實現(xiàn):這個其實更像是一個C語言的技巧,利用不同的狀態(tài)Case來達(dá)到目的,或者說大家認(rèn)知中的對編程語言的一種內(nèi)卷使用,網(wǎng)上有一個開源的項目:

https://github.com/georgeredinger/protothreads

③基于操作系統(tǒng)提供的接口:Linux的ucontext,Windows的Fiber

Fiber可能很多人都不熟悉,這其實就是微軟原來提供的纖程,有興趣的可以去網(wǎng)上查找一下,有幾年這個概念炒得還是比較火的。ucontext是Linux上的一種操作,這兩個都可以當(dāng)作是一種類似特殊的應(yīng)用存在。游戲界的大佬云風(fēng)(《游戲之旅:我的編程感悟》作者)的coroutine就是類似于這種。興趣是編程的動力,大家如果對這些有興趣可以看看這本書,雖然其中很多的東西都比較老了,但是整體的思想還是非常有借鑒的。

④基于接口 setjmp 和 longjmp同時使用 static local 的變量來保存協(xié)程內(nèi)部的數(shù)據(jù)

這兩個函數(shù)是C語言的一個非常有意思的應(yīng)用,一般寫C好長時間的人,都沒接觸過這兩個API函數(shù),這個函數(shù)的定義是:

int setjmp(jmp_buf envbuf);
void longjmp(jmp_buf envbuf, int val);

它們兩個的作用,前者是用來將棧楨(上下文)保存在jmp_buf這個數(shù)據(jù)結(jié)構(gòu)中,然后可以通過后者 longjmp在指定的位置恢復(fù)出來。這就類似于使用goto語句跳轉(zhuǎn)到任意的地方,然后再把相關(guān)的數(shù)據(jù)恢復(fù)出來。看一下個《C專家編程》中的例子:

#include <stdio.h>
#include <setjmp.h>

jmp_buf buf;

banana()
{
    printf("in banana() \n");
    longjmp(buf,1);
    printf("you'll never see this,because i longjmp'd");
}

main()
{
    if(setjmp(buf))
        printf("back in main\n");
    else
    {
        printf("first time through\n");
        banana();
    }
}

看完了上述的幾種方法,其實網(wǎng)上還有幾種實現(xiàn)的方式,但都是比較刻板,有興趣的可以搜索一下,這里就不提供鏈接了。

協(xié)程的實現(xiàn),按理說還是OS搞定最好,其實是框架底層,但C/C++的復(fù)雜性,以及不同的平臺和不同編譯器、庫之間的長期差異,導(dǎo)致這方面能做好的可能性真心是覺得不會太大。

三、協(xié)程核心原理機(jī)制

3.1libco協(xié)程的創(chuàng)建和切換

在介紹 coroutine 的創(chuàng)建之前,我們先來熟悉一下 libco 中用來表示一個 coroutine 的數(shù)據(jù)結(jié)構(gòu),即定義在 co_routine_inner.h 中的 stCoRoutine_t:

struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 協(xié)程運(yùn)行環(huán)境
pfn_co_routine_t pfn; // 協(xié)程執(zhí)行的邏輯函數(shù)
void *arg; // 函數(shù)參數(shù)
coctx_t ctx; // 保存協(xié)程的下文環(huán)境
...
char cEnableSysHook; // 是否運(yùn)行系統(tǒng) hook,即非侵入式邏輯
char cIsShareStack; // 是否在共享棧模式
void *pvEnv;
stStackMem_t* stack_mem; // 協(xié)程運(yùn)行時的棧空間
char* stack_sp; // 用來保存協(xié)程運(yùn)行時的棧空間
unsigned int save_size;
char* save_buffer;
};

我們暫時只需要了解表示協(xié)程的最簡單的幾個參數(shù),例如協(xié)程運(yùn)行環(huán)境,協(xié)程的上下文環(huán)境,協(xié)程運(yùn)行的函數(shù)以及運(yùn)行時棧空間。后面的 stack_sp,save_size 和 save_buffer 與 libco 共享棧模式相關(guān),有關(guān)共享棧的內(nèi)容我們后續(xù)再說。

3.2協(xié)程的執(zhí)行流程

為了更直觀地理解協(xié)程的執(zhí)行流程,我們來看一個簡單的 C++ 代碼示例:

#include <iostream>
#include <coroutine>

struct Task {
    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;
    handle_type coro;

    Task(handle_type h) : coro(h) {}

    ~Task() {
        if (coro) coro.destroy();
    }

    bool resume() {
        if (!coro.done()) {
            coro.resume();
            return true;
        }
        return false;
    }

    struct promise_type {
        Task get_return_object() {
            return Task{handle_type::from_promise(*this)};
        }

        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }

        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };

    // 協(xié)程函數(shù)
    static Task simple_coroutine() {
        std::cout << "Coroutine started" << std::endl;
        // 暫停協(xié)程
        co_await std::suspend_always{};
        std::cout << "Coroutine resumed" << std::endl;
    }
};

int main() {
    // 創(chuàng)建協(xié)程
    Task t = Task::simple_coroutine();
    std::cout << "Main function" << std::endl;
    // 恢復(fù)協(xié)程執(zhí)行
    t.resume();
    return 0;
}

在這個示例中,我們定義了一個Task結(jié)構(gòu)體來表示一個協(xié)程任務(wù)。simple_coroutine函數(shù)是一個協(xié)程函數(shù),它內(nèi)部使用了co_await關(guān)鍵字來暫停協(xié)程的執(zhí)行。

當(dāng)main函數(shù)中調(diào)用Task::simple_coroutine()時,協(xié)程開始創(chuàng)建,但并不會立即執(zhí)行,而是返回一個Task對象。此時,協(xié)程處于掛起狀態(tài)。

接著,main函數(shù)繼續(xù)執(zhí)行,輸出Main function。然后調(diào)用t.resume(),協(xié)程從上次暫停的地方(即co_await處)恢復(fù)執(zhí)行,輸出Coroutine resumed。

從這個示例中,我們可以清晰地看到協(xié)程的執(zhí)行流程:創(chuàng)建協(xié)程時,協(xié)程函數(shù)并不會立即執(zhí)行完,而是可以通過co_await暫停執(zhí)行,將執(zhí)行權(quán)交回給調(diào)用者;當(dāng)調(diào)用者調(diào)用resume方法時,協(xié)程又可以從暫停的地方恢復(fù)執(zhí)行 。在協(xié)程暫停時,其內(nèi)部的局部變量等狀態(tài)信息都會被保存下來,以便恢復(fù)執(zhí)行時能夠繼續(xù)之前的操作。

3.3實現(xiàn)方式面面觀

在 C/C++ 中,實現(xiàn)協(xié)程主要有以下幾種常見方式:

⑴利用匯編代碼控制上下文切換

這是一種比較底層的實現(xiàn)方式。通過匯編代碼,我們可以直接操作 CPU 寄存器和棧,實現(xiàn)協(xié)程上下文的保存和恢復(fù)。例如,在 x86 架構(gòu)下,我們需要保存和恢復(fù)rsp(棧指針)、rbp(棧基址指針)、rbx、r12 - r15(數(shù)據(jù)寄存器)以及rip(程序運(yùn)行的下一個指令地址)等寄存器的值。因為協(xié)程的切換本質(zhì)上就是上下文的切換,通過精確控制這些寄存器,我們能夠?qū)崿F(xiàn)協(xié)程在暫停和恢復(fù)時的狀態(tài)一致性。

這種方式的優(yōu)點是性能極高,因為直接操作硬件資源,避免了操作系統(tǒng) API 調(diào)用的開銷。然而,它的缺點也很明顯,代碼編寫難度大,需要對匯編語言和 CPU 架構(gòu)有深入的了解,而且可移植性差,不同的 CPU 架構(gòu)可能需要編寫不同的匯編代碼 。

⑵使用操作系統(tǒng)提供的 API

一些操作系統(tǒng)提供了用于上下文切換的 API,比如 Unix 系統(tǒng)中的ucontext和 Windows 系統(tǒng)中的fiber。以ucontext為例,它提供了getcontext、setcontext、makecontext和swapcontext等函數(shù)來管理上下文。getcontext用于獲取當(dāng)前上下文,setcontext用于設(shè)置上下文,makecontext用于創(chuàng)建一個新的上下文,swapcontext則用于交換兩個上下文。

使用這些 API,我們可以相對容易地實現(xiàn)協(xié)程的上下文切換。這種方式的優(yōu)點是實現(xiàn)相對簡單,不需要深入了解匯編語言,而且具有較好的可移植性,只要操作系統(tǒng)支持相應(yīng)的 API。但是,由于涉及到系統(tǒng)調(diào)用,性能相對較低,因為系統(tǒng)調(diào)用會帶來一定的開銷,包括用戶態(tài)和內(nèi)核態(tài)的切換等。

⑶利用 C 語言的setjmp和longjmp函數(shù)

setjmp函數(shù)用于保存當(dāng)前的調(diào)用環(huán)境,包括寄存器的值和棧指針等,它會返回一個整數(shù)值。longjmp函數(shù)則用于恢復(fù)之前由setjmp保存的調(diào)用環(huán)境,并跳轉(zhuǎn)到setjmp調(diào)用的位置繼續(xù)執(zhí)行。通過這兩個函數(shù)的配合,我們可以實現(xiàn)協(xié)程的暫停和恢復(fù)。例如,在協(xié)程需要暫停時,調(diào)用setjmp保存當(dāng)前環(huán)境,然后在需要恢復(fù)時,調(diào)用longjmp恢復(fù)環(huán)境。

這種方式的優(yōu)點是代碼實現(xiàn)相對簡潔,不需要復(fù)雜的匯編知識。但它也有局限性,它要求函數(shù)里面使用static local變量來保存協(xié)程內(nèi)部的數(shù)據(jù),因為setjmp和longjmp并不會自動保存和恢復(fù)局部變量,而且這種方式在處理復(fù)雜的函數(shù)調(diào)用和嵌套時可能會出現(xiàn)問題 。

四、協(xié)程的實現(xiàn)與原理剖析

4.1協(xié)程的起源

問題:協(xié)程存在的原因?協(xié)程能夠解決哪些問題?

在我們現(xiàn)在CS,BS開發(fā)模式下,服務(wù)器的吞吐量是一個很重要的參數(shù)。其實吞吐量是IO處理時間加上業(yè)務(wù)處理。為了簡單起見,比如,客戶端與服務(wù)器之間是長連接的,客戶端定期給服務(wù)器發(fā)送心跳包數(shù)據(jù)。客戶端發(fā)送一次心跳包到服務(wù)器,服務(wù)器更新該新客戶端狀態(tài)的。心跳包發(fā)送的過程,業(yè)務(wù)處理時長等于IO讀取(RECV系統(tǒng)調(diào)用)加上業(yè)務(wù)處理(更新客戶狀態(tài))。吞吐量等于1s業(yè)務(wù)處理次數(shù)。

業(yè)務(wù)處理(更新客戶端狀態(tài))時間,業(yè)務(wù)不一樣的,處理時間不一樣,我們就不做討論。

那如何提升recv的性能。若只有一個客戶端,recv的性能也沒有必要提升,也不能提升。若在有百萬計的客戶端長連接的情況,我們該如何提升。以Linux為例,在這里需要介紹一個“網(wǎng)紅”就是epoll。服務(wù)器使用epoll管理百萬計的客戶端長連接,代碼框架如下:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);

            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            handle(sockfd);
        }
    }
}

對于響應(yīng)式服務(wù)器,所有的客戶端的操作驅(qū)動都是來源于這個大循環(huán)。來源于epoll_wait的反饋結(jié)果。

對于服務(wù)器處理百萬計的IO。Handle(sockfd)實現(xiàn)方式有兩種。

第一種,handle(sockfd)函數(shù)內(nèi)部對sockfd進(jìn)行讀寫動作。代碼如下

int handle(int sockfd) {
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

handle的io操作(send,recv)與epoll_wait是在同一個處理流程里面的。這就是IO同步操作。

優(yōu)點:

  • sockfd管理方便。
  • 操作邏輯清晰。

缺點:

  • 服務(wù)器程序依賴epoll_wait的循環(huán)響應(yīng)速度慢。
  • 程序性能差

第二種,handle(sockfd)函數(shù)內(nèi)部將sockfd的操作,push到線程池中,代碼如下:

int thread_cb(int sockfd) {
    // 此函數(shù)是在線程池創(chuàng)建的線程中運(yùn)行。
    // 與handle不在一個線程上下文中運(yùn)行
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

int handle(int sockfd) {
    //此函數(shù)在主線程 main_thread 中運(yùn)行
    //在此處之前,確保線程池已經(jīng)啟動。
    push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運(yùn)行。
}

Handle函數(shù)是將sockfd處理方式放到另一個已經(jīng)其他的線程中運(yùn)行,如此做法,將io操作(recv,send)與epoll_wait 不在一個處理流程里面,使得io操作(recv,send)與epoll_wait實現(xiàn)解耦。這就叫做IO異步操作。

優(yōu)點:

  • 子模塊好規(guī)劃。
  • 程序性能高。

缺點:

正因為子模塊好規(guī)劃,使得模塊之間的sockfd的管理異常麻煩。每一個子線程都需要管理好sockfd,避免在IO操作的時候,sockfd出現(xiàn)關(guān)閉或其他異常。

上文有提到IO同步操作,程序響應(yīng)慢,IO異步操作,程序響應(yīng)快。

下面來對比一下IO同步操作與IO異步操作。

代碼如下:

https://github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c

在這份代碼的486行,#if 1, 打開的時候,為IO異步操作。關(guān)閉的時候,為IO同步操作。

接下來把我測試接入量的結(jié)果粘貼出來。

  • IO異步操作,每1000個連接接入的服務(wù)器響應(yīng)時間(900ms左右)。
  • IO同步操作,每1000個連接接入的服務(wù)器響應(yīng)時間(6500ms左右)。
  • IO異步操作與IO同步操作

對比項

  • IO同步操作
  • IO異步操作

Sockfd管理

  • 管理方便
  • 多個線程共同管理

代碼邏輯

  • 程序整體邏輯清晰
  • 子模塊邏輯清晰

程序性能

  • 響應(yīng)時間長,性能差
  • 響應(yīng)時間短,性能好

有沒有一種方式,有異步性能,同步的代碼邏輯。來方便編程人員對IO操作的組件呢?有,采用一種輕量級的協(xié)程來實現(xiàn)。在每次send或者recv之前進(jìn)行切換,再由調(diào)度器來處理epoll_wait的流程。

就是采用了基于這樣的思考,寫了NtyCo,實現(xiàn)了一個IO異步操作與協(xié)程結(jié)合的組件。

4.2協(xié)程的案例

問題:協(xié)程如何使用?與線程使用有何區(qū)別?

在做網(wǎng)絡(luò)IO編程的時候,有一個非常理想的情況,就是每次accept返回的時候,就為新來的客戶端分配一個線程,這樣一個客戶端對應(yīng)一個線程。就不會有多個線程共用一個sockfd。每請求每線程的方式,并且代碼邏輯非常易讀。但是這只是理想,線程創(chuàng)建代價,調(diào)度代價就呵呵了。

先來看一下每請求每線程的代碼如下:

while(1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);

    pthread_t thread_id;
    pthread_create(&thread_id, NULL, client_cb, &clientfd);

}

這樣的做法,寫完放到生產(chǎn)環(huán)境下面,如果你的老板不打死你,你來找我。我來幫你老板,為民除害。

如果我們有協(xié)程,我們就可以這樣實現(xiàn)。參考代碼如下:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

while (1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);

    nty_coroutine *read_co;
    nty_coroutine_create(&read_co, server_reader, &cli_fd);
}

這樣的代碼是完全可以放在生成環(huán)境下面的。如果你的老板要打死你,你來找我,我?guī)湍惆涯憷习宕蛩溃瑸槊癯Α?/span>

線程的API思維來使用協(xié)程,函數(shù)調(diào)用的性能來測試協(xié)程。

NtyCo封裝出來了若干接口,一類是協(xié)程本身的,二類是posix的異步封裝

協(xié)程API:while

  • 協(xié)程創(chuàng)建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
  • 協(xié)程調(diào)度器的運(yùn)行
void nty_schedule_run(void)

POSIX異步封裝API:

int nty_socket(int domain, int type, int protocol)
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)
int nty_recv(int fd, void *buf, int length)
int nty_send(int fd, const void *buf, int length)
int nty_close(int fd)

4.3協(xié)程的實現(xiàn)之工作流程

問題:協(xié)程內(nèi)部是如何工作呢?

先來看一下協(xié)程服務(wù)器案例的代碼, 代碼參考:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

分別討論三個協(xié)程的比較晦澀的工作流程。第一個協(xié)程的創(chuàng)建;第二個IO異步操作;第三個協(xié)程子過程回調(diào)

(1)創(chuàng)建協(xié)程

當(dāng)我們需要異步調(diào)用的時候,我們會創(chuàng)建一個協(xié)程。比如accept返回一個新的sockfd,創(chuàng)建一個客戶端處理的子過程。再比如需要監(jiān)聽多個端口的時候,創(chuàng)建一個server的子過程,這樣多個端口同時工作的,是符合微服務(wù)的架構(gòu)的。

創(chuàng)建協(xié)程的時候,進(jìn)行了如何的工作?

創(chuàng)建API如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
  • 參數(shù)1:nty_coroutine **new_co,需要傳入空的協(xié)程的對象,這個對象是由內(nèi)部創(chuàng)建的,并且在函數(shù)返回的時候,會返回一個內(nèi)部創(chuàng)建的協(xié)程對象。
  • 參數(shù)2:proc_coroutine func,協(xié)程的子過程。當(dāng)協(xié)程被調(diào)度的時候,就會執(zhí)行該函數(shù)。
  • 參數(shù)3:void *arg,需要傳入到新協(xié)程中的參數(shù)。

協(xié)程不存在親屬關(guān)系,都是一致的調(diào)度關(guān)系,接受調(diào)度器的調(diào)度。調(diào)用create API就會創(chuàng)建一個新協(xié)程,新協(xié)程就會加入到調(diào)度器的就緒隊列中。

創(chuàng)建的協(xié)程具體步驟會在《協(xié)程的實現(xiàn)之原語操作》來描述。

(2)實現(xiàn)IO異步操作

大部分的朋友會關(guān)心IO異步操作如何實現(xiàn),在send與recv調(diào)用的時候,如何實現(xiàn)異步操作的。

先來看一下一段代碼:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);

            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {

            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
            recv(sockfd, buffer, length, 0);

            //parser_proto(buffer, length);

            send(sockfd, buffer, length, 0);
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
        }
    }
}

在進(jìn)行IO操作(recv,send)之前,先執(zhí)行了 epoll_ctl的del操作,將相應(yīng)的sockfd從epfd中刪除掉,在執(zhí)行完IO操作(recv,send)再進(jìn)行epoll_ctl的add的動作。這段代碼看起來似乎好像沒有什么作用。

如果是在多個上下文中,這樣的做法就很有意義了。能夠保證sockfd只在一個上下文中能夠操作IO的。不會出現(xiàn)在多個上下文同時對一個IO進(jìn)行操作的。協(xié)程的IO異步操作正式是采用此模式進(jìn)行的。

把單一協(xié)程的工作與調(diào)度器的工作的劃分清楚,先引入兩個原語操作 resume,yield會在《協(xié)程的實現(xiàn)之原語操作》來講解協(xié)程所有原語操作的實現(xiàn),yield就是讓出運(yùn)行,resume就是恢復(fù)運(yùn)行。

調(diào)度器與協(xié)程的上下文切換如下圖所示:

圖片圖片

在協(xié)程的上下文IO異步操作(nty_recv,nty_send)函數(shù),步驟如下:

  • 將sockfd 添加到epoll管理中。
  • 進(jìn)行上下文環(huán)境切換,由協(xié)程上下文yield到調(diào)度器的上下文。
  • 調(diào)度器獲取下一個協(xié)程上下文。Resume新的協(xié)程

IO異步操作的上下文切換的時序圖如下:

圖片圖片

(3)回調(diào)協(xié)程的子過程

在create協(xié)程后,何時回調(diào)子過程?何種方式回調(diào)子過程?

首先來回顧一下x86_64寄存器的相關(guān)知識。匯編與寄存器相關(guān)知識還會在《協(xié)程的實現(xiàn)之切換》繼續(xù)深入探討的。x86_64 的寄存器有16個64位寄存器,分別是:

%rax, %rbx,%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。
  • %rax 作為函數(shù)返回值使用的。
  • %rsp 棧指針寄存器,指向棧頂
  • %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。。。
  • %rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲,遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改
  • %r10, %r11 用作數(shù)據(jù)存儲,就是使用前要先保存原值

以NtyCo的實現(xiàn)為例,來分析這個過程。CPU有一個非常重要的寄存器叫做EIP,用來存儲CPU運(yùn)行下一條指令的地址。我們可以把回調(diào)函數(shù)的地址存儲到EIP中,將相應(yīng)的參數(shù)存儲到相應(yīng)的參數(shù)寄存器中。實現(xiàn)子過程調(diào)用的邏輯代碼如下:

void _exec(nty_coroutine *co) {
    co->func(co->arg); //子過程的回調(diào)函數(shù)
}

void nty_coroutine_init(nty_coroutine *co) {
    //ctx 就是協(xié)程的上下文
    co->ctx.edi = (void*)co; //設(shè)置參數(shù)
    co->ctx.eip = (void*)_exec; //設(shè)置回調(diào)函數(shù)入口
    //當(dāng)實現(xiàn)上下文切換的時候,就會執(zhí)行入口函數(shù)_exec , _exec 調(diào)用子過程func
}

4.4協(xié)程的實現(xiàn)之原語操作

問題:協(xié)程的內(nèi)部原語操作有哪些?分別如何實現(xiàn)的?

協(xié)程的核心原語操作:create, resume, yield。協(xié)程的原語操作有create怎么沒有exit?以NtyCo為例,協(xié)程一旦創(chuàng)建就不能有用戶自己銷毀,必須得以子過程執(zhí)行結(jié)束,就會自動銷毀協(xié)程的上下文數(shù)據(jù)。

以_exec執(zhí)行入口函數(shù)返回而銷毀協(xié)程的上下文與相關(guān)信息。co->func(co->arg) 是子過程,若用戶需要長久運(yùn)行協(xié)程,就必須要在func函數(shù)里面寫入循環(huán)等操作。所以NtyCo里面沒有實現(xiàn)exit的原語操作。

create:創(chuàng)建一個協(xié)程。

  • 調(diào)度器是否存在,不存在也創(chuàng)建。調(diào)度器作為全局的單例。將調(diào)度器的實例存儲在線程的私有空間pthread_setspecific。
  • 分配一個coroutine的內(nèi)存空間,分別設(shè)置coroutine的數(shù)據(jù)項,棧空間,棧大小,初始狀態(tài),創(chuàng)建時間,子過程回調(diào)函數(shù),子過程的調(diào)用參數(shù)。
  • 將新分配協(xié)程添加到就緒隊列 ready_queue中

實現(xiàn)代碼如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {

    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();

    if (sched == NULL) {
        nty_schedule_create(0);

        sched = nty_coroutine_get_sched();
        if (sched == NULL) {
            printf("Failed to create schedulern");
            return -1;
        }
    }

    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
        printf("Failed to allocate memory for new coroutinen");
        return -2;
    }

    //
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
        printf("Failed to allocate stack for new coroutinen");
        free(co);
        return -3;
    }

    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
    co->id = sched->spawned_coroutines ++;
co->func = func;

    co->fd = -1;
co->events = 0;

    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;

    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);

    return 0;
}

yield:讓出CPU。

void nty_coroutine_yield(nty_coroutine *co)

參數(shù):當(dāng)前運(yùn)行的協(xié)程實例

調(diào)用后該函數(shù)不會立即返回,而是切換到最近執(zhí)行resume的上下文。該函數(shù)返回是在執(zhí)行resume的時候,會有調(diào)度器統(tǒng)一選擇resume的,然后再次調(diào)用yield的。resume與yield是兩個可逆過程的原子操作。

resume:恢復(fù)協(xié)程的運(yùn)行權(quán)

int nty_coroutine_resume(nty_coroutine *co)

參數(shù):需要恢復(fù)運(yùn)行的協(xié)程實例

調(diào)用后該函數(shù)也不會立即返回,而是切換到運(yùn)行協(xié)程實例的yield的位置。返回是在等協(xié)程相應(yīng)事務(wù)處理完成后,主動yield會返回到resume的地方。

4.5協(xié)程的實現(xiàn)之切換

問題:協(xié)程的上下文如何切換?切換代碼如何實現(xiàn)?

首先來回顧一下x86_64寄存器的相關(guān)知識。x86_64 的寄存器有16個64位寄存器,分別是:

%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,%r13, %r14, %r15。
  • %rax 作為函數(shù)返回值使用的。
  • %rsp 棧指針寄存器,指向棧頂
  • %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。。。
  • %rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲,遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改
  • %r10, %r11 用作數(shù)據(jù)存儲,就是使用前要先保存原值。

上下文切換,就是將CPU的寄存器暫時保存,再將即將運(yùn)行的協(xié)程的上下文寄存器,分別mov到相對應(yīng)的寄存器上。此時上下文完成切換。如下圖所示:

切換_switch函數(shù)定義:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
  • 參數(shù)1:即將運(yùn)行協(xié)程的上下文,寄存器列表
  • 參數(shù)2:正在運(yùn)行協(xié)程的上下文,寄存器列表

我們nty_cpu_ctx結(jié)構(gòu)體的定義,為了兼容x86,結(jié)構(gòu)體項命令采用的是x86的寄存器名字命名。

typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;

_switch返回后,執(zhí)行即將運(yùn)行協(xié)程的上下文,是實現(xiàn)上下文的切換;

_switch的實現(xiàn)代碼:

0: __asm__ (
1: "    .text                                  n"
2: "       .p2align 4,,15                                   n"
3: ".globl _switch                                          n"
4: ".globl __switch                                         n"
5: "_switch:                                                n"
6: "__switch:                                               n"
7: "       movq %rsp, 0(%rsi)      # save stack_pointer     n"
8: "       movq %rbp, 8(%rsi)      # save frame_pointer     n"
9: "       movq (%rsp), %rax       # save insn_pointer      n"
10: "       movq %rax, 16(%rsi)                              n"
11: "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       n"
12: "       movq %r12, 32(%rsi)                              n"
13: "       movq %r13, 40(%rsi)                              n"
14: "       movq %r14, 48(%rsi)                              n"
15: "       movq %r15, 56(%rsi)                              n"
16: "       movq 56(%rdi), %r15                              n"
17: "       movq 48(%rdi), %r14                              n"
18: "       movq 40(%rdi), %r13     # restore rbx,r12-r15    n"
19: "       movq 32(%rdi), %r12                              n"
20: "       movq 24(%rdi), %rbx                              n"
21: "       movq 8(%rdi), %rbp      # restore frame_pointer  n"
22: "       movq 0(%rdi), %rsp      # restore stack_pointer  n"
23: "       movq 16(%rdi), %rax     # restore insn_pointer   n"
24: "       movq %rax, (%rsp)                                n"
25: "       ret                                              n"
26: );

按照x86_64的寄存器定義,%rdi保存第一個參數(shù)的值,即new_ctx的值,%rsi保存第二個參數(shù)的值,即保存cur_ctx的值。X86_64每個寄存器是64bit,8byte。

  1. Movq %rsp, 0(%rsi) 保存在棧指針到cur_ctx實例的rsp項
  2. Movq %rbp, 8(%rsi)
  3. Movq (%rsp), %rax #將棧頂?shù)刂防锩娴闹荡鎯Φ絩ax寄存器中。Ret后出棧,執(zhí)行棧頂
  4. Movq %rbp, 8(%rsi) #后續(xù)的指令都是用來保存CPU的寄存器到new_ctx的每一項中
  5. Movq 8(%rdi), %rbp #將new_ctx的值
  6. Movq 16(%rdi), %rax #將指令指針rip的值存儲到rax中
  7. Movq %rax, (%rsp) # 將存儲的rip值的rax寄存器賦值給棧指針的地址的值。
  8. Ret # 出棧,回到棧指針,執(zhí)行rip指向的指令。

上下文環(huán)境的切換完成。

4.6協(xié)程的實現(xiàn)之定義

問題:協(xié)程如何定義? 調(diào)度器如何定義?

先來一道設(shè)計題:設(shè)計一個協(xié)程的運(yùn)行體R與運(yùn)行體調(diào)度器S的結(jié)構(gòu)體

  • 1. 運(yùn)行體R:包含運(yùn)行狀態(tài){就緒,睡眠,等待},運(yùn)行體回調(diào)函數(shù),回調(diào)參數(shù),棧指針,棧大小,當(dāng)前運(yùn)行體
  • 2. 調(diào)度器S:包含執(zhí)行集合{就緒,睡眠,等待}

這道設(shè)計題拆分兩個個問題,一個運(yùn)行體如何高效地在多種狀態(tài)集合更換。調(diào)度器與運(yùn)行體的功能界限。

(1)運(yùn)行體如何高效地在多種狀態(tài)集合更換

新創(chuàng)建的協(xié)程,創(chuàng)建完成后,加入到就緒集合,等待調(diào)度器的調(diào)度;協(xié)程在運(yùn)行完成后,進(jìn)行IO操作,此時IO并未準(zhǔn)備好,進(jìn)入等待狀態(tài)集合;IO準(zhǔn)備就緒,協(xié)程開始運(yùn)行,后續(xù)進(jìn)行sleep操作,此時進(jìn)入到睡眠狀態(tài)集合。

  • 就緒(ready),睡眠(sleep),等待(wait)集合該采用如何數(shù)據(jù)結(jié)構(gòu)來存儲?
  • 就緒(ready)集合并不沒有設(shè)置優(yōu)先級的選型,所有在協(xié)程優(yōu)先級一致,所以可以使用隊列來存儲就緒的協(xié)程,簡稱為就緒隊列(ready_queue)。
  • 睡眠(sleep)集合需要按照睡眠時長進(jìn)行排序,采用紅黑樹來存儲,簡稱睡眠樹(sleep_tree)紅黑樹在工程實用為<key, value>, key為睡眠時長,value為對應(yīng)的協(xié)程結(jié)點。
  • 等待(wait)集合,其功能是在等待IO準(zhǔn)備就緒,等待IO也是有時長的,所以等待(wait)集合采用紅黑樹的來存儲,簡稱等待樹(wait_tree),此處借鑒nginx的設(shè)計。

Coroutine就是協(xié)程的相應(yīng)屬性,status表示協(xié)程的運(yùn)行狀態(tài)。sleep與wait兩顆紅黑樹,ready使用的隊列,比如某協(xié)程調(diào)用sleep函數(shù),加入睡眠樹(sleep_tree),status |= S即可。比如某協(xié)程在等待樹(wait_tree)中,而IO準(zhǔn)備就緒放入ready隊列中,只需要移出等待樹(wait_tree),狀態(tài)更改status &= ~W即可。有一個前提條件就是不管何種運(yùn)行狀態(tài)的協(xié)程,都在就緒隊列中,只是同時包含有其他的運(yùn)行狀態(tài)。

(2)調(diào)度器與協(xié)程的功能界限

每一協(xié)程都需要使用的而且可能會不同屬性的,就是協(xié)程屬性。每一協(xié)程都需要的而且數(shù)據(jù)一致的,就是調(diào)度器的屬性。比如棧大小的數(shù)值,每個協(xié)程都一樣的后不做更改可以作為調(diào)度器的屬性,如果每個協(xié)程大小不一致,則可以作為協(xié)程的屬性。

用來管理所有協(xié)程的屬性,作為調(diào)度器的屬性。比如epoll用來管理每一個協(xié)程對應(yīng)的IO,是需要作為調(diào)度器屬性。

按照前面幾章的描述,定義一個協(xié)程結(jié)構(gòu)體需要多少域,我們描述了每一個協(xié)程有自己的上下文環(huán)境,需要保存CPU的寄存器ctx;需要有子過程的回調(diào)函數(shù)func;需要有子過程回調(diào)函數(shù)的參數(shù) arg;需要定義自己的棧空間 stack;需要有自己棧空間的大小 stack_size;需要定義協(xié)程的創(chuàng)建時間 birth;需要定義協(xié)程當(dāng)前的運(yùn)行狀態(tài) status;需要定當(dāng)前運(yùn)行狀態(tài)的結(jié)點(ready_next, wait_node, sleep_node);需要定義協(xié)程id;需要定義調(diào)度器的全局對象 sched。

協(xié)程的核心結(jié)構(gòu)體如下:

typedef struct _nty_coroutine {

    nty_cpu_ctx ctx;
    proc_coroutine func;
    void *arg;
    size_t stack_size;

    nty_coroutine_status status;
    nty_schedule *sched;

    uint64_t birth;
    uint64_t id;

    void *stack;

    RB_ENTRY(_nty_coroutine) sleep_node;
    RB_ENTRY(_nty_coroutine) wait_node;

    TAILQ_ENTRY(_nty_coroutine) ready_next;
    TAILQ_ENTRY(_nty_coroutine) defer_next;

} nty_coroutine;

調(diào)度器是管理所有協(xié)程運(yùn)行的組件,協(xié)程與調(diào)度器的運(yùn)行關(guān)系。

調(diào)度器的屬性,需要有保存CPU的寄存器上下文 ctx,可以從協(xié)程運(yùn)行狀態(tài)yield到調(diào)度器運(yùn)行的。從協(xié)程到調(diào)度器用yield,從調(diào)度器到協(xié)程用resume以下為協(xié)程的定義。

typedef struct _nty_coroutine_queue nty_coroutine_queue;

typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;

typedef struct _nty_schedule {
    uint64_t birth;
nty_cpu_ctx ctx;

    struct _nty_coroutine *curr_thread;
    int page_size;

    int poller_fd;
    int eventfd;
    struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
    int nevents;

    int num_new_events;

    nty_coroutine_queue ready;
    nty_coroutine_rbtree_sleep sleeping;
    nty_coroutine_rbtree_wait waiting;

} nty_schedule;

4.7協(xié)程的實現(xiàn)之調(diào)度器

問題:協(xié)程如何被調(diào)度?

調(diào)度器的實現(xiàn),有兩種方案,一種是生產(chǎn)者消費者模式,另一種多狀態(tài)運(yùn)行。

(1)生產(chǎn)者消費者模式

邏輯代碼如下:

while (1) {

        //遍歷睡眠集合,將滿足條件的加入到ready
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            TAILQ_ADD(&sched->ready, expired);
        }

        //遍歷等待集合,將滿足添加的加入到ready
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            TAILQ_ADD(&sched->ready, wait);
        }

        // 使用resume回復(fù)ready的協(xié)程運(yùn)行權(quán)
        while (!TAILQ_EMPTY(&sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

(2)多狀態(tài)運(yùn)行

實現(xiàn)邏輯代碼如下:

while (1) {

        //遍歷睡眠集合,使用resume恢復(fù)expired的協(xié)程運(yùn)行權(quán)
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            resume(expired);
        }

        //遍歷等待集合,使用resume恢復(fù)wait的協(xié)程運(yùn)行權(quán)
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            resume(wait);
        }

        // 使用resume恢復(fù)ready的協(xié)程運(yùn)行權(quán)
        while (!TAILQ_EMPTY(sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

4.8協(xié)程性能測試

測試環(huán)境:4臺VMWare 虛擬機(jī)

  • 1臺服務(wù)器 6G內(nèi)存,4核CPU
  • 3臺客戶端 2G內(nèi)存,2核CPU

操作系統(tǒng):ubuntu 14.04

  • 服務(wù)器端測試代碼:https://github.com/wangbojing/NtyCo
  • 客戶端測試代碼:https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c
  • 按照每一個連接啟動一個協(xié)程來測試。每一個協(xié)程棧空間 4096byte
  • 6G內(nèi)存 –> 測試協(xié)程數(shù)量100W無異常。并且能夠正常收發(fā)數(shù)據(jù)。

五、協(xié)程創(chuàng)建和運(yùn)行

由于多個協(xié)程運(yùn)行于一個線程內(nèi)部的,因此當(dāng)創(chuàng)建線程中的第一個協(xié)程時,需要初始化該協(xié)程所在的環(huán)境 stCoRoutineEnv_t,這個環(huán)境是線程用來管理協(xié)程的,通過該環(huán)境,線程可以得知當(dāng)前一共創(chuàng)建了多少個協(xié)程,當(dāng)前正在運(yùn)行哪一個協(xié)程,當(dāng)前應(yīng)當(dāng)如何調(diào)度協(xié)程:

struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 記錄當(dāng)前創(chuàng)建的協(xié)程
int iCallStackSize; // 記錄當(dāng)前一共創(chuàng)建了多少個協(xié)程
stCoEpoll_t *pEpoll; // 該線程的協(xié)程調(diào)度器
// 在使用共享棧模式拷貝棧內(nèi)存時記錄相應(yīng)的 coroutine
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};

上述代碼表明 libco 允許一個線程內(nèi)最多創(chuàng)建 128 個協(xié)程,其中 pCallStack[iCallStackSize-1] 也就是棧頂?shù)膮f(xié)程表示當(dāng)前正在運(yùn)行的協(xié)程。當(dāng)調(diào)用函數(shù) co_create 時,首先檢查當(dāng)前線程中的 coroutine env 結(jié)構(gòu)是否創(chuàng)建。

這里 libco 對于每個線程內(nèi)的 stCoRoutineEnv_t 并沒有使用 thread-local 的方式(例如gcc 內(nèi)置的 __thread,phxrpc采用這種方式)來管理,而是預(yù)先定義了一個大的數(shù)組,并通過對應(yīng)的 PID 來獲取其協(xié)程環(huán)境。

static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return g_arrCoEnvPerThread[ GetPid() ];
}

初始化 stCoRoutineEnv_t 時主要完成以下幾步:

為 stCoRoutineEnv_t 申請空間并且進(jìn)行初始化,設(shè)置協(xié)程調(diào)度器 pEpoll。

創(chuàng)建一個空的 coroutine,初始化其上下文環(huán)境( 有關(guān) coctx 在后文詳細(xì)介紹 ),將其加入到該線程的協(xié)程環(huán)境中進(jìn)行管理,并且設(shè)置其為 main coroutine。這個 main coroutine 用來運(yùn)行該線程主邏輯。

當(dāng)初始化完成協(xié)程環(huán)境之后,調(diào)用函數(shù) co_create_env 來創(chuàng)建具體的協(xié)程,該函數(shù)初始化一個協(xié)程結(jié)構(gòu) stCoRoutine_t,設(shè)置該結(jié)構(gòu)中的各項字段,例如運(yùn)行的函數(shù) pfn,運(yùn)行時的棧地址等等。需要說明的就是,如果使用了非共享棧模式,則需要為該協(xié)程單獨申請棧空間,否則從共享棧中申請空間。棧空間表示如下:

struct stStackMem_t
{
stCoRoutine_t* occupy_co; // 使用該棧的協(xié)程
int stack_size; // 棧大小
char* stack_bp; // 棧的指針,棧從高地址向低地址增長
char* stack_buffer; // 棧底
};

使用 co_create 創(chuàng)建完一個協(xié)程之后,將調(diào)用 co_resume 來將該協(xié)程激活運(yùn)行:

void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// 獲取當(dāng)前正在運(yùn)行的協(xié)程的結(jié)構(gòu)
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
// 為將要運(yùn)行的 co 布置上下文環(huán)境
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co; // 設(shè)置co為運(yùn)行的線程
co_swap( lpCurrRoutine, co );
}

函數(shù) co_swap 的作用類似于 Unix 提供的函數(shù) swapcontext:將當(dāng)前正在運(yùn)行的 coroutine 的上下文以及狀態(tài)保存到結(jié)構(gòu) lpCurrRoutine 中,并且將 co 設(shè)置成為要運(yùn)行的協(xié)程,從而實現(xiàn)協(xié)程的切換。co_swap 具體完成三項工作:

記錄當(dāng)前協(xié)程 curr 的運(yùn)行棧的棧頂指針,通過 char c; curr_stack_sp=&c 實現(xiàn),當(dāng)下次切換回 curr時,可以從該棧頂指針指向的位置繼續(xù),執(zhí)行完 curr 后可以順利釋放該棧。

處理共享棧相關(guān)的操作,并且調(diào)用函數(shù) coctx_swap 來完成上下文環(huán)境的切換。注意執(zhí)行完 coctx_swap之后,執(zhí)行流程將跳到新的 coroutine 也就是 pending_co 中運(yùn)行,后續(xù)的代碼需要等下次切換回 curr 時才會執(zhí)行。

當(dāng)下次切換回 curr 時,處理共享棧相關(guān)的操作。

對應(yīng)于 co_resume 函數(shù),協(xié)程主動讓出執(zhí)行權(quán)則調(diào)用 co_yield 函數(shù)。co_yield 函數(shù)調(diào)用了 co_yield_env,將當(dāng)前協(xié)程與當(dāng)前線程中記錄的其他協(xié)程進(jìn)行切換:

void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}

前面我們已經(jīng)提到過,pCallStack 棧頂所指向的即為當(dāng)前正在運(yùn)行的協(xié)程所對應(yīng)的結(jié)構(gòu),因此該函數(shù)將 curr 取出來,并將當(dāng)前正運(yùn)行的協(xié)程上下文保存到該結(jié)構(gòu)上,并切換到協(xié)程 last 上執(zhí)行。接下來我們以 32-bit 的系統(tǒng)為例來分析 libco 是如何實現(xiàn)協(xié)程運(yùn)行環(huán)境的切換的。

六、協(xié)程上下文的創(chuàng)建和切換

libco 使用結(jié)構(gòu) struct coctx_t 來表示一個協(xié)程的上下文環(huán)境:

struct coctx_t
{

if defined(__i386__)
void *regs[ 8 ];

else
void *regs[ 14 ];

endif
size_t ss_size;
char *ss_sp;
};

圖片圖片

結(jié)合上圖,我們需要知道關(guān)鍵的幾點:

函數(shù)調(diào)用棧是調(diào)用者和被調(diào)用者共同負(fù)責(zé)布置的。Caller 將其參數(shù)從右向左反向壓棧,再將調(diào)用后的返回地址壓棧,然后將執(zhí)行流程交給 Callee。

典型的編譯器會將 Callee 函數(shù)匯編成為以 push %ebp; move %ebp, %esp; sub $esp N; 這種形式開頭的匯編代碼。這幾句代碼主要目的是為了方便 Callee 利用 ebp 來訪問調(diào)用者提供的參數(shù)以及自身的局部變量(如下圖)。

當(dāng)調(diào)用過程完成清除了局部變量以后,會執(zhí)行 pop %ebp; ret,這樣指令會跳轉(zhuǎn)到 RA 也就是返回地址上面執(zhí)行。這一點也是實現(xiàn)協(xié)程切換的關(guān)鍵:我們只需要將指定協(xié)程的函數(shù)指針地址保存到 RA 中,當(dāng)調(diào)用完 coctx_swap 之后,會自動跳轉(zhuǎn)到該協(xié)程的函數(shù)起始地址開始運(yùn)行。

了解了這些,我們就來看一下協(xié)程上下文環(huán)境的初始化函數(shù) coctx_make:

int coctx_make( coctx_t ctx, coctx_pfn_t pfn, const void s, const void *s1 )
{
char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);
coctx_param_t param = (coctx_param_t)sp ;
param->s1 = s;
param->s2 = s1;
memset(ctx->regs, 0, sizeof(ctx->regs));
ctx->regs[ kESP ] = (char)(sp) - sizeof(void);
ctx->regs[ kEIP ] = (char*)pfn;
return 0;
}

這段代碼應(yīng)該比較好理解,首先為函數(shù) coctx_pfn_t 預(yù)留 2 個參數(shù)的棧空間并對其到 16 字節(jié),之后將實參設(shè)置到預(yù)留的棧上空間中。最后在 ctx 結(jié)構(gòu)中填入相應(yīng)的,其中記錄 reg[kEIP] 返回地址為函數(shù)指針 pfn,記錄 reg[kESP] 為獲得的棧頂指針 sp 減去一個指針長度,這個減去的空間是為返回地址 RA 預(yù)留的。當(dāng)調(diào)用 coctx_swap 時,reg[kEIP] 會被放到返回地址 RA 的位置,待 coctx_swap 執(zhí)行結(jié)束,自然會跳轉(zhuǎn)到函數(shù) pfn 處執(zhí)行。

coctx_swap(ctx1, ctx2) 在 coctx_swap.S 中實現(xiàn)。這里可以看到,該函數(shù)并沒有使用 push %ebp; move %ebp, %esp; sub $esp N; 開頭,因此棧空間分布中不會出現(xiàn) ebp 的位置。coctx_swap 函數(shù)主要分為兩段,其首先將當(dāng)前的上下文環(huán)境保存到 ctx1 結(jié)構(gòu)中:

leal 4(%esp), %eax // eax = old_esp + 4
movl 4(%esp), %esp // 將 esp 的值設(shè)為 &ctx1(即ctx1的地址)
leal 32(%esp), %esp // esp = (char*)&ctx1 + 32
pushl %eax // ctx1->regs[EAX] = %eax
pushl %ebp // ctx1->regs[EBP] = %ebp
pushl %esi // ctx1->regs[ESI] = %esi
pushl %edi // ctx1->regs[EDI] = %edi
pushl %edx // ctx1->regs[EDX] = %edx
pushl %ecx // ctx1->regs[ECX] = %ecx
pushl %ebx // ctx1->regs[EBX] = %ebx
pushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp

這里需要注意指令 leal 和 movl 的區(qū)別。leal 將 eax 的值設(shè)置成為 esp 的值加 4,而 movl 將 esp 的值設(shè)為 esp+4 所指向的內(nèi)存上的值,也就是參數(shù) ctx1 的地址。之后該函數(shù)將 ctx2 中記錄的上下文恢復(fù)到 CPU 寄存器中,并跳轉(zhuǎn)到其函數(shù)地址處運(yùn)行:

movl 4(%eax), %esp // 將 esp 的值設(shè)為 &ctx2(即ctx2的地址)
popl %eax // %eax = ctx1->regs[EIP],也就是 &pfn
popl %ebx // %ebx = ctx1->regs[EBP]
popl %ecx // %ecx = ctx1->regs[ECX]
popl %edx // %edx = ctx1->regs[EDX]
popl %edi // %edi = ctx1->regs[EDI]
popl %esi // %esi = ctx1->regs[ESI]
popl %ebp // %ebp = ctx1->regs[EBP]
popl %esp // %esp = ctx1->regs[ESP],即(char)(sp) - sizeof(void)
pushl %eax // RA = %eax = &pfn,注意此時esp已經(jīng)指向了新的esp
xorl %eax, %eax // reset eax
ret

上面的代碼看起來可能有些繞:

  • 首先 line 1 將 esp 設(shè)置為參數(shù) ctx2 的地址,后續(xù)的 popl 操作均在 ctx2 的內(nèi)存空間上執(zhí)行。
  • line 2-9 將 ctx2->regs[] 中的內(nèi)容恢復(fù)到相應(yīng)的寄存器中。還記得在前面 coctx_make 中設(shè)置了 regs[EIP] 和 regs[ESP] 嗎?這里剛好就對應(yīng)恢復(fù)了相應(yīng)的值。
  • 當(dāng)執(zhí)行完 line 9 之后,esp 已經(jīng)指向了 ctx2 中新的棧頂指針,由于在 coctx_make 中預(yù)留了一個指針長度的 RA 空間,line 10 剛好將新的函數(shù)指針 &pfn 設(shè)置到該 RA 上。
  • 最后執(zhí)行 ret 指令時,函數(shù)流程將跳到 pfn 處執(zhí)行。這樣,整個協(xié)程上下文的切換就完成了。

七、如何使用 libco

我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應(yīng)用程序如何使用 libco 來編寫服務(wù)端程序。在 example_echosvr.cpp 的 main 函數(shù)中,主要執(zhí)行如下幾步:

  1. 創(chuàng)建 socket,監(jiān)聽在本機(jī)的 1024 端口,并設(shè)置為非阻塞;
  2. 主線程使用函數(shù) readwrite_coroutine 創(chuàng)建多個讀寫協(xié)程,調(diào)用 co_resume 啟動協(xié)程運(yùn)行直到其掛起。這里我們忽略掉無關(guān)的多進(jìn)程 fork 的過程;
  3. 主線程繼續(xù)創(chuàng)建 socket 接收協(xié)程 accpet_co,同樣調(diào)用 co_resume 啟動協(xié)程直到其掛起;
  4. 主線程調(diào)用函數(shù) co_eventloop 實現(xiàn)事件的監(jiān)聽和協(xié)程的循環(huán)切換;

函數(shù) readwrite_coroutine 在外層循環(huán)中將新創(chuàng)建的讀寫協(xié)程都加入到隊列 g_readwrite 中,此時這些讀寫協(xié)程都沒有具體與某個 socket 連接對應(yīng),可以將隊列 g_readwrite 看成一個 coroutine pool。當(dāng)加入到隊列中之后,調(diào)用函數(shù) co_yield_ct 函數(shù)讓出 CPU,此時控制權(quán)回到主線程。

主線程中的函數(shù) co_eventloop 監(jiān)聽網(wǎng)絡(luò)事件,將來自于客戶端新進(jìn)的連接交由協(xié)程 accept_co 處理,關(guān)于 co_eventloop 如何喚醒 accept_co 的細(xì)節(jié)我們將在后續(xù)介紹。accept_co 調(diào)用函數(shù) accept_routine 接收新連接,該函數(shù)的流程如下:

檢查隊列 g_readwrite 是否有空閑的讀寫 coroutine,如果沒有,調(diào)用函數(shù) poll 將該協(xié)程加入到 Epoll 管理的定時器隊列中,也就是 sleep(1000) 的作用;

調(diào)用 co_accept 來接收新連接,如果接收連接失敗,那么調(diào)用 co_poll 將服務(wù)端的 listen_fd 加入到 Epoll 中來觸發(fā)下一次連接事件;

對于成功的連接,從 g_readwrite 中取出一個讀寫協(xié)程來負(fù)責(zé)處理讀寫;

再次回到函數(shù) readwrite_coroutine 中,該函數(shù)會調(diào)用 co_poll 將新建立的連接的 fd 加入到 Epoll 監(jiān)聽中,并將控制流程返回到 main 協(xié)程;當(dāng)有讀或者寫事件發(fā)生時,Epoll 會喚醒對應(yīng)的 coroutine ,繼續(xù)執(zhí)行 read 函數(shù)以及 write 函數(shù)。

上面的過程大致說明了控制流程是如何在不同的協(xié)程中切換,接下來我們介紹具體的實現(xiàn)細(xì)節(jié),即如何通過 Epoll 來管理協(xié)程,以及如何對系統(tǒng)函數(shù)進(jìn)行改造以滿足 libco 的調(diào)用。

八、通過 Epoll 管理和喚醒協(xié)程

上一章節(jié)中介紹了協(xié)程可以通過函數(shù) co_poll 來將 fd 交由 Epoll 管理,待 Epoll 的相應(yīng)的事件觸發(fā)時,再切換回來執(zhí)行 read 或者 write 操作,從而實現(xiàn)由 Epoll 管理協(xié)程的功能。co_poll 函數(shù)原型如下:

int co_poll(stCoEpoll_t *ctx, struct pollfd fds[],
nfds_t nfds, int timeout_ms)

stCoEpoll_t 是為 libco 定制的 Epoll 相關(guān)數(shù)據(jù)結(jié)構(gòu),fds 是 pollfd 結(jié)構(gòu)的文件句柄,nfds 為 fds 數(shù)組的長度,最后一個參數(shù)表示定時器時間,也就是在 timeout 毫秒之后觸發(fā)處理這些文件句柄。這里可以看到,co_poll 能夠同時將多個文件句柄同時加入到 Epoll 管理中。我們先看 stCoEpoll_t 結(jié)構(gòu):

struct stCoEpoll_t
{
int iEpollFd; // Epoll 主 FD
static const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監(jiān)聽的句柄總數(shù)
struct stTimeout_t *pTimeout; // 時間輪定時器
struct stTimeoutItemLink_t *pstTimeoutList; // 已經(jīng)超時的時間
struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件
co_epoll_res *result; // Epoll 返回的事件結(jié)果
};

以 stTimeout_ 開頭的數(shù)據(jù)結(jié)構(gòu)與 libco 的定時器管理有關(guān),我們在后面介紹。co_epoll_res 是對 Epoll 事件數(shù)據(jù)結(jié)構(gòu)的封裝,也就是每次觸發(fā) Epoll 事件時的返回結(jié)果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這里也保留了 kevent 數(shù)據(jù)結(jié)構(gòu)。

```clike
struct co_epoll_res
{
int size;
struct epoll_event *events; // for linux epoll
struct kevent *eventlist; // for Unix or MacOs kqueue
};

co_poll 實際是對函數(shù) co_poll_inner 的封裝。我們將 co_epoll_inner 函數(shù)的結(jié)構(gòu)分為上下兩半段。在上半段中,調(diào)用 co_poll 的協(xié)程 CC 將其需要監(jiān)聽的句柄數(shù)組 fds 都加入到 Epoll 管理中,并通過函數(shù) co_yield_env 讓出 CPU;當(dāng) main 協(xié)程的事件循環(huán) co_eventloop 中觸發(fā)了 CC 對應(yīng)的監(jiān)聽事件時,會恢復(fù) CC的執(zhí)行。此時,CC 將開始執(zhí)行下半段,即將上半段添加的句柄 fds 從 epoll 中移除,清理殘留的數(shù)據(jù)結(jié)構(gòu),下面的流程圖簡要說明了控制流的轉(zhuǎn)移過程:

圖片圖片

有了上面的基本概念,我們來看具體的實現(xiàn)細(xì)節(jié)。co_poll 首先在內(nèi)部將傳入的文件句柄數(shù)組 fds 轉(zhuǎn)化為數(shù)據(jù)結(jié)構(gòu) stPoll_t,這一步主要是為了方便后續(xù)處理。該結(jié)構(gòu)記錄了 iEpollFd,ndfs,fds 數(shù)組,以及該協(xié)程需要執(zhí)行的函數(shù)和參數(shù)。有兩點需要說明的是:

  1. 對于每一個 fd,為其申請一個 stPollItem_t 來管理對應(yīng) Epoll 事件以及記錄回調(diào)參數(shù)。libco 在此做了一個小的優(yōu)化,對于長度小于 2 的 fds 數(shù)組,直接在棧上定義相應(yīng)的 stPollItem_t 數(shù)組,否則從堆中申請內(nèi)存。這也是一種比較常見的優(yōu)化,畢竟從堆中申請內(nèi)存比較耗時;
  2. 函數(shù)指針 OnPollProcessEvent 封裝了協(xié)程的切換過程。當(dāng)傳入指定的 stPollItem_t 結(jié)構(gòu)時,即可喚醒對應(yīng)于該結(jié)構(gòu)的 coroutine,將控制權(quán)交由其執(zhí)行;

co_poll 的第二步,也是最關(guān)鍵的一步,就是將 fd 數(shù)組全部加入到 Epoll 中進(jìn)行監(jiān)聽。協(xié)程 CC 會將每一個 epoll_event 的 data.ptr 域設(shè)置為對應(yīng)的 stPollItem_t 結(jié)構(gòu)。這樣當(dāng)事件觸發(fā)時,可以直接從對應(yīng)的 ptr中取出 stPollItem_t 結(jié)構(gòu),然后喚醒指定協(xié)程。

如果本次操作提供了 Timeout 參數(shù),co_poll 還會將協(xié)程 CC 本次操作對應(yīng)的 stPoll_t 加入到定時器隊列中。這表明在 Timeout 定時觸發(fā)之后,也會喚醒協(xié)程 CC 的執(zhí)行。當(dāng)整個上半段都完成后,co_poll 立即調(diào)用 co_yield_env 讓出 CPU,執(zhí)行流程跳轉(zhuǎn)回到 main 協(xié)程中。

從上面的流程圖中也可以看出,當(dāng)執(zhí)行流程再次跳回時,表明協(xié)程 CC 添加的讀寫等監(jiān)聽事件已經(jīng)觸發(fā),即可以執(zhí)行相應(yīng)的讀寫操作了。此時 CC 首先將其在上半段中添加的監(jiān)聽事件從 Epoll 中刪除,清理殘留的數(shù)據(jù)結(jié)構(gòu),然后調(diào)用讀寫邏輯。

九、定時器實現(xiàn)

協(xié)程 CC 在將一組 fds 加入 Epoll 的同時,還能為其設(shè)置一個超時時間。在超時時間到期時,也會再次喚醒 CC 來執(zhí)行。libco 使用 Timing-Wheel 來實現(xiàn)定時器。關(guān)于 Timing-Wheel 算法,可以參考,其優(yōu)勢是 O(1) 的插入和刪除復(fù)雜度,缺點是只有有限的長度,在某些場合下不能滿足需求。

回過去看 stCoEpoll_t 結(jié)構(gòu),其中 pTimeout 代表時間輪,通過函數(shù) AllocateTimeout 初始化為一個固定大小(60 1000)的數(shù)組。根據(jù) Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員 pstTimeoutList 記錄在 co_eventloop 中發(fā)生超時的事件,而 pstActiveList 記錄當(dāng)前活躍的事件,包括超時事件。這兩個結(jié)構(gòu)都將在 co_eventloop 中進(jìn)行處理。

下面我們簡要分析一下加入定時器的實現(xiàn):

int AddTimeout( stTimeout_t apTimeout, stTimeoutItem_t apItem,
unsigned long long allNow )
{
if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準(zhǔn)時間
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0; // 當(dāng)前時間輪指針指向數(shù)組0
}
// 1. 當(dāng)前時間不可能小于時間輪的基準(zhǔn)時間
// 2. 加入的定時器的超時時間不能小于當(dāng)前時間
if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow )
{
return __LINE__;
}
int diff = apItem->ullExpireTime - apTimeout->ullStart;
if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時間輪的大小
{
return __LINE__;
}
// 插入到時間輪盤的指定位置
AddTail( apTimeout->pItems +
(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );
return 0;
}

定時器的超時檢查在函數(shù) co_eventloop 中執(zhí)行。

十、EPOLL 事件循環(huán)

main 協(xié)程通過調(diào)用函數(shù) co_eventloop 來監(jiān)聽 Epoll 事件,并在相應(yīng)的事件觸發(fā)時切換到指定的協(xié)程執(zhí)行。有關(guān) co_eventloop 與 應(yīng)用協(xié)程的交互過程在上一節(jié)的流程圖中已經(jīng)比較清楚了,下面我們主要介紹一下 co_eventloop 函數(shù)的實現(xiàn):

上文中也提到,通過 epoll_wait 返回的事件都保存在 stCoEpoll_t 結(jié)構(gòu)的 co_epoll_res 中。因此 co_eventloop 首先為 co_epoll_res 申請空間,之后通過一個無限循環(huán)來監(jiān)聽所有 coroutine 添加的所有事件:

for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
...
}

對于每一個觸發(fā)的事件,co_eventloop 首先通過指針域 data.ptr 取出保存的 stPollItem_t 結(jié)構(gòu),并將其添加到 pstActiveList 列表中;之后從定時器輪盤中取出所有已經(jīng)超時的事件,也將其全部添加到 pstActiveList 中,pstActiveList 中的所有事件都作為活躍事件處理。

對于每一個活躍事件,co_eventloop 將通過調(diào)用對應(yīng)的 pfnProcess 也就是上圖中的OnPollProcessEvent 函數(shù)來切換到該事件對應(yīng)的 coroutine,將流程跳轉(zhuǎn)到該 coroutine 處執(zhí)行。

最后 co_eventloop 在調(diào)用時也提供一個額外的參數(shù)來供調(diào)用者傳入一個函數(shù)指針 pfn。該函數(shù)將會在每次循環(huán)完成之后執(zhí)行;當(dāng)該函數(shù)返回 -1 時,將會終止整個事件循環(huán)。用戶可以利用該函數(shù)來控制 main 協(xié)程的終止或者完成一些統(tǒng)計需求。

責(zé)任編輯:武曉燕 來源: 深度Linux
相關(guān)推薦

2022-09-06 20:30:48

協(xié)程Context主線程

2021-09-16 09:59:13

PythonJavaScript代碼

2023-11-17 11:36:59

協(xié)程纖程操作系統(tǒng)

2022-09-12 06:35:00

C++協(xié)程協(xié)程狀態(tài)

2020-11-29 17:03:08

進(jìn)程線程協(xié)程

2023-10-12 09:46:00

并發(fā)模型線程

2022-09-10 18:51:09

C++協(xié)程主線程

2024-09-25 08:28:45

2023-11-04 20:00:02

C++20協(xié)程

2021-04-25 09:36:20

Go協(xié)程線程

2023-10-24 19:37:34

協(xié)程Java

2025-02-08 09:13:40

2021-12-09 06:41:56

Python協(xié)程多并發(fā)

2025-06-26 02:00:00

2022-04-19 20:39:03

協(xié)程多進(jìn)程

2025-03-26 01:22:00

NtyCo協(xié)程框架

2017-08-10 15:50:44

PHP協(xié)程阻塞

2020-07-07 09:19:28

Android 協(xié)程開發(fā)

2020-04-07 11:10:30

Python數(shù)據(jù)線程

2025-06-05 01:22:00

線程虛擬內(nèi)存系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 日韩中文字幕 | 五月婷亚洲 | 女同久久另类99精品国产 | 久草青青草| av在线免费网 | 午夜看电影在线观看 | 一级片在线免费播放 | 久久久久久久一区 | a级大片| 免费午夜视频 | 美女亚洲一区 | 97视频人人澡人人爽 | h视频在线观看免费 | 免费的色网站 | 精品久久久久久久 | 免费一区在线观看 | 久久久久一区二区三区四区 | 一区福利视频 | 综合激情网 | 高清国产一区二区 | 在线看av网址| 三区四区在线观看 | 亚洲视频免费观看 | 亚洲天堂中文字幕 | 久久一区二区免费视频 | 久久一起草 | 超碰97免费在线 | 成人一区二区三区在线观看 | 午夜激情在线视频 | 国产高潮好爽受不了了夜色 | 日韩色视频 | 久久国产成人精品国产成人亚洲 | 欧美日韩国产精品激情在线播放 | 91精品国产91久久久久久吃药 | 一区二区三区免费在线观看 | 亚洲福利免费 | 国产亚洲一区二区三区在线观看 | 午夜爽爽爽男女免费观看 | 亚洲精品一区二区三区在线 | 欧美精品91| 四虎影音|