在網(wǎng)絡(luò)開發(fā)模型中,有一種非常易于開發(fā)同學(xué)使用的方式,那就是同步阻塞的網(wǎng)絡(luò) IO(在 Java 中習(xí)慣叫 BIO)。
例如我們想請求服務(wù)器上的一段數(shù)據(jù),那么 C 語言的一段代碼 demo 大概是下面這樣:
int main()
{
int sk = socket(AF_INET, SOCK_STREAM, 0);
connect(sk, 。..)
recv(sk, 。..)
}
但是在高并發(fā)的服務(wù)器開發(fā)中,這種網(wǎng)絡(luò) IO 的性能奇差。因為
1.進程在 recv 的時候大概率會被阻塞掉,導(dǎo)致一次進程切換
2.當(dāng)連接上數(shù)據(jù)就緒的時候進程又會被喚醒,又是一次進程切換
3.一個進程同時只能等待一條連接,如果有很多并發(fā),則需要很多進程
如果用一句話來概括,那就是:同步阻塞網(wǎng)絡(luò) IO 是高性能網(wǎng)絡(luò)開發(fā)路上的絆腳石! 俗話說得好,知己知彼方能百戰(zhàn)百勝。所以我們今天先不講優(yōu)化,只深入分析同步阻塞網(wǎng)絡(luò) IO 的內(nèi)部實現(xiàn)。
在上面的 demo 中雖然只是簡單的兩三行代碼,但實際上用戶進程和內(nèi)核配合做了非常多的工作。先是用戶進程發(fā)起創(chuàng)建 socket 的指令,然后切換到內(nèi)核態(tài)完成了內(nèi)核對象的初始化。接下來 Linux 在數(shù)據(jù)包的接收上,是硬中斷和 ksoftirqd 進程在進行處理。當(dāng) ksoftirqd 進程處理完以后,再通知到相關(guān)的用戶進程。
從用戶進程創(chuàng)建 socket,到一個網(wǎng)絡(luò)包抵達網(wǎng)卡到被用戶進程接收到,總體上的流程圖如下:
我們今天用圖解加源碼分析的方式來詳細拆解一下上面的每一個步驟,來看一下在內(nèi)核里是它們是怎么實現(xiàn)的。閱讀完本文,你將深刻地理解在同步阻塞的網(wǎng)絡(luò) IO 性能低下的原因!
一、創(chuàng)建一個 socket
開篇源碼中的 socket 函數(shù)調(diào)用執(zhí)行完以后,內(nèi)核在內(nèi)部創(chuàng)建了一系列的 socket 相關(guān)的內(nèi)核對象(是的,不是只有一個)。它們互相之間的關(guān)系如圖。當(dāng)然了,這個對象比圖示的還要更復(fù)雜。我只在圖中把和今天的主題相關(guān)的內(nèi)容展現(xiàn)了出來。
我們來翻翻源碼,看下上面的結(jié)構(gòu)是如何被創(chuàng)造出來的。
//file:net/socket.c
SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
。..。..
retval = sock_create(family, type, protocol, &sock);
}
sock_create 是創(chuàng)建 socket 的主要位置。其中 sock_create 又調(diào)用了 __sock_create。
//file:net/socket.c
int __sock_create(struct net *net, int family, int type, int protocol,
struct socket **res, int kern)
{
struct socket *sock;
const struct net_proto_family *pf;
。..。..
//分配 socket 對象
sock = sock_alloc();
//獲得每個協(xié)議族的操作表
pf = rcu_dereference(net_families[family]);
//調(diào)用每個協(xié)議族的創(chuàng)建函數(shù), 對于 AF_INET 對應(yīng)的是
err = pf-》create(net, sock, protocol, kern);
}
在 __sock_create 里,首先調(diào)用 sock_alloc 來分配一個 struct sock 對象。接著在獲取協(xié)議族的操作函數(shù)表,并調(diào)用其 create 方法。對于 AF_INET 協(xié)議族來說,執(zhí)行到的是 inet_create 方法。
//file:net/ipv4/af_inet.c
tatic int inet_create(struct net *net, struct socket *sock, int protocol,
int kern)
{
struct sock *sk;
//查找對應(yīng)的協(xié)議,對于TCP SOCK_STREAM 就是獲取到了
//static struct inet_protosw inetsw_array[] =
//{
// {
// .type = SOCK_STREAM,
// .protocol = IPPROTO_TCP,
// .prot = &tcp_prot,
// .ops = &inet_stream_ops,
// .no_check = 0,
// .flags = INET_PROTOSW_PERMANENT |
// INET_PROTOSW_ICSK,
// },
//}
list_for_each_entry_rcu(answer, &inetsw[sock-》type], list) {
//將 inet_stream_ops 賦到 socket-》ops 上
sock-》ops = answer-》ops;
//獲得 tcp_prot
answer_prot = answer-》prot;
//分配 sock 對象, 并把 tcp_prot 賦到 sock-》sk_prot 上
sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot);
//對 sock 對象進行初始化
sock_init_data(sock, sk);
}
在 inet_create 中,根據(jù)類型 SOCK_STREAM 查找到對于 tcp 定義的操作方法實現(xiàn)集合 inet_stream_ops 和 tcp_prot。并把它們分別設(shè)置到 socket-》ops 和 sock-》sk_prot 上。
我們再往下看到了 sock_init_data。在這個方法中將 sock 中的 sk_data_ready 函數(shù)指針進行了初始化,設(shè)置為默認(rèn) sock_def_readable()。
//file: net/core/sock.c
void sock_init_data(struct socket *sock, struct sock *sk)
{
sk-》sk_data_ready = sock_def_readable;
sk-》sk_write_space = sock_def_write_space;
sk-》sk_error_report = sock_def_error_report;
}
當(dāng)軟中斷上收到數(shù)據(jù)包時會通過調(diào)用 sk_data_ready 函數(shù)指針(實際被設(shè)置成了 sock_def_readable()) 來喚醒在 sock 上等待的進程。這個咱們后面介紹軟中斷的時候再說,這里記住這個就行了。
至此,一個 tcp對象,確切地說是 AF_INET 協(xié)議族下 SOCK_STREAM對象就算是創(chuàng)建完成了。這里花費了一次 socket 系統(tǒng)調(diào)用的開銷
二、等待接收消息
接著我們來看 recv 函數(shù)依賴的底層實現(xiàn)。首先通過 strace 命令跟蹤,可以看到 clib 庫函數(shù) recv 會執(zhí)行到 recvfrom 系統(tǒng)調(diào)用。
進入系統(tǒng)調(diào)用后,用戶進程就進入到了內(nèi)核態(tài),通過執(zhí)行一系列的內(nèi)核協(xié)議層函數(shù),然后到 socket 對象的接收隊列中查看是否有數(shù)據(jù),沒有的話就把自己添加到 socket 對應(yīng)的等待隊列里。最后讓出CPU,操作系統(tǒng)會選擇下一個就緒狀態(tài)的進程來執(zhí)行。整個流程圖如下:
看完了整個流程圖,接下來讓我們根據(jù)源碼來看更詳細的細節(jié)。其中我們今天要關(guān)注的重點是 recvfrom 最后是怎么把自己的進程給阻塞掉的(假如我們沒有使用 O_NONBLOCK 標(biāo)記)。
//file: net/socket.c
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
struct socket *sock;
//根據(jù)用戶傳入的 fd 找到 socket 對象
sock = sockfd_lookup_light(fd, &err, &fput_needed);
。..。..
err = sock_recvmsg(sock, &msg, size, flags);
。..。..
}
sock_recvmsg ==》 __sock_recvmsg =》 __sock_recvmsg_nosec
static inline int __sock_recvmsg_nosec(struct kiocb *iocb, struct socket *sock,
struct msghdr *msg, size_t size, int flags)
{
。..。..
return sock-》ops-》recvmsg(iocb, sock, msg, size, flags);
}
調(diào)用 socket 對象 ops 里的 recvmsg, 回憶我們上面的 socket 對象圖,從圖中可以看到 recvmsg 指向的是 inet_recvmsg 方法。
//file: net/ipv4/af_inet.c
int inet_recvmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
size_t size, int flags)
{
。..
err = sk-》sk_prot-》recvmsg(iocb, sk, msg, size, flags & MSG_DONTWAIT,
flags & ~MSG_DONTWAIT, &addr_len);
這里又遇到一個函數(shù)指針,這次調(diào)用的是 socket 對象里的 sk_prot 下面的 recvmsg方法。同上,得出這個 recvmsg 方法對應(yīng)的是 tcp_recvmsg 方法。
//file: net/ipv4/tcp.c
int tcp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int nonblock, int flags, int *addr_len)
{
int copied = 0;
。..
do {
//遍歷接收隊列接收數(shù)據(jù)
skb_queue_walk(&sk-》sk_receive_queue, skb) {
。..
}
。..
}
if (copied 》= target) {
release_sock(sk);
lock_sock(sk);
} else //沒有收到足夠數(shù)據(jù),啟用 sk_wait_data 阻塞當(dāng)前進程
sk_wait_data(sk, &timeo);
}
終于看到了我們想要看的東西,skb_queue_walk 是在訪問 sock 對象下面的接收隊列了。
如果沒有收到數(shù)據(jù),或者收到不足夠多,則調(diào)用 sk_wait_data 把當(dāng)前進程阻塞掉。
//file: net/core/sock.c
int sk_wait_data(struct sock *sk, long *timeo)
{
//當(dāng)前進程(current)關(guān)聯(lián)到所定義的等待隊列項上
DEFINE_WAIT(wait);
// 調(diào)用 sk_sleep 獲取 sock 對象下的 wait
// 并準(zhǔn)備掛起,將進程狀態(tài)設(shè)置為可打斷 INTERRUPTIBLE
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
set_bit(SOCK_ASYNC_WAITDATA, &sk-》sk_socket-》flags);
// 通過調(diào)用schedule_timeout讓出CPU,然后進行睡眠
rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk-》sk_receive_queue));
。..
我們再來詳細看下 sk_wait_data 是怎么把當(dāng)前進程給阻塞掉的。
首先在 DEFINE_WAIT 宏下,定義了一個等待隊列項 wait。在這個新的等待隊列項上,注冊了回調(diào)函數(shù) autoremove_wake_function,并把當(dāng)前進程描述符 current 關(guān)聯(lián)到其 .private成員上。
//file: include/linux/wait.h
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
#define DEFINE_WAIT_FUNC(name, function)
wait_queue_t name = {
.private = current,
.func = function,
.task_list = LIST_HEAD_INIT((name).task_list),
}
緊接著在 sk_wait_data 中 調(diào)用 sk_sleep 獲取 sock 對象下的等待隊列列表頭 wait_queue_head_t。sk_sleep 源代碼如下:
//file: include/net/sock.h
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
return &rcu_dereference_raw(sk-》sk_wq)-》wait;
}
接著調(diào)用 prepare_to_wait 來把新定義的等待隊列項 wait 插入到 sock 對象的等待隊列下。
//file: kernel/wait.c
void
prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
unsigned long flags;
wait-》flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&q-》lock, flags);
if (list_empty(&wait-》task_list))
__add_wait_queue(q, wait);
set_current_state(state);
spin_unlock_irqrestore(&q-》lock, flags);
}
這樣后面當(dāng)內(nèi)核收完數(shù)據(jù)產(chǎn)生就緒時間的時候,就可以查找 socket 等待隊列上的等待項,進而就可以找到回調(diào)函數(shù)和在等待該 socket 就緒事件的進程了。
最后再調(diào)用 sk_wait_event 讓出 CPU,進程將進入睡眠狀態(tài),這會導(dǎo)致一次進程上下文的開銷。
接下來的小節(jié)里我們將能看到進程是如何被喚醒的了。
三、軟中斷模塊
接著我們再轉(zhuǎn)換一下視角,來看負責(zé)接收和處理數(shù)據(jù)包的軟中斷這邊。關(guān)于網(wǎng)絡(luò)包到網(wǎng)卡后是怎么被網(wǎng)卡接收,最后在交由軟中斷處理的,這里就不多贅述了。感興趣的請看之前的文章《圖解Linux網(wǎng)絡(luò)包接收過程》。我們今天直接從 tcp 協(xié)議的接收函數(shù) tcp_v4_rcv 看起。
軟中斷(也就是 Linux 里的 ksoftirqd 進程)里收到數(shù)據(jù)包以后,發(fā)現(xiàn)是 tcp 的包的話就會執(zhí)行到 tcp_v4_rcv 函數(shù)。接著走,如果是 ESTABLISH 狀態(tài)下的數(shù)據(jù)包,則最終會把數(shù)據(jù)拆出來放到對應(yīng) socket 的接收隊列中。然后調(diào)用 sk_data_ready 來喚醒用戶進程。
我們看更詳細一點的代碼:
// file: net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
。..。..
th = tcp_hdr(skb); //獲取tcp header
iph = ip_hdr(skb); //獲取ip header
//根據(jù)數(shù)據(jù)包 header 中的 ip、端口信息查找到對應(yīng)的socket
sk = __inet_lookup_skb(&tcp_hashinfo, skb, th-》source, th-》dest);
。..。..
//socket 未被用戶鎖定
if (!sock_owned_by_user(sk)) {
{
if (!tcp_prequeue(sk, skb))
ret = tcp_v4_do_rcv(sk, skb);
}
}
}
在 tcp_v4_rcv 中首先根據(jù)收到的網(wǎng)絡(luò)包的 header 里的 source 和 dest 信息來在本機上查詢對應(yīng)的 socket。找到以后,我們直接進入接收的主體函數(shù) tcp_v4_do_rcv 來看。
//file: net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
if (sk-》sk_state == TCP_ESTABLISHED) {
//執(zhí)行連接狀態(tài)下的數(shù)據(jù)處理
if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb-》len)) {
rsk = sk;
goto reset;
}
return 0;
}
//其它非 ESTABLISH 狀態(tài)的數(shù)據(jù)包處理
。..。..
}
我們假設(shè)處理的是 ESTABLISH 狀態(tài)下的包,這樣就又進入 tcp_rcv_established 函數(shù)中進行處理。
//file: net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
const struct tcphdr *th, unsigned int len)
{
。..。..
//接收數(shù)據(jù)到隊列中
eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
&fragstolen);
//數(shù)據(jù) ready,喚醒 socket 上阻塞掉的進程
sk-》sk_data_ready(sk, 0);
在 tcp_rcv_established 中通過調(diào)用 tcp_queue_rcv 函數(shù)中完成了將接收數(shù)據(jù)放到 socket 的接收隊列上。
如下源碼所示
//file: net/ipv4/tcp_input.c
static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
bool *fragstolen)
{
//把接收到的數(shù)據(jù)放到 socket 的接收隊列的尾部
if (!eaten) {
__skb_queue_tail(&sk-》sk_receive_queue, skb);
skb_set_owner_r(skb, sk);
}
return eaten;
}
調(diào)用 tcp_queue_rcv 接收完成之后,接著再調(diào)用 sk_data_ready 來喚醒在socket上等待的用戶進程。 這又是一個函數(shù)指針?;叵肷厦嫖覀冊?創(chuàng)建 socket 流程里執(zhí)行到的 sock_init_data 函數(shù),在這個函數(shù)里已經(jīng)把 sk_data_ready 設(shè)置成 sock_def_readable 函數(shù)了(可以ctrl + f 搜索前文)。它是默認(rèn)的數(shù)據(jù)就緒處理函數(shù)。
//file: net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk-》sk_wq);
//有進程在此 socket 的等待隊列
if (wq_has_sleeper(wq))
//喚醒等待隊列上的進程
wake_up_interruptible_sync_poll(&wq-》wait, POLLIN | POLLPRI |
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
在 sock_def_readable 中再一次訪問到了 sock-》sk_wq 下的wait。回憶下我們前面調(diào)用 recvfrom 執(zhí)行的最后,通過 DEFINE_WAIT(wait) 將當(dāng)前進程關(guān)聯(lián)的等待隊列添加到 sock-》sk_wq 下的 wait 里了。
那接下來就是調(diào)用 wake_up_interruptible_sync_poll 來喚醒在 socket 上因為等待數(shù)據(jù)而被阻塞掉的進程了。
//file: include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m)
__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
//file: kernel/sched/core.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
int wake_flags = WF_SYNC;
if (unlikely(!q))
return;
if (unlikely(!nr_exclusive))
wake_flags = 0;
spin_lock_irqsave(&q-》lock, flags);
__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
spin_unlock_irqrestore(&q-》lock, flags);
}
__wake_up_common 實現(xiàn)喚醒。這里注意下, 該函數(shù)調(diào)用是參數(shù) nr_exclusive 傳入的是 1,這里指的是即使是有多個進程都阻塞在同一個 socket 上,也只喚醒 1 個進程。其作用是為了避免驚群。
//file: kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q-》task_list, task_list) {
unsigned flags = curr-》flags;
if (curr-》func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
在 __wake_up_common 中找出一個等待隊列項 curr,然后調(diào)用其 curr-》func?;貞浳覀兦懊嬖?recv 函數(shù)執(zhí)行的時候,使用 DEFINE_WAIT() 定義等待隊列項的細節(jié),內(nèi)核把 curr-》func 設(shè)置成了 autoremove_wake_function。
//file: include/linux/wait.h
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)
#define DEFINE_WAIT_FUNC(name, function)
wait_queue_t name = {
.private = current,
.func = function,
.task_list = LIST_HEAD_INIT((name).task_list),
}
在 autoremove_wake_function 中,調(diào)用了 default_wake_function。
//file: kernel/sched/core.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
void *key)
{
return try_to_wake_up(curr-》private, mode, wake_flags);
}
調(diào)用 try_to_wake_up 時傳入的 task_struct 是 curr-》private。這個就是當(dāng)時因為等待而被阻塞的進程項。當(dāng)這個函數(shù)執(zhí)行完的時候,在 socket 上等待而被阻塞的進程就被推入到可運行隊列里了,這又將是一次進程上下文切換的開銷。
小結(jié)
好了,我們把上面的流程總結(jié)一下。內(nèi)核在通知網(wǎng)絡(luò)包的運行環(huán)境分兩部分:
第一部分是我們自己代碼所在的進程,我們調(diào)用的 socket() 函數(shù)會進入內(nèi)核態(tài)創(chuàng)建必要內(nèi)核對象。recv() 函數(shù)在進入內(nèi)核態(tài)以后負責(zé)查看接收隊列,以及在沒有數(shù)據(jù)可處理的時候把當(dāng)前進程阻塞掉,讓出 CPU。
第二部分是硬中斷、軟中斷上下文(系統(tǒng)進程 ksoftirqd)。在這些組件中,將包處理完后會放到 socket 的接收隊列中。然后再根據(jù) socket 內(nèi)核對象找到其等待隊列中正在因為等待而被阻塞掉的進程,然后把它喚醒。
每次一個進程專門為了等一個 socket 上的數(shù)據(jù)就得被從 CPU 上拿下來。然后再換上另一個進程。等到數(shù)據(jù) ready 了,睡眠的進程又會被喚醒??偣矁纱芜M程上下文切換開銷,根據(jù)之前的測試來看,每一次切換大約是 3-5 us(微秒)左右。如果是網(wǎng)絡(luò) IO 密集型的應(yīng)用的話,CPU 就不停地做進程切換這種無用功。
在服務(wù)端角色上,這種模式完全沒辦法使用。因為這種簡單模型里的 socket 和進程是一對一的。我們現(xiàn)在要在單臺機器上承載成千上萬,甚至十幾、上百萬的用戶連接請求。如果用上面的方式,那就得為每個用戶請求都創(chuàng)建一個進程。相信你在無論多原始的服務(wù)器網(wǎng)絡(luò)編程里,都沒見過有人這么干吧。
如果讓我給它起一個名字的話,它就叫單路不復(fù)用(飛哥自創(chuàng)名詞)。那么有沒有更高效的網(wǎng)絡(luò) IO 模型呢?當(dāng)然有,那就是你所熟知的 select、poll 和 epoll了。下次飛哥再開始拆解 epoll 的實現(xiàn)源碼,敬請期待!
這種模式在客戶端角色上,現(xiàn)在還存在使用的情形。因為你的進程可能確實得等 Mysql 的數(shù)據(jù)返回成功之后,才能渲染頁面返回給用戶,否則啥也干不了。
注意一下,我說的是角色,不是具體的機器。例如對于你的 php/java/golang 接口機,你接收用戶請求的時候,你是服務(wù)端角色。但當(dāng)你再請求 redis 的時候,就變?yōu)榭蛻舳私巧恕?/p>
不過現(xiàn)在有一些封裝的很好的網(wǎng)絡(luò)框架例如 Sogou Workflow,Golang 的 net 包等在網(wǎng)絡(luò)客戶端角色上也早已摒棄了這種低效的模式!
編輯:lyn
-
Socket
+關(guān)注
關(guān)注
0文章
188瀏覽量
34606 -
代碼
+關(guān)注
關(guān)注
30文章
4698瀏覽量
68101 -
BIO
+關(guān)注
關(guān)注
0文章
6瀏覽量
9358 -
C 語言
+關(guān)注
關(guān)注
0文章
18瀏覽量
14219 -
網(wǎng)絡(luò)開發(fā)
+關(guān)注
關(guān)注
0文章
13瀏覽量
7847
原文標(biāo)題:圖解:深入理解高性能網(wǎng)絡(luò)開發(fā)路上的絆腳石,同步阻塞網(wǎng)絡(luò) IO
文章出處:【微信號:LinuxHub,微信公眾號:Linux愛好者】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
相關(guān)推薦
評論