0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

互斥鎖及條件變量的使用

科技綠洲 ? 來源:Linux開發(fā)架構(gòu)之路 ? 作者:Linux開發(fā)架構(gòu)之路 ? 2023-11-10 14:51 ? 次閱讀

本文主要分為三個(gè)部分:

  • 第一部分簡(jiǎn)要介紹線程的概念及其使用
  • 第二部分主要介紹互斥鎖及條件變量的使用(重點(diǎn)探討pthread_cond_wait)
  • 第三部分參考運(yùn)行IBM的多線程工作代碼作為應(yīng)用。

一、線程簡(jiǎn)介及使用

正確的使用線程是一個(gè)優(yōu)秀程序員必備的素質(zhì)。線程類似于進(jìn)程,單處理器系統(tǒng)中內(nèi)核通過時(shí)間片輪轉(zhuǎn)模擬線程的并發(fā)運(yùn)行。那么,對(duì)于大多數(shù)合作任務(wù),為什么多線程比多進(jìn)程優(yōu)越呢?

這是因?yàn)?,線程共享相同的內(nèi)存空間,不同線程之間可以共享內(nèi)存中的全局變量。使用fork()寫過子進(jìn)程的同學(xué)都會(huì)意識(shí)到多線程的重要性。為什么呢?雖然fork()允許創(chuàng)建多個(gè)進(jìn)程,但它會(huì)帶來進(jìn)程之間的通信問題:各個(gè)進(jìn)程都有各自獨(dú)立的內(nèi)存空間,需要使用某種IPC(進(jìn)程間通信)進(jìn)行通信,但它們都遇到兩個(gè)重要障礙:

  • 強(qiáng)加了某種形式的額外內(nèi)核開銷,從而降低性能。
  • 對(duì)于大多數(shù)情形,IPC 不是對(duì)于代碼的“自然”擴(kuò)展。通常極大地增加了程序的復(fù)雜性。

POSIX多線程不必使用諸如管道、套接字等長(zhǎng)距離通信,這些通信方式開銷大、復(fù)雜,由于所有線程都駐留在同一內(nèi)存空間,因此只需要考慮同步問題即可。

線程是快捷的

與標(biāo)準(zhǔn)fork()相比,線程開銷較少。無需單獨(dú)復(fù)制進(jìn)程的內(nèi)存空間或文件描述符等等,大大節(jié)省CPU時(shí)間,創(chuàng)建速度比進(jìn)程創(chuàng)建快到10-100倍,那么可以大量使用線程而無需擔(dān)心CPU或內(nèi)存不足。

同時(shí),線程能夠充分利用多處理器的CPU,特定類型線程(CPU密集型)的性能隨處理器數(shù)目線性提高。如果編寫的是CPU密集型程序,則絕對(duì)要在代碼中使用多線程,無需使用繁瑣的IPC及其他復(fù)雜通信機(jī)制。

線程是可移植的

fork()的底層系統(tǒng)調(diào)用是__clone(),新的子進(jìn)程根據(jù)該系統(tǒng)調(diào)用的參數(shù)有選擇的共享父進(jìn)程的執(zhí)行環(huán)境(內(nèi)存空間,文件描述符等),但__clone()也有不好的一面,__clone()是特定于Linux平臺(tái)的,不使用于實(shí)現(xiàn)可移植程序。而Linux的POSIX線程是可移植的,代碼運(yùn)行于Solaris、FreeBSD、Linux 和其它平臺(tái)。

代碼:

#include
#include
#include
#include
void *thread_function(void *arg) {
int i;
for ( i=0; i<20; i++) {
printf("Thread says hi!n");
sleep(1);
}
return NULL;
}
int main(void) {
pthread_t mythread;

if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
exit(0);
}

該程序非常簡(jiǎn)單,但是也有我們需要學(xué)習(xí)的地方:

1)線程id的類型為pthread_t,可以認(rèn)為它是一種句柄,后續(xù)的使用都是利用它完成的

2)線程創(chuàng)建函數(shù)需要依次指定線程屬性、回調(diào)函數(shù)以及線程傳參(簡(jiǎn)單變量或結(jié)構(gòu)體),返回值考慮

3)線程創(chuàng)建后兩個(gè)線程如何運(yùn)行,子線程結(jié)束后如何處理

對(duì)于第三個(gè)問題:

子線程創(chuàng)建后,POSIX線程標(biāo)準(zhǔn)將它們視為相同等級(jí)的線程,子線程開始執(zhí)行的同時(shí),主線程繼續(xù)向下執(zhí)行(其實(shí)這里已經(jīng)沒有像進(jìn)程那樣的父子概念了,這里只是為了更好的區(qū)分),二者并沒有一定的先后順序,CPU時(shí)間片的分配取決于內(nèi)核和線程庫。

子線程結(jié)束時(shí)的處理,當(dāng)子線程的默認(rèn)joinable屬性時(shí),由主線程對(duì)其進(jìn)行清理;當(dāng)子線程為detached屬性時(shí),由系統(tǒng)進(jìn)程對(duì)其清理。如果未對(duì)線程進(jìn)行正確清理,最終會(huì)導(dǎo)致 pthread_create() 調(diào)用失敗。

代碼2:

#include
#include
#include
#include
int myglobal;
void *thread_function(void *arg) {
int i,j;
for ( i=0; i<20; i++) {
j=myglobal;
j=j+1;
printf(".");
fflush(stdout);
sleep(1);
myglobal=j;
}
return NULL;
}
int main(void) {
pthread_t mythread;
int i;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
for ( i=0; i<20; i++) {
myglobal=myglobal+1;
printf("o");
fflush(stdout);
sleep(1);
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
printf("nmyglobal equals %dn",myglobal);
exit(0);
}

輸出:

圖片

非常意外吧,主線程和子線程各自對(duì)myglobal進(jìn)行20次加1操作,程序結(jié)束時(shí)myglobal應(yīng)當(dāng)為40,然而myglobal的輸出為21,這里面肯定有問題。究竟是為什么呢?

核心原因就是:對(duì)全局變量的修改并不是原子操作,假設(shè)子線程讀取全局變量到寄存器,寄存器內(nèi)部完成加1,之后即將重新賦值給全局變量前的時(shí)刻。主線程開始讀取全局變量完成操作,那么此時(shí)就覆蓋了子線程的這一環(huán)節(jié)操作,該操作也就成了無效操作。

圖片

圖片

解決這一問題就需要互斥操作了,見第二部分。

二、互斥鎖及條件變量的使用

通過互斥鎖 (mutex)完成對(duì)臨界資源的鎖操作,能夠保證各個(gè)線程對(duì)其的唯一訪問。

互斥對(duì)象的工作原理

如果線程 a 試圖鎖定一個(gè)互斥對(duì)象,而此時(shí)線程 b 已鎖定了同一個(gè)互斥對(duì)象時(shí),線程 a 就將進(jìn)入睡眠狀態(tài)。一旦線程 b 釋放了互斥對(duì)象(通過 pthread_mutex_unlock() 調(diào)用),線程 a 就能夠鎖定這個(gè)互斥對(duì)象(換句話說,線程 a 就將從 pthread_mutex_lock() 函數(shù)調(diào)用中返回,同時(shí)互斥對(duì)象被鎖定)。同樣地,其他對(duì)已鎖定的互斥對(duì)象上調(diào)用 pthread_mutex_lock() 的所有線程都將進(jìn)入睡眠狀態(tài),這些睡眠的線程將“排隊(duì)”訪問這個(gè)互斥對(duì)象。

圖片

看到了嗎?其他試圖訪問已被鎖定互斥對(duì)象的線程都會(huì)排隊(duì)睡眠的:)

代碼修改:

#include
#include
#include
#include
int myglobal;
pthread_mutex_t mymutex=PTHREAD_MUTEX_INITIALIZER;

void *thread_function(void *arg) {
int i,j;
for ( i=0; i<20; i++) {
pthread_mutex_lock(&mymutex);
j=myglobal;
j=j+1;
printf(".");
fflush(stdout);
sleep(1);
myglobal=j;
pthread_mutex_unlock(&mymutex);
}
return NULL;
}
int main(void) {
pthread_t mythread;
int i;
if ( pthread_create( &mythread, NULL, thread_function, NULL) ) {
printf("error creating thread.");
abort();
}
for ( i=0; i<20; i++) {
pthread_mutex_lock(&mymutex);
myglobal=myglobal+1;
pthread_mutex_unlock(&mymutex);
printf("o");
fflush(stdout);
sleep(1);
}
if ( pthread_join ( mythread, NULL ) ) {
printf("error joining thread.");
abort();
}
printf("nmyglobal equals %dn",myglobal);
exit(0);
}

此時(shí)pthread_mutex_lock() 和 pthread_mutex_unlock() 函數(shù)調(diào)用,如同“在施工中”標(biāo)志一樣,將正在修改和讀取的某一特定共享數(shù)據(jù)包圍起來。其他線程訪問時(shí)繼續(xù)睡眠,直到該線程完成對(duì)其的操作。

圖片

等待條件之POSIX條件變量

互斥對(duì)象是線程程序必需的工具,但它們并非萬能的。例如,如果線程正在等待共享數(shù)據(jù)內(nèi)某個(gè)條件出現(xiàn),那會(huì)發(fā)生什么呢?

1)使用忙查詢的方法非常浪費(fèi)時(shí)間和資源,效率非常低。代碼可以反復(fù)對(duì)互斥對(duì)象鎖定和解鎖,以檢查值的任何變化。同時(shí),還要快速將互斥對(duì)象解鎖,以便其它線程能夠進(jìn)行任何必需的更改。這是一種非??膳碌姆椒?,因?yàn)榫€程需要在合理的時(shí)間范圍內(nèi)頻繁地循環(huán)檢測(cè)變化。

2)解決這個(gè)問題的最好方法是使用pthread_cond_wait() 調(diào)用來等待特殊條件發(fā)生。當(dāng)線程在等待滿足某些條件時(shí)使線程進(jìn)入睡眠狀態(tài)。一旦條件滿足,還需要一種方法以喚醒因等待滿足特定條件而睡眠的線程。如果能夠做到這一點(diǎn),線程代碼將是非常高效的,并且不會(huì)占用寶貴的互斥對(duì)象鎖。這正是 POSIX 條件變量能做的事!

了解 pthread_cond_wait() 的作用非常重要 – 它是 POSIX 線程信號(hào)發(fā)送系統(tǒng)的核心,也是最難以理解的部分。

條件變量的概念

通常在程序里,我們使用條件變量來表示等待”某一條件”的發(fā)生。雖然名叫”條件變量”,但是它本身并不保存條件狀態(tài),本質(zhì)上條件變量?jī)H僅是一種通訊機(jī)制:當(dāng)有一個(gè)線程在等待(pthread_cond_wait)某一條件變量的時(shí)候,會(huì)將當(dāng)前的線程掛起,直到另外的線程發(fā)送信號(hào)(pthread_cond_signal)通知其解除阻塞狀態(tài)。

由于要用額外的共享變量保存條件狀態(tài)(這個(gè)變量可以是任何類型比如bool),由于這個(gè)變量會(huì)同時(shí)被不同的線程訪問,因此需要一個(gè)額外的mutex保護(hù)它。

《Linux系統(tǒng)編程手冊(cè)》也有這個(gè)問題的介紹:

A condition variable is always used in conjunction with a mutex. The mutex provides mutual exclusion for accessing the shared variable, while the condition variable is used to signal changes in the variable’s state.

條件變量總是結(jié)合mutex使用,條件變量就共享變量的狀態(tài)改變發(fā)出通知,mutex就是用來保護(hù)這個(gè)共享變量的。

cpp官網(wǎng)描述

pthread_cond_wait實(shí)現(xiàn)步驟

首先,我們使用條件變量的接口實(shí)現(xiàn)一個(gè)簡(jiǎn)單的生產(chǎn)者-消費(fèi)者模型,avail就是保存條件狀態(tài)的共享變量,它對(duì)生產(chǎn)者線程、消費(fèi)者線程均可見。不考慮錯(cuò)誤處理,先看生產(chǎn)者實(shí)現(xiàn):

pthread_mutex_lock(&mutex);
avail++;
pthread_mutex_unlock(&mutex);

pthread_cond_signal(&cond); /* Wake sleeping consumer */

因?yàn)閍vail對(duì)兩個(gè)線程都可見,因此對(duì)其修改均應(yīng)該在mutex的保護(hù)之下,再來看消費(fèi)者線程實(shí)現(xiàn):

for (;;)
{
pthread_mutex_lock(&mutex);
while (avail == 0)
pthread_cond_wait(&cond, &mutex);

while (avail > 0)
{
/* Do something */
avail--;
}
pthread_mutex_unlock(&mutex);
}

當(dāng)”avail==0”時(shí),消費(fèi)者線程會(huì)阻塞在pthread_cond_wait()函數(shù)上。如果pthread_cond_wait()僅需要一個(gè)pthread_cond_t參數(shù)的話,此時(shí)mutex已經(jīng)被鎖,要是不先將mutex變量解鎖,那么其他線程(如生產(chǎn)者線程)永遠(yuǎn)無法訪問avail變量,也就無法繼續(xù)生產(chǎn),消費(fèi)者線程會(huì)一直阻塞下去。

因此pthread_cond_wait()對(duì)mutex解鎖,然后進(jìn)入睡眠狀態(tài),等待cond以接收POSIX 線程“信號(hào)”。一旦接收到“信號(hào)”(加引號(hào)是因?yàn)槲覀儾⒉皇窃谟懻搨鹘y(tǒng)的 UNIX 信號(hào),而是來自pthread_cond_signal() 或 pthread_cond_broadcast() 調(diào)用的信號(hào)),它就會(huì)蘇醒。但 pthread_cond_wait() 沒有立即返回 – 它還要做一件事:重新鎖定 mutex。

理解后提問:調(diào)用 pthread_cond_wait() 之 前,互斥對(duì)象必須處于什么狀態(tài)?pthread_cond_wait() 調(diào)用返回之后,互斥對(duì)象處于什么狀態(tài)?這兩個(gè)問題的答案都是“鎖定”。

綜上,pthread_cond_wait()函數(shù)大致會(huì)分為3個(gè)部分:

1.解鎖互斥量mutex 2.阻塞調(diào)用線程,直到當(dāng)前的條件變量收到通知 3.重新鎖定互斥量mutex

其中1和2是原子操作,也就是說在pthread_cond_wait()調(diào)用線程陷入阻塞之前其他的線程無法獲取當(dāng)前的mutex,也就不能就該條件變量發(fā)出通知。

虛假喚醒

前面判斷條件狀態(tài)的時(shí)候avail > 0放在了while循環(huán)中,而不是if中,這是因?yàn)閜thread_cond_wait()阻塞在條件變量上的時(shí)候,即使其他的線程沒有就該條件變量發(fā)出通知(pthread_cond_signal()/pthread_cond_broadcast()),條件變量也有可能會(huì)自己醒來(pthread_cond_wait()函數(shù)返回),因此需要重新檢查一下共享變量上的條件成不成立,確保條件變量是真的收到了通知,否則繼續(xù)阻塞等待。關(guān)于虛假喚醒的相關(guān)介紹,可以戳這里查看維基百科下面的幾個(gè)引用:https://en.wikipedia.org/wiki/Spurious_wakeup。

三、工作隊(duì)列的實(shí)現(xiàn)

這里摘抄IBM第三部分應(yīng)用實(shí)現(xiàn)代碼:

在這個(gè)方案中,我們創(chuàng)建了許多工作程序線程。每個(gè)線程都會(huì)檢查 wq(“工作隊(duì)列”),查看是否有需要完成的工作。如果有需要完成的工作,那么線程將從隊(duì)列中除去一個(gè)節(jié)點(diǎn),執(zhí)行這些特定工作,然后等待新的工作到達(dá)。

與此同時(shí),主線程負(fù)責(zé)創(chuàng)建這些工作程序線程、將工作添加到隊(duì)列,然后在它退出時(shí)收集所有工作程序線程。您將會(huì)遇到許多 C 代碼,好好準(zhǔn)備吧!

隊(duì)列

需要隊(duì)列是出于兩個(gè)原因。首先,需要隊(duì)列來保存工作作業(yè)。還需要可用于跟蹤已終止線程的數(shù)據(jù)結(jié)構(gòu)。還記得前幾篇文章(請(qǐng)參閱本文結(jié)尾處的 參考資料)中,我曾提到過需要使用帶有特定進(jìn)程標(biāo)識(shí)的 pthread_join 嗎?使用“清除隊(duì)列”(稱作 “cq”)可以解決無法等待 任何已終止線程的問題(稍后將詳細(xì)討論這個(gè)問題)。以下是標(biāo)準(zhǔn)隊(duì)列代碼。將此代碼保存到文件 queue.h 和 queue.c:

queue.h

/* queue.h
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
*/
typedef struct node {
struct node *next;
} node;
typedef struct queue {
node *head, *tail;
} queue;
void queue_init(queue *myroot);
void queue_put(queue *myroot, node *mynode);
node *queue_get(queue *myroot);

queue.c

/* queue.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** This set of queue functions was originally thread-aware. I
** redesigned the code to make this set of queue routines
** thread-ignorant (just a generic, boring yet very fast set of queue
** routines). Why the change? Because it makes more sense to have
** the thread support as an optional add-on. Consider a situation
** where you want to add 5 nodes to the queue. With the
** thread-enabled version, each call to queue_put() would
** automatically lock and unlock the queue mutex 5 times -- that's a
** lot of unnecessary overhead. However, by moving the thread stuff
** out of the queue routines, the caller can lock the mutex once at
** the beginning, then insert 5 items, and then unlock at the end.
** Moving the lock/unlock code out of the queue functions allows for
** optimizations that aren't possible otherwise. It also makes this
** code useful for non-threaded applications.
**
** We can easily thread-enable this data structure by using the
** data_control type defined in control.c and control.h. */

#include
#include "queue.h"

void queue_init(queue *myroot) {
myroot->head=NULL;
myroot->tail=NULL;
}

void queue_put(queue *myroot,node *mynode) {
mynode->next=NULL;
if (myroot->tail!=NULL)
myroot->tail->next=mynode;
myroot->tail=mynode;
if (myroot->head==NULL)
myroot->head=mynode;
}

node *queue_get(queue *myroot) {
//get from root
node *mynode;
mynode=myroot->head;
if (myroot->head!=NULL)
myroot->head=myroot->head->next;
return mynode;
}

data_control 代碼

我編寫的并不是線程安全的隊(duì)列例程,事實(shí)上我創(chuàng)建了一個(gè)“數(shù)據(jù)包裝”或“控制”結(jié)構(gòu),它可以是任何線程支持的數(shù)據(jù)結(jié)構(gòu)??匆幌?control.h:

#include

typedef struct data_control {
pthread_mutex_t mutex;
pthread_cond_t cond;
int active;
} data_control;

現(xiàn)在您看到了 data_control 結(jié)構(gòu)定義,以下是它的視覺表示:

所使用的 data_control 結(jié)構(gòu)

圖片

圖像中的鎖代表互斥對(duì)象,它允許對(duì)數(shù)據(jù)結(jié)構(gòu)進(jìn)行互斥訪問。黃色的星代表?xiàng)l件變量,它可以睡眠,直到所討論的數(shù)據(jù)結(jié)構(gòu)改變?yōu)橹?。on/off 開關(guān)表示整數(shù) “active”,它告訴線程此數(shù)據(jù)是否是活動(dòng)的。在代碼中,我使用整數(shù) active 作為標(biāo)志,告訴工作隊(duì)列何時(shí)應(yīng)該關(guān)閉。以下是 control.c:

control.c

/* control.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** These routines provide an easy way to make any type of
** data-structure thread-aware. Simply associate a data_control
** structure with the data structure (by creating a new struct, for
** example). Then, simply lock and unlock the mutex, or
** wait/signal/broadcast on the condition variable in the data_control
** structure as needed.
**
** data_control structs contain an int called "active". This int is
** intended to be used for a specific kind of multithreaded design,
** where each thread checks the state of "active" every time it locks
** the mutex. If active is 0, the thread knows that instead of doing
** its normal routine, it should stop itself. If active is 1, it
** should continue as normal. So, by setting active to 0, a
** controlling thread can easily inform a thread work crew to shut
** down instead of processing new jobs. Use the control_activate()
** and control_deactivate() functions, which will also broadcast on
** the data_control struct's condition variable, so that all threads
** stuck in pthread_cond_wait() will wake up, have an opportunity to
** notice the change, and then terminate.
*/

#include "control.h"

int control_init(data_control *mycontrol) {
int mystatus;
if (pthread_mutex_init(&(mycontrol->mutex),NULL))
return 1;
if (pthread_cond_init(&(mycontrol->cond),NULL))
return 1;
mycontrol->active=0;
return 0;
}

int control_destroy(data_control *mycontrol) {
int mystatus;
if (pthread_cond_destroy(&(mycontrol->cond)))
return 1;
if (pthread_cond_destroy(&(mycontrol->cond)))
return 1;
mycontrol->active=0;
return 0;
}
int control_activate(data_control *mycontrol) {
int mystatus;
if (pthread_mutex_lock(&(mycontrol->mutex)))
return 0;
mycontrol->active=1;
pthread_mutex_unlock(&(mycontrol->mutex));
pthread_cond_broadcast(&(mycontrol->cond));
return 1;
}

int control_deactivate(data_control *mycontrol) {
int mystatus;
if (pthread_mutex_lock(&(mycontrol->mutex)))
return 0;
mycontrol->active=0;
pthread_mutex_unlock(&(mycontrol->mutex));
pthread_cond_broadcast(&(mycontrol->cond));
return 1;
}

調(diào)試時(shí)間

在開始調(diào)試之前,還需要一個(gè)文件。以下是 dbug.h:

#define dabort()
{ printf("Aborting at line %d in source file %sn",__LINE__,__FILE__); abort(); }

此代碼用于處理工作組代碼中的不可糾正錯(cuò)誤。

工作組代碼說到工作組代碼,以下就是:workcrew.c

#include
#include
#include "control.h"
#include "queue.h"
#include "dbug.h"

/* the work_queue holds tasks for the various threads to complete. */

struct work_queue {
data_control control;
queue work;
} wq;


/* I added a job number to the work node. Normally, the work node
would contain additional data that needed to be processed. */

typedef struct work_node {
struct node *next;
int jobnum;
} wnode;

/* the cleanup queue holds stopped threads. Before a thread
terminates, it adds itself to this list. Since the main thread is
waiting for changes in this list, it will then wake up and clean up
the newly terminated thread. */

struct cleanup_queue {
data_control control;
queue cleanup;
} cq;

/* I added a thread number (for debugging/instructional purposes) and
a thread id to the cleanup node. The cleanup node gets passed to
the new thread on startup, and just before the thread stops, it
attaches the cleanup node to the cleanup queue. The main thread
monitors the cleanup queue and is the one that performs the
necessary cleanup. */

typedef struct cleanup_node {
struct node *next;
int threadnum;
pthread_t tid;
} cnode;

void *threadfunc(void *myarg) {

wnode *mywork;
cnode *mynode;

mynode=(cnode *) myarg;

pthread_mutex_lock(&wq.control.mutex);

while (wq.control.active) {
while (wq.work.head==NULL && wq.control.active) {
pthread_cond_wait(&wq.control.cond, &wq.control.mutex);
}
if (!wq.control.active)
break;
//we got something!
mywork=(wnode *) queue_get(&wq.work);
pthread_mutex_unlock(&wq.control.mutex);
//perform processing...
printf("Thread number %d processing job %dn",mynode->threadnum,mywork->jobnum);
free(mywork);
pthread_mutex_lock(&wq.control.mutex);
}

pthread_mutex_unlock(&wq.control.mutex);

pthread_mutex_lock(&cq.control.mutex);
queue_put(&cq.cleanup,(node *) mynode);
pthread_mutex_unlock(&cq.control.mutex);
pthread_cond_signal(&cq.control.cond);
printf("thread %d shutting down...n",mynode->threadnum);
return NULL;

}

#define NUM_WORKERS 4

int numthreads;

void join_threads(void) {
cnode *curnode;

printf("joining threads...n");

while (numthreads) {
pthread_mutex_lock(&cq.control.mutex);

/* below, we sleep until there really is a new cleanup node. This
takes care of any false wakeups... even if we break out of
pthread_cond_wait(), we don't make any assumptions that the
condition we were waiting for is true. */

while (cq.cleanup.head==NULL) {
pthread_cond_wait(&cq.control.cond,&cq.control.mutex);
}

/* at this point, we hold the mutex and there is an item in the
list that we need to process. First, we remove the node from
the queue. Then, we call pthread_join() on the tid stored in
the node. When pthread_join() returns, we have cleaned up
after a thread. Only then do we free() the node, decrement the
number of additional threads we need to wait for and repeat the
entire process, if necessary */

curnode = (cnode *) queue_get(&cq.cleanup);
pthread_mutex_unlock(&cq.control.mutex);
pthread_join(curnode->tid,NULL);
printf("joined with thread %dn",curnode->threadnum);
free(curnode);
numthreads--;
}
}


int create_threads(void) {
int x;
cnode *curnode;

for (x=0; x curnode=malloc(sizeof(cnode));
if (!curnode)
return 1;
curnode->threadnum=x;
if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode))
return 1;
printf("created thread %dn",x);
numthreads++;
}
return 0;
}

void initialize_structs(void) {
numthreads=0;
if (control_init(&wq.control))
dabort();
queue_init(&wq.work);
if (control_init(&cq.control)) {
control_destroy(&wq.control);
dabort();
}
queue_init(&wq.work);
control_activate(&wq.control);
}

void cleanup_structs(void) {
control_destroy(&cq.control);
control_destroy(&wq.control);
}


int main(void) {

int x;
wnode *mywork;

initialize_structs();

/* CREATION */

if (create_threads()) {
printf("Error starting threads... cleaning up.n");
join_threads();
dabort();
}

pthread_mutex_lock(&wq.control.mutex);
for (x=0; x<16000; x++) {
mywork=malloc(sizeof(wnode));
if (!mywork) {
printf("ouch! can't malloc!n");
break;
}
mywork->jobnum=x;
queue_put(&wq.work,(node *) mywork);
}
pthread_mutex_unlock(&wq.control.mutex);
pthread_cond_broadcast(&wq.control.cond);

printf("sleeping...n");
sleep(2);
printf("deactivating work queue...n");
control_deactivate(&wq.control);
/* CLEANUP */

join_threads();
cleanup_structs();

};>

代碼初排

現(xiàn)在來快速初排代碼。定義的第一個(gè)結(jié)構(gòu)稱作 “wq”,它包含了 data_control 和隊(duì)列頭。data_control 結(jié)構(gòu)用于仲裁對(duì)整個(gè)隊(duì)列的訪問,包括隊(duì)列中的節(jié)點(diǎn)。下一步工作是定義實(shí)際的工作節(jié)點(diǎn)。要使代碼符合本文中的示例,此處所包含的都是作業(yè)號(hào)。

接著,創(chuàng)建清除隊(duì)列。注釋說明了它的工作方式。好,現(xiàn)在讓我們跳過 threadfunc()、join_threads()、create_threads() 和 initialize_structs() 調(diào)用,直接跳到 main()。所做的第一件事就是初始化結(jié)構(gòu) – 這包括初始化 data_controls 和隊(duì)列,以及激活工作隊(duì)列。

有關(guān)清除的注意事項(xiàng)

現(xiàn)在初始化線程。如果看一下 create_threads() 調(diào)用,似乎一切正常 – 除了一件事。請(qǐng)注意,我們正在分配清除節(jié)點(diǎn),以及初始化它的線程號(hào)和 TID 組件。我們還將清除節(jié)點(diǎn)作為初始自變量傳遞給每一個(gè)新的工作程序線程。為什么這樣做?

因?yàn)楫?dāng)某個(gè)工作程序線程退出時(shí),它會(huì)將其清除節(jié)點(diǎn)連接到清除隊(duì)列,然后終止。那時(shí),主線程會(huì)在清除隊(duì)列中檢測(cè)到這個(gè)節(jié)點(diǎn)(利用條件變量),并將這個(gè)節(jié)點(diǎn)移出隊(duì)列。因?yàn)?TID(線程標(biāo)識(shí))存儲(chǔ)在清除節(jié)點(diǎn)中,所以主線程可以確切知道哪個(gè)線程已終止了。然后,主線程將調(diào)用 pthread_join(tid),并聯(lián)接適當(dāng)?shù)墓ぷ鞒绦蚓€程。如果沒有做記錄,那么主線程就需要按任意順序聯(lián)接工作程序線程,可能是按它們的創(chuàng)建順序。由于線程不一定按此順序終止,那么主線程可能會(huì)在已經(jīng)聯(lián)接了十個(gè)線程時(shí),等待聯(lián)接另一個(gè)線程。您能理解這種設(shè)計(jì)決策是如何使關(guān)閉代碼加速的嗎(尤其在使用幾百個(gè)工作程序線程的情況下)?

創(chuàng)建工作

我們已啟動(dòng)了工作程序線程(它們已經(jīng)完成了執(zhí)行 threadfunc(),稍后將討論此函數(shù)),現(xiàn)在主線程開始將工作節(jié)點(diǎn)插入工作隊(duì)列。首先,它鎖定 wq 的控制互斥對(duì)象,然后分配 16000 個(gè)工作包,將它們逐個(gè)插入隊(duì)列。完成之后,將調(diào)用 pthread_cond_broadcast(),于是所有正在睡眠的線程會(huì)被喚醒,并開始執(zhí)行工作。此時(shí),主線程將睡眠兩秒鐘,然后釋放工作隊(duì)列,并通知工作程序線程終止活動(dòng)。接著,主線程會(huì)調(diào)用 join_threads() 函數(shù)來清除所有工作程序線程。

threadfunc()

現(xiàn)在來討論 threadfunc(),這是所有工作程序線程都要執(zhí)行的代碼。當(dāng)工作程序線程啟動(dòng)時(shí),它會(huì)立即鎖定工作隊(duì)列互斥對(duì)象,獲取一個(gè)工作節(jié)點(diǎn)(如果有的話),然后對(duì)它進(jìn)行處理。如果沒有工作,則調(diào)用 pthread_cond_wait()。您會(huì)注意到這個(gè)調(diào)用在一個(gè)非常緊湊的 while() 循環(huán)中,這是非常重要的。當(dāng)從 pthread_cond_wait() 調(diào)用中蘇醒時(shí),決不能認(rèn)為條件肯定發(fā)生了 – 它 可能發(fā)生了,也可能沒有發(fā)生。如果發(fā)生了這種情況,即錯(cuò)誤地喚醒了線程,而列表是空的,那么 while 循環(huán)將再次調(diào)用 pthread_cond_wait()。

如果有一個(gè)工作節(jié)點(diǎn),那么我們只打印它的作業(yè)號(hào),釋放它并退出。然而,實(shí)際代碼會(huì)執(zhí)行一些更實(shí)質(zhì)性的操作。在 while() 循環(huán)結(jié)尾,我們鎖定了互斥對(duì)象,以便檢查 active 變量,以及在循環(huán)頂部檢查新的工作節(jié)點(diǎn)。如果執(zhí)行完此代碼,就會(huì)發(fā)現(xiàn)如果 wq.control.active 是 0,while 循環(huán)就會(huì)終止,并會(huì)執(zhí)行 threadfunc() 結(jié)尾處的清除代碼。

工作程序線程的清除代碼部件非常有趣。首先,由于 pthread_cond_wait() 返回了鎖定的互斥對(duì)象,它會(huì)對(duì) work_queue 解鎖。然后,它鎖定清除隊(duì)列,添加清除代碼(包含了 TID,主線程將使用此 TID 來調(diào)用 pthread_join()),然后再對(duì)清除隊(duì)列解鎖。此后,它發(fā)信號(hào)給所有 cq 等待者 (pthread_cond_signal(&cq.control.cond)),于是主線程就知道有一個(gè)待處理的新節(jié)點(diǎn)。我們不使用 pthread_cond_broadcast(),因?yàn)闆]有這個(gè)必要 – 只有一個(gè)線程(主線程)在等待清除隊(duì)列中的新節(jié)點(diǎn)。當(dāng)它調(diào)用 join_threads() 時(shí),工作程序線程將打印關(guān)閉消息,然后終止,等待主線程發(fā)出的 pthread_join() 調(diào)用。

join_threads()

如果要查看關(guān)于如何使用條件變量的簡(jiǎn)單示例,請(qǐng)參考 join_threads() 函數(shù)。如果還有工作程序線程,join_threads() 會(huì)一直執(zhí)行,等待清除隊(duì)列中新的清除節(jié)點(diǎn)。如果有新節(jié)點(diǎn),我們會(huì)將此節(jié)點(diǎn)移出隊(duì)列、對(duì)清除隊(duì)列解鎖(從而使工作程序可以添加清除節(jié)點(diǎn))、聯(lián)接新的工作程序線程(使用存儲(chǔ)在清除節(jié)點(diǎn)中的 TID)、釋放清除節(jié)點(diǎn)、減少“現(xiàn)有”線程的數(shù)量,然后繼續(xù)。

圖片

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • cpu
    cpu
    +關(guān)注

    關(guān)注

    68

    文章

    10772

    瀏覽量

    210453
  • 通信
    +關(guān)注

    關(guān)注

    18

    文章

    5926

    瀏覽量

    135705
  • 處理器系統(tǒng)
    +關(guān)注

    關(guān)注

    0

    文章

    9

    瀏覽量

    7784
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    502

    瀏覽量

    19614
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    線程 ,生產(chǎn)者. 消費(fèi)者 互斥條件變量

    mutex; //互斥 pthread_cond_t cond_full,cond_empty;//隊(duì)列判滿,判空 條件變量lqueue_t *head;//初始化隊(duì)列int i=
    發(fā)表于 11-18 13:47

    Linux C 多線程編程之互斥條件變量實(shí)例詳解

    條件成立"而掛起;另一個(gè)線程使"條件成立"(給出條件成立信號(hào))。為了防止競(jìng)爭(zhēng),條件變量的使用總是和一個(gè)
    發(fā)表于 06-03 17:13

    淺析linux下的條件變量

    ? 一.條件變量 ? ? 條件變量是用來等待線程而不是上鎖的,條件變量通常和
    發(fā)表于 07-12 08:10

    Linux的線程同步方法

    Linux下提供了多種方式來處理線程同步,最常用的是互斥條件變量和信號(hào)量。
    發(fā)表于 07-19 07:24

    【HarmonyOS HiSpark AI Camera試用連載 】第三次回眸-鴻蒙的線程和互斥

    用的是互斥、條件變量、信號(hào)量和讀寫。我的這個(gè)例子里面,使用漏桶算法,加入互斥
    發(fā)表于 11-13 02:22

    【HarmonyOS HiSpark AI Camera 】第三次回眸-鴻蒙的線程和互斥

    / ... ad-0000001050141770提供的API進(jìn)行編程。多線程在多核處理器可以加快運(yùn)行時(shí)間,在單核處理器上沒有什么優(yōu)勢(shì)。線程同步,最常用的是互斥、條件變量、信號(hào)量和讀
    發(fā)表于 11-13 20:01

    i.MX6ULL開發(fā)板線程同步POSIX無名信號(hào)量

    使用Linux系統(tǒng)提供的機(jī)制來對(duì)線程訪問資源的順序進(jìn)行同步,本文檔挑選了信號(hào)量,互斥,條件變量來介紹線程同步機(jī)制,實(shí)驗(yàn)代碼在sync/目錄下。1 POSIX無名信號(hào)量本章介紹POSI
    發(fā)表于 04-02 14:04

    Linux C多線程編程之互斥條件變量實(shí)例詳解

    死鎖主要發(fā)生在有多個(gè)依賴存在時(shí), 會(huì)在一個(gè)線程試圖以與另一個(gè)線程相反順序鎖住互斥量時(shí)發(fā)生. 如何避免死鎖是使用互斥量應(yīng)該格外注意的東西。
    的頭像 發(fā)表于 03-29 11:53 ?6572次閱讀

    詳談Linux操作系統(tǒng)編程的條件變量

    條件變量是用來等待線程而不是上鎖的,條件變量通常和互斥一起使用。
    的頭像 發(fā)表于 09-27 15:23 ?1947次閱讀
    詳談Linux操作系統(tǒng)編程的<b class='flag-5'>條件</b><b class='flag-5'>變量</b>

    深入了解互斥、條件變量、讀寫以及自旋

    C++11只包含其中的部分。接下來我主要通過pthread的API來展開本文。 mutex(互斥量) mutex(mutual exclusive)即互斥量(互斥體)。也便是常說的互斥
    的頭像 發(fā)表于 11-01 10:02 ?1842次閱讀

    Linux互斥的作用 互斥是什么

    1、互斥 互斥(mutex),在訪問共享資源之前對(duì)互斥進(jìn)行上鎖,在訪問完成后釋放
    的頭像 發(fā)表于 07-21 11:13 ?857次閱讀

    Linux線程條件變量是什么意思

    條件變量 條件變量用于自動(dòng)阻塞線程,直到某個(gè)特定事件發(fā)生或某個(gè)條件滿足為止,通常情況下,條件
    的頭像 發(fā)表于 07-21 11:18 ?457次閱讀

    如何實(shí)現(xiàn)一個(gè)多讀多寫的線程安全的無隊(duì)列

    加鎖。那么如何實(shí)現(xiàn)一個(gè)多讀多寫的線程安全的無隊(duì)列呢? 互斥:mutexqueue(太簡(jiǎn)單不介紹了) 互斥+
    的頭像 發(fā)表于 11-08 15:25 ?1023次閱讀
    如何實(shí)現(xiàn)一個(gè)多讀多寫的線程安全的無<b class='flag-5'>鎖</b>隊(duì)列

    互斥、條件變量、讀寫、自旋及信號(hào)量介紹

    一、互斥(同步) 在多任務(wù)操作系統(tǒng)中,同時(shí)運(yùn)行的多個(gè)任務(wù)可能都需要使用同一種資源。這個(gè)過程有點(diǎn)類似于,公司部門里,我在使用著打印機(jī)打印東西的同時(shí)(還沒有打印完),別人剛好也在此刻使用打印機(jī)打印東西
    的頭像 發(fā)表于 11-10 16:16 ?1468次閱讀
    <b class='flag-5'>互斥</b><b class='flag-5'>鎖</b>、<b class='flag-5'>條件</b><b class='flag-5'>變量</b>、讀寫<b class='flag-5'>鎖</b>、自旋<b class='flag-5'>鎖</b>及信號(hào)量介紹

    互斥和自旋的實(shí)現(xiàn)原理

    互斥和自旋是操作系統(tǒng)中常用的同步機(jī)制,用于控制對(duì)共享資源的訪問,以避免多個(gè)線程或進(jìn)程同時(shí)訪問同一資源,從而引發(fā)數(shù)據(jù)不一致或競(jìng)爭(zhēng)條件等問題。 互斥
    的頭像 發(fā)表于 07-10 10:07 ?344次閱讀