如何喚醒事件驅動模塊
我們可以以阻塞或非阻塞模式調用一個系統調用,當以阻塞模式調用系統調用時,如果條件沒有滿足則操作系統會把線程掛起,然后調度其他線程執行,直到滿足條件再喚醒掛起的線程。基于事件驅動模塊的系統一般都會存在阻塞在事件驅動模塊系統調用的場景,有一個問題是如果在線程掛起期間突然有一些事情需要處理那么該怎么辦呢?比如線程池完成了一個任務,需要通知主線程處理,或者有更早超時的定時器。下面介紹幾種在各個軟件中使用的方案。
通信管道
這種方式是兼容性最好,也是最常用的方案。下面是 Go 中的實現。
func netpollinit() {
// 創建 kqueue
kq = kqueue()
closeonexec(kq)
// 注冊管道讀端 fd 到 kqueue
addWakeupEvent(kq)
}
func addWakeupEvent(kq int32) {
// 創建一個通信管道
r, w := nonblockingPipe()
// 把讀端注冊到 kqueue
ev := keventt{
filter: _EVFILT_READ,
flags: _EV_ADD,
}
*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
n := kevent(kq, &ev, 1, nil, 0, nil)
// 保存到全局變量
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
Go 在初始化事件驅動模塊時(這里是 kqueue)會創建一個通信管道并注冊讀端到 kqueue,然后在必要的時候通過往管道寫通知事件驅動模塊。
func wakeNetpoll(_ int32) {
for {
var b byte
n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
}
}
kqueue
kqueue 支持手動喚醒事件驅動模塊的機制,Go 在初始化時 kqueue 時會注冊該能力。
func addWakeupEvent(_ int32) {
ev := keventt{
ident: kqIdent,
filter: _EVFILT_USER, // 支持用戶手動喚醒
flags: _EV_ADD,
}
for {
n := kevent(kq, &ev, 1, nil, 0, nil)
}
}
相對應的喚醒代碼如下。
func wakeNetpoll(kq int32) {
ev := keventt{
ident: kqIdent,
filter: _EVFILT_USER,
flags: _EV_ENABLE,
fflags: _NOTE_TRIGGER,
}
for {
n := kevent(kq, &ev, 1, nil, 0, nil)
}
}
前兩種方式是比較常見的方案,如 Libuv / Go 里都是這種方案。
信號機制
這個方案來自 bRPC, bRPC 利用收到信號時會導致某些系統調用返回 EINTR 的方式來喚醒線程。其代碼如下。
#include <signal.h>
#include "bthread/interrupt_pthread.h"
namespace bthread {
// TODO: Make sure SIGURG is not used by user.
// This empty handler is simply for triggering EINTR in blocking syscalls.
void do_nothing_handler(int) {}
static pthread_once_t register_sigurg_once = PTHREAD_ONCE_INIT;
static void register_sigurg() {
signal(SIGURG, do_nothing_handler);
}
int interrupt_pthread(pthread_t th) {
pthread_once(?ister_sigurg_once, register_sigurg);
return pthread_kill(th, SIGURG);
}
}
kqueue 的文檔中也對 EINTR 進行了描述:[EINTR] A signal was delivered before the timeout expired and before any events were placed on the kqueue for return。
下面是一個測試例子。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/event.h>
#include <errno.h>
void do_nothing_handler(int dummy) {}
int main() {
signal(SIGURG, do_nothing_handler);
int kq = kqueue();
if (kq == -1) {
perror("kqueue failed");
exit(EXIT_FAILURE);
}
printf("pid=%d\n", getpid());
struct kevent events[1];
while (1) {
int nfds = kevent(kq, NULL, 0, events, 1, NULL);
if (nfds == -1 && errno == EINTR) {
printf("epoll_wait return EINTR\n");
} else {
printf("epoll_wait return error\n");
}
}
return 0;
}
這是一個永久阻塞的例子,我們可以通過給該進程發生信號來喚醒它。