Linux多进程编程
fork系統(tǒng)調(diào)用
#include <sys/types.h> #include <unistd.h>/* Clone the calling process, creating an exact copy.Return -1 for errors, 0 to the new process,and the process ID of the new process to the old process. */ extern __pid_t fork (void) __THROWNL;該函數(shù)的每次調(diào)用都返回兩次,在父進程中返回的是子進程的PID,在子進程中則返回0。 該返回值是后續(xù)代碼判斷當(dāng)前進程是父進程還是子進程的依據(jù)。fork調(diào)用失敗時返回-1,并設(shè)置ermo。
fork函數(shù)復(fù)制當(dāng)前進程,在內(nèi)核進程表中創(chuàng)建一個新的進程表項。新的程表項有很多屬性和原進程相同,比如堆指針、棧指針和標(biāo)志寄存器的值。但也有許多屬性被賦予了新的值,比如該進程的PPID被設(shè)置成原進程的PID,信號位圖被清除(原進程設(shè)置的信號處理函數(shù)不再對新進程起作用)。
子進程的代碼與父進程完全相同,同時它還會復(fù)制父進程的數(shù)據(jù)(堆數(shù)據(jù)、棧數(shù)據(jù)和靜態(tài)數(shù)據(jù))。數(shù)據(jù)的復(fù)制采用的是所謂的寫時復(fù)制(copy on writte),即只有在任一進程(父進程或子進程)對數(shù)據(jù)執(zhí)行了寫操作時,復(fù)制才會發(fā)生(先是缺頁中斷,然后操作系統(tǒng)給子進程分配內(nèi)存并復(fù)制父進程的數(shù)據(jù))。即便如此,如果我們在程序中分配了大量內(nèi)存,那么使用fork時也應(yīng)當(dāng)十分謹(jǐn)慎,盡量避免沒必要的內(nèi)存分配和數(shù)據(jù)復(fù)制。
此外,創(chuàng)建子進程后,父進程中打開的文件描述符默認(rèn)在子進程中也是打開的,且文件描述符的引用計數(shù)加1。不僅如此,父進程的用戶根目錄、當(dāng)前工作目錄等變量的引用計數(shù)均會加1。
exec系列系統(tǒng)調(diào)用
有時我們需要在子進程中執(zhí)行其他程序,即替換當(dāng)前進程映像,這就需要使用如下exec
系列函數(shù)之一:
path參數(shù)指定可執(zhí)行文件的完整路徑,file 參數(shù)可以接受文件名,該文件的具體位置則在環(huán)境變量PATH中搜尋。arg 接受可變參數(shù),argv 則接受參數(shù)數(shù)組,它們都會被傳遞給新程序(path或file指定的程序)的main兩數(shù)。envp參數(shù)用于設(shè)置新程序的環(huán)境變量。
如果未設(shè)置它,則新程序?qū)⑹褂糜扇肿兞縠nviron指定的環(huán)境變量。一般情況下,exec函數(shù)是不返回的,除非出錯。它出錯時返回-1,并設(shè)置沒出錯,則原程序中exec調(diào)用之后的代碼都不會執(zhí)行,因為此時原程序已經(jīng)被exec的參數(shù)指定的程序完全替換(包括代碼和數(shù)據(jù))。
exec函數(shù)不會關(guān)閉原程序打開的文件描述符,除非該文件描述符被設(shè)置了類似SOCK_CLOEXEC的屬性。
處理僵尸進程
對于多進程程序而言,父進程一般需要跟蹤子進程的退出狀態(tài)。因此,當(dāng)子進程結(jié)束運行時,內(nèi)核不會立即釋放該進程的進程表表項,以滿足父進程后續(xù)對該子進程退出信息的查詢(如果父進程還在運行)。在子進程結(jié)束運行之后,父進程讀取其退出狀態(tài)之前,我們稱該子進程處于僵尸態(tài)。另外-一種使子進程進人僵尸態(tài)的情況是:父進程結(jié)束或者異常終止,而子進程繼續(xù)運行。此時子進程的PPID將被操作系統(tǒng)設(shè)置為1,即init進程。init 進程接管了該子進程,并等待它結(jié)束。在父進程退出之后,子進程退出之前,該子進程處于僵尸態(tài)。由此可見,無論哪種情況,如果父進程沒有正確地處理子進程的返回信息,子進程都將停留在僵尸態(tài),并占據(jù)著內(nèi)核資源。這是絕對不能容許的,畢竟內(nèi)核資源有限。下面這對函數(shù)在父進程中調(diào)用,以等待子進程的結(jié)束,并獲取子進程的返回信息,從而避免了僵尸進程的產(chǎn)生,或者使子進程的僵尸態(tài)立即結(jié)束:
/* Wait for a child to die. When one does, put its status in *STAT_LOCand return its process ID. For errors, return (pid_t) -1.This function is a cancellation point and therefore not marked with__THROW. */ extern __pid_t wait (int *__stat_loc);/* Wait for a child matching PID to die.If PID is greater than 0, match any process whose process ID is PID.If PID is (pid_t) -1, match any process.If PID is (pid_t) 0, match any process with thesame process group as the current process.If PID is less than -1, match any process whoseprocess group is the absolute value of PID.If the WNOHANG bit is set in OPTIONS, and that childis not already dead, return (pid_t) 0. If successful,return PID and store the dead child's status in STAT_LOC.Return (pid_t) -1 for errors. If the WUNTRACED bit isset in OPTIONS, return status for stopped children; otherwise don't.This function is a cancellation point and therefore not marked with__THROW. */ extern __pid_t waitpid (__pid_t __pid, int *__stat_loc, int __options);wait函數(shù)將阻塞進程,直到該進程的某個子進程結(jié)束運行為止。它返回結(jié)束運行的子進程的PID,并將該子進程的退出狀態(tài)信息存儲于stat_loc 參數(shù)指向的內(nèi)存中。sys/wait.h 頭文件中定義了幾個宏來幫助解釋子進程的退出狀態(tài)信息,如表所示。
wait函數(shù)的阻塞特性顯然不是服務(wù)器程序期望的,而waitpid函數(shù)解決了這個問題。waitpid只等待由pid參數(shù)指定的子進程。如果pid取值為-1,那么它就和wait函數(shù)相同,即等待任意一個子進程結(jié)束。statloc參數(shù)的含義和wait函數(shù)的statloc參數(shù)相同。options參數(shù)可以控制waitpid函數(shù)的行為。該參數(shù)最常用的取值是WNOHANG。當(dāng)options的取值是WNOHANG時,waitpid調(diào)用將是非阻塞的:如果pid指定的目標(biāo)子進程還沒有結(jié)束或意外終止,則waitpid立即返回0;如果目標(biāo)子進程確實正常退出了,則waitpid返回該子進程的PID。waitpid 調(diào)用失敗時返回-1并設(shè)置errno。
要在事件已經(jīng)發(fā)生的情況下執(zhí)行非阻塞調(diào)用才能提高程序的效率。對waitpid函數(shù)而言,我們最好在某個子進程退出之后再調(diào)用它。那么父進程從何得知某個子進程已經(jīng)退出了呢?這正是SIGCHLD信號的用途。當(dāng)一個進程結(jié)束時,它將給其父進程發(fā)送一個SIGCHLD信號。因此,我們可以在父進程中捕獲SIGCHLD信號,并在信號處理函數(shù)中調(diào)用waitpid函數(shù)以"徹底結(jié)束"一個子進程。
static void handle_child(int sig) {pid_t pid;int stat;while ((pid = waitpid(-1, &stat, WNOHANG)) > 0){/*對結(jié)束的子進程善后處理*/} }管道
管道能在父、子進程間傳遞數(shù)據(jù),利用的是fork調(diào)用之后兩個管道文件描述符( fd[0]和fd[1])都保持打開。一對這樣的文件描述符只能保證父、子進程間一個方向的數(shù)據(jù)傳輸,父進程和子進程必須有一個關(guān)閉fd[0],另一個關(guān)閉fd[1]。比如,我們要使用管道實現(xiàn)從父進程向子進程寫數(shù)據(jù),就應(yīng)該按照圖所示來操作。
顯然,如果要實現(xiàn)父、子進程之間的雙向數(shù)據(jù)傳輸,就必須使用兩個管道。socket 編程接口提供了一個創(chuàng)建全雙工管道的系統(tǒng)調(diào)用: socketpair,以實現(xiàn)在父進程和日志服務(wù)子進程之間傳遞日志信息。
信號量
當(dāng)多個進程同時訪問系統(tǒng)上的某個資源的時候,比如同時寫一個數(shù)據(jù)庫的某條記錄,或者同時修改某個文件,就需要考慮進程的同步問題,以確保任一時刻只有一個進程可以擁有對資源的獨占式訪問。通常,程序?qū)蚕碣Y源的訪問的代碼只是很短的一段,但就是這一段代碼引發(fā)了進程之間的競態(tài)條件。我們稱這段代碼為關(guān)鍵代碼段,或者臨界區(qū)。對進程同步,也就是確保任一時刻只有一個進程能進人關(guān)鍵代碼段。
要編寫具有通用目的的代碼,以確保關(guān)鍵代碼段的獨占式訪問是非常困難的。有兩個名為Dekker算法和Peterson算法的解決方案,它們試圖從語言本身(不需要內(nèi)核支持)解決并發(fā)問題。但它們依賴于忙等待,即進程要持續(xù)不斷地等待某個內(nèi)存位置狀態(tài)的改變。這種方式下CPU利用率太低,顯然是不可取的。
Dijkstra提出的信號量( Semaphore)概念是并發(fā)編程領(lǐng)域邁出的重要一步。信號量是一種特殊的變量,它只能取自然數(shù)值并且只支持兩種操作:等待(wait) 和信號(signal)。 不過在Linux/UNIX中,“等待”和“信號”都已經(jīng)具有特殊的含義,所以對信號量的這兩種操作更常用的稱呼是P、V操作。這兩個字母來自于荷蘭語單詞passeren (傳遞,就好像進人臨界區(qū))和vrijgeven (釋放,就好像退出臨界區(qū))。假設(shè)有信號量SV,則對它的P、V操作
- P(SV),如果SV的值大于0,就將它減1;如果sv的值為0,則掛起進程的執(zhí)行。
- V(SV),如果有其他進程因為等待SV而掛起,則喚醒之;如果沒有,則將sv加1。
信號量的取值可以是任何自然數(shù)。但最常用的、最簡單的信號量是二進制信號量,它只能取0和1這兩個值。本書僅討論二進制信號量。使用二進制信號量同步兩個進程,以確保關(guān)鍵代碼段的獨占式訪問的一個典型例子如圖所示。
在圖中,當(dāng)關(guān)鍵代碼段可用時,二進制信號量SV的值為1,進程A和B都有機會進人關(guān)鍵代碼段。如果此時進程A執(zhí)行了P(SV)操作將SV減1,則進程B若再執(zhí)行P(SV)操作就會被掛起。直到進程A離開關(guān)鍵代碼段,并執(zhí)行V(SV)操作將SV加1,關(guān)鍵代碼段才重新變得可用。如果此時進程B因為等待SV而處于掛起狀態(tài),則它將被喚醒,并進入關(guān)鍵代碼段。同樣,這時進程A如果再執(zhí)行P(SV)操作,則也只能被操作系統(tǒng)掛起以等待進程B退出關(guān)鍵代碼段。
Linux信號量的API都定義在sys/sem.h頭文件中,主要包含3個系統(tǒng)調(diào)用:semget,semop和semctl。它們都被設(shè)計為操作一組信號量,即信號量集,而不是單個信號量,因此這些接口看上去多少比我們期望的要復(fù)雜一點。
semget系統(tǒng)調(diào)用
semget系統(tǒng)調(diào)用創(chuàng)建一個新的信號量集,或者獲取-一個已經(jīng)存在的信號量集。其定義如下:
#include <sys/sem.h>/* Get semaphore. */ extern int semget (key_t __key, int __nsems, int __semflg) __THROW;key參數(shù)是一個鍵值,用來標(biāo)識一個全局唯一的信號量集,就像文件名全局唯一地標(biāo)識一個文件一樣。要通過信號量通信的進程需要使用相同的鍵值來創(chuàng)建/獲取該信號量。
num_sems參數(shù)指定要創(chuàng)建/獲取的信號量集中信號量的數(shù)目。如果是創(chuàng)建信號量,則該值必須被指定;如果是獲取已經(jīng)存在的信號量,則可以把它設(shè)置為0。
sem_flags參數(shù)指定一組標(biāo)志。它低端的9個比特是該信號量的權(quán)限,其格式和含義都與系統(tǒng)調(diào)用open的mode參數(shù)相同。此外,它還可以和IPC_CREAT標(biāo)志做按位“或”運算以創(chuàng)建新的信號量集。此時即使信號量已經(jīng)存在,semget也不會產(chǎn)生錯誤。我們還可以聯(lián)合使用IPC_CREAT和IPC_EXCL標(biāo)志來確保創(chuàng)建一組新的、唯一的信號量集。在這種情況下,如果信號量集已經(jīng)存在,則semget返回錯誤并設(shè)置errno為EEXIST。這種創(chuàng)建信號量的行為與用O_CREAT和O_EXCL標(biāo)志調(diào)用open來排他式地打開一個文件相似。
semget成功時返回一個正整數(shù)值, 它是信號量集的標(biāo)識符; semget 失敗時返回-1,并設(shè)置errno。如果semget用于創(chuàng)建信號量集,則與之關(guān)聯(lián)的內(nèi)核數(shù)據(jù)結(jié)構(gòu)體semid_ds將被創(chuàng)建并初始化。semid_ds 結(jié)構(gòu)體的定義如下:
/* Data structure used to pass permission information to IPC operations.It follows the kernel ipc64_perm size so the syscall can be made directlywithout temporary buffer copy. However, since glibc defines the MODEfield as mode_t per POSIX definition (BZ#18231), it omits the __PAD1 field(since glibc does not export mode_t as 16-bit for any architecture). */ struct ipc_perm {__key_t __key; /* Key. */__uid_t uid; /* Owner's user ID. */__gid_t gid; /* Owner's group ID. */__uid_t cuid; /* Creator's user ID. */__gid_t cgid; /* Creator's group ID. */__mode_t mode; /* Read/write permission. */unsigned short int __seq; /* Sequence number. */unsigned short int __pad2;__syscall_ulong_t __glibc_reserved1;__syscall_ulong_t __glibc_reserved2; }; /* Data structure describing a set of semaphores. */ struct semid_ds {struct ipc_perm sem_perm; /* operation permission struct */__SEM_PAD_TIME (sem_otime, 1); /* last semop() time */__SEM_PAD_TIME (sem_ctime, 2); /* last time changed by semctl() */__syscall_ulong_t sem_nsems; /* number of semaphores in set */__syscall_ulong_t __glibc_reserved3;__syscall_ulong_t __glibc_reserved4; };semop系統(tǒng)調(diào)用
用于改變信號量的值,即執(zhí)行P,V操作。對信號量的操作實際上就是對這些內(nèi)核變量的操作。semop的定義如下:
/* Operate on semaphore. */ extern int semop (int __semid, struct sembuf *__sops, size_t __nsops) __THROW;sem_id 參數(shù)是由semget調(diào)用返回的信號量集標(biāo)識符,用以指定被操作的目標(biāo)信號量集。sem_ops 參數(shù)指向一個sembuf結(jié)構(gòu)體類型的數(shù)組,sembuf 結(jié)構(gòu)體的定義如下:
struct sembuf {unsigned short int sem_num; /* semaphore number */short int sem_op; /* semaphore operation */short int sem_flg; /* operation flag */ };其中,sem_num成員是信號量集中信號量的編號,0表示信號量集中的第一個信 號量。sem_op成員指定操作類型,其可選值為正整數(shù)、0和負(fù)整數(shù)。每種類型的操作的行為又受到sem_flg成員的影響。sem_flg 的可選值是IPC_NOWAIT和SEM_UNDO。IPC_NOWAIT的含義是,無論信號量操作是否成功,semop調(diào)用都將立即返回,這類似于非阻塞I/O操作。SEM_UNDO的含義是,當(dāng)進程退出時取消正在進行的semop操作。
semop系統(tǒng)調(diào)用的第3個參數(shù)num_sem_ops 指定要執(zhí)行的操作個數(shù),即sem_ops 數(shù)組中元素的個數(shù)。semop 對數(shù)組sem_ops 中的每個成員按照數(shù)組順序依次執(zhí)行操作,并且該過程是原子操作,以避免別的進程在同一時刻按照不同的順序?qū)υ撔盘柤械男盘柫繄?zhí)行semop操作導(dǎo)致的競態(tài)條件。
semop成功時返回0,失敗則返回-1并設(shè)置ermo。失敗的時候,sem_ _ops 數(shù)組中指定的所有操作都不被執(zhí)行。
semctl系統(tǒng)調(diào)用
semctl系統(tǒng)調(diào)用允許調(diào)用者對信號量進行直接控制。其定義如下:
/* Semaphore control operation. */ extern int semctl (int __semid, int __semnum, int __cmd, ...) __THROW;sem_id參數(shù)是由semget調(diào)用返回的信號量集標(biāo)識符,用以指定被操作的信號量集。參數(shù)指定被操作的信號量在信號量集中的編號。command參數(shù)指定要執(zhí)行的命令。有的命令需要調(diào)用者傳遞第4個參數(shù)。第4個參數(shù)的類型由用戶自己定義,但sys/sem.h頭文件給出了它的推薦格式,具體如下:
/* The user should define a union like the following to use it for argumentsfor `semctl'.union semun{int val; <= value for SETVALstruct semid_ds *buf; <= buffer for IPC_STAT & IPC_SETunsigned short int *array; <= array for GETALL & SETALLstruct seminfo *__buf; <= buffer for IPC_INFO};Previous versions of this file used to define this union but this isincorrect. One can test the macro _SEM_SEMUN_UNDEFINED to see whetherone must define the union or not. */ struct seminfo {int semmap; /* Linux內(nèi)核沒有使用*/int semmni; /*系統(tǒng)最多可以擁有的信號量集數(shù)目*/int semmns; /*系統(tǒng)最多可以擁有的信號量數(shù)目*/int semmnu; /* Linux內(nèi)核沒有使用*/int semmsl; /*一個信號量集最多允許包含的信號量數(shù)目*/int semopm; /* semop一次最多能執(zhí)行的sem_ op操作數(shù)目*/int semume; /* Linux 內(nèi)核沒有使用*/int semusz; /* sem undo結(jié)構(gòu)體的大小*/int semvmx; /*最大允許的信號量值*/int semaem; /*最多允許的UNDO次數(shù)(帶SEM _UNDO標(biāo)志的semop操作的次數(shù)) */ };
semctl成功時的返回值取決于command參數(shù),如表所示。semctl 失敗時返回-1,并設(shè)置errmo。
特殊鍵值IPC_ PRIVATE
semget的調(diào)用者可以給其key參數(shù)傳遞一個特殊的鍵值IPC_ PRIVATE (其值為0),這樣無論該信號量是否已經(jīng)存在,semget 都將創(chuàng)建一個新的信 號量。使用該鍵值創(chuàng)建的信號量并非像它的名字聲稱的那樣是進程私有的。其他進程,尤其是子進程,也有方法來訪問這個信號量。所以semget的man手冊的BUGS部分上說,使用名字IPC_PRIVATE有些誤導(dǎo)(歷史原因),應(yīng)該稱為IPC_NEW比如下面的代碼 就在父、子進程間使用一個 IPC_PRIVATE信號量來同步。
#include <sys/sem.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/wait.h>union semun {int val;struct semid_ds *buf;unsigned short int *array;struct seminfo *__buf; };void pv(int sem_id, int op) {struct sembuf sem_b;sem_b.sem_num = 0;sem_b.sem_op = op;sem_b.sem_flg = SEM_UNDO;semop(sem_id, &sem_b, 1); }int main(int argc, char *argv[]) {int sem_id = semget(IPC_PRIVATE, 1, 0666);union semun sem_un;sem_un.val = 1;semctl(sem_id, 0, SETVAL, sem_un);pid_t id = fork();if (id < 0){return 1;}else if (id == 0){printf("child try to get binary sem\n");pv(sem_id, -1);printf("child get the sem and would release it after 5 seconds\n");sleep(5);pv(sem_id, 1);exit(0);}else{printf("parent try to get binary sem\n");pv(sem_id, -1);printf("parent get the sem and would release it after 5 seconds\n");sleep(5);pv(sem_id, 1);}waitpid(id, NULL, 0);semctl(sem_id, 0, IPC_RMID, sem_un);return 0; }共享內(nèi)存
共享內(nèi)存是最高效的IPC機制,因為它不涉及進程之間的任何數(shù)據(jù)傳輸。這種高效率帶來的問題是,我們必須用其他輔助手段來同步進程對共享內(nèi)存的訪問,否則會產(chǎn)生競態(tài)條件。因此,共享內(nèi)存通常和其他進程間通信方式一起使用。Linux共享內(nèi)存的API都定義在sys/shm.h頭文件中,包括4個系統(tǒng)調(diào)用:shmget,shmat、 shmdt和shmctl。
shmget
shmget系統(tǒng)調(diào)用創(chuàng)建一段新的共享內(nèi)存,或者獲取段一已經(jīng)存在的共享內(nèi)存。其定義如下:
#include <sys/shm.h>/* Get shared memory segment. */ extern int shmget (key_t __key, size_t __size, int __shmflg) __THROW;和semget系統(tǒng)調(diào)用一樣,key 參數(shù)是一個鍵值,用來標(biāo)識- -段全局唯一的共享內(nèi)存。size參數(shù)指定共享內(nèi)存的大小,單位是字節(jié)。如果是創(chuàng)建新的共享內(nèi)存,則size值必須被指定。如果是獲取已經(jīng)存在的共享內(nèi)存,則可以把size設(shè)置為0。
shmflg參數(shù)的使用和含義與semget系統(tǒng)調(diào)用的sem_fags 參數(shù)相同。不過shmget支持兩個額外的標(biāo)志一SHM_HUGETLB和SHM_NORESERVE。 它們的含義如下:
- SHM_HUGETLB,類似于mmap的MAP_HUGETLB標(biāo)志,系統(tǒng)將使用“大頁面”來為共享內(nèi)存分配空間。
- SHM_NORESERVE,類似于mmap的MAP_NORESERVE標(biāo)志,不為共享內(nèi)存保留交換分區(qū)(swap 空間)。這樣,當(dāng)物理內(nèi)存不足的時候,對該共享內(nèi)存執(zhí)行寫操作將觸發(fā)SIGSEGV信號。
shmget成功時返回-一個正整數(shù)值,它是共享內(nèi)存的標(biāo)識符。shmget 失敗時返回-1,并設(shè)置errmo。
如果shmget用于創(chuàng)建共享內(nèi)存,則這段共享內(nèi)存的所有字節(jié)都被初始化為0,與之關(guān)聯(lián)的內(nèi)核數(shù)據(jù)結(jié)構(gòu)shmid_ds 將被創(chuàng)建并初始化。shmid_ds 結(jié)構(gòu)體的定義如下:
/* Data structure describing a shared memory segment. */ struct shmid_ds{struct ipc_perm shm_perm; /* operation permission struct */ #if !__SHM_SEGSZ_AFTER_TIMEsize_t shm_segsz; /* size of segment in bytes */ #endif__SHM_PAD_TIME (shm_atime, 1); /* time of last shmat() */__SHM_PAD_TIME (shm_dtime, 2); /* time of last shmdt() */__SHM_PAD_TIME (shm_ctime, 3); /* time of last change by shmctl() */ #if __SHM_PAD_BETWEEN_TIME_AND_SEGSZunsigned long int __glibc_reserved4; #endif #if __SHM_SEGSZ_AFTER_TIMEsize_t shm_segsz; /* size of segment in bytes */ #endif__pid_t shm_cpid; /* pid of creator */__pid_t shm_lpid; /* pid of last shmop */shmatt_t shm_nattch; /* number of current attaches */__syscall_ulong_t __glibc_reserved5;__syscall_ulong_t __glibc_reserved6;};shmat和shmdt
共享內(nèi)存被創(chuàng)建/獲取之后,我們不能立即訪問它,而是需要先將它關(guān)聯(lián)到進程的地址空間中。使用完共享內(nèi)存之后,我們也需要將它從進程地址空間中分離。這兩項任務(wù)分別由如下兩個系統(tǒng)調(diào)用實現(xiàn):
/* Attach shared memory segment. */ extern void *shmat (int __shmid, const void *__shmaddr, int __shmflg)__THROW;/* Detach shared memory segment. */ extern int shmdt (const void *__shmaddr) __THROW;其中,shm_id參數(shù)是由shmget調(diào)用返回的共享內(nèi)存標(biāo)識符。shm_addr 參數(shù)指定將共享內(nèi)存關(guān)聯(lián)到進程的哪塊地址空間,最終的效果還受到shmfg參數(shù)的可選標(biāo)志SHM_RND的影響:
- 如果shm_addr為NUll,則被關(guān)聯(lián)的地址由操作系統(tǒng)選擇。這是推薦的做法,以確保代碼的可移植性。
- 如果shm_addr 非空,并且SHM_RND標(biāo)志未被設(shè)置,則共享內(nèi)存被關(guān)聯(lián)到addr指定的地址處。
- 如果shm_addr 非空,并且設(shè)置了SHM_RND標(biāo)志,則被關(guān)聯(lián)的地址是[shm_addr-(shm_addr%SHMLBA)]。SHMLBA的含義是“段低端邊界地址倍數(shù)”(Segment Low Boundary Address Multiple),它必須是內(nèi)存頁面大小(PAGE_SIZE)的整數(shù)倍。現(xiàn)在的Linux內(nèi)核中,它等于一個內(nèi)存頁大小。SHM_RND的含義是圓整(round),即將共享內(nèi)存被關(guān)聯(lián)的地址向下圓整到離shm_addr最近的SHMLBA的整數(shù)倍地址處。
除了SHM_ RND標(biāo)志外,shmflg 參數(shù)還支持如下標(biāo)志:
- SHM_RDONLY。進程僅能讀取共享內(nèi)存中的內(nèi)容。若沒有指定該標(biāo)志,則進程可同時對共享內(nèi)存進行讀寫操作(當(dāng)然,這需要在創(chuàng)建共享內(nèi)存的時候指定其讀寫權(quán)限)。
- SHM_REMAP如果地址shmaddr已經(jīng)被關(guān)聯(lián)到一段共享內(nèi)存 上,則重新關(guān)聯(lián)。
- SHM EXEC它指定對共享內(nèi)存段的執(zhí)行權(quán)限。對共享內(nèi)存而言,執(zhí)行權(quán)限實際上和讀權(quán)限是一樣的。
shmat成功時返回共享內(nèi)存被關(guān)聯(lián)到的地址,失敗則返回(void*)-1并設(shè)置ermo。shmat成功時,將修改內(nèi)核數(shù)據(jù)結(jié)構(gòu)shmid_ds的部分字段,如下:
- 將shm_nattach 加1。
- 將shm_lpid 設(shè)置為調(diào)用進程的PID。
- 將shm_atime 設(shè)置為當(dāng)前的時間。
shmdt函數(shù)將關(guān)聯(lián)到shm_addr 處的共享內(nèi)存從進程中分離。它成功時返回0,失敗則返回-1并設(shè)置errno。shmdt 在成功調(diào)用時將修改內(nèi)核數(shù)據(jù)結(jié)構(gòu)shmid_ds 的部分字段,如下:
- 將shm_nattach 減1。
- 將shm_lpid 設(shè)置為調(diào)用進程的PID。
- 將shm_dtime 設(shè)置為當(dāng)前的時間。
shmctl
shmctl系統(tǒng)調(diào)用控制共享內(nèi)存的某些屬性。其定義如下:
/* Shared memory control operation. */ extern int shmctl (int __shmid, int __cmd, struct shmid_ds *__buf) __THROW;其中,shm_id 參數(shù)是由shmget調(diào)用返回的共享內(nèi)存標(biāo)識符。command參數(shù)指定要執(zhí)行的命令。shmctl 支持的所有命令如表所示。
shmctl成功時的返回值取決于command參數(shù),如表13-3所示。shmetl 失敗時返回-1,并設(shè)置errno。
共享內(nèi)存POSIX方法
Linux 提供了另外一種利用mmap在無關(guān)進程之間共享內(nèi)存的方式。這種方式無須任
何文件的支持,但它需要先使用如下函數(shù)來創(chuàng)建或打開一個POSIX共享內(nèi)存對象:
shm_open的使用方法與open系統(tǒng)調(diào)用完全相同。
name參數(shù)指定要創(chuàng)建/打開的共享內(nèi)存對象。從可移植性的角度考慮,該參數(shù)應(yīng)該使用“/somename"的格式:以“/”開始,后接多個字符,且這些字符都不是“/”;以“\0”結(jié)尾,長度不超過NAME_MAX (通常是255)。
ofag參數(shù)指定創(chuàng)建方式。它可以是下列標(biāo)志中的一個或者多個的按位或:
- O_RDONLY。以只讀方式打開共享內(nèi)存對象。
- O_RDWR。以可讀、可寫方式打開共享內(nèi)存對象。
- O_CREAT。如果共享內(nèi)存對象不存在,則創(chuàng)建之。此時mode參數(shù)的最低9位將指定該共享內(nèi)存對象的訪問權(quán)限。共享內(nèi)存對象被創(chuàng)建的時候,其初始長度為0。
- O_EXCL。 和O_CREAT一起使用,如果由name指定的共享內(nèi)存對象已經(jīng)存在,則shm_open調(diào)用返回錯誤,否則就創(chuàng)建一個新的共享內(nèi)存對象。
- O_TRUNC。如果共享內(nèi)存對象已經(jīng)存在,則把它截斷,使其長度為0.
shm_open調(diào)用成功時返回一個文件描述符。該文件描述符可用于后續(xù)的mmap調(diào)用,從而將共享內(nèi)存關(guān)聯(lián)到調(diào)用進程。shm_open失敗時返回-1,并設(shè)置ermo。
和打開的文件最后需要關(guān)閉一樣,由shm_open 創(chuàng)建的共享內(nèi)存對象使用完之后也需要被刪除。這個過程是通過如下函數(shù)實現(xiàn)的:
/* Remove shared memory segment. */ extern int shm_unlink (const char *__name);該函數(shù)將name參數(shù)指定的共享內(nèi)存對象標(biāo)記為等待刪除。當(dāng)所有使用該共享內(nèi)存對象的進程都使用ummap將它從進程中分離之后,系統(tǒng)將銷毀這個共享內(nèi)存對象所占據(jù)的資源。如果代碼中使用了上述POSIX共享內(nèi)存函數(shù),則編譯的時候需要指定鏈接選項-Irt。
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <signal.h> #include <sys/wait.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h>#define USER_LIMIT 5 #define BUFFER_SIZE 1024 #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define PROCESS_LIMIT 65536struct client_data {sockaddr_in address;int connfd;pid_t pid;int pipefd[2]; };static const char *shm_name = "/my_shm"; int sig_pipefd[2]; int epollfd; int listenfd; int shmfd; char *share_mem = 0; client_data *users = 0; int *sub_process = 0; int user_count = 0; bool stop_child = false;int setnonblocking(int fd) {int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option; }void addfd(int epollfd, int fd) {epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setnonblocking(fd); }void sig_handler(int sig) {int save_errno = errno;int msg = sig;send(sig_pipefd[1], (char *)&msg, 1, 0);errno = save_errno; }void addsig(int sig, void (*handler)(int), bool restart = true) {struct sigaction sa;memset(&sa, '\0', sizeof(sa));sa.sa_handler = handler;if (restart){sa.sa_flags |= SA_RESTART;}sigfillset(&sa.sa_mask);assert(sigaction(sig, &sa, NULL) != -1); }void del_resource() {close(sig_pipefd[0]);close(sig_pipefd[1]);close(listenfd);close(epollfd);shm_unlink(shm_name);delete[] users;delete[] sub_process; }void child_term_handler(int sig) {stop_child = true; }int run_child(int idx, client_data *users, char *share_mem) {epoll_event events[MAX_EVENT_NUMBER];int child_epollfd = epoll_create(5);assert(child_epollfd != -1);int connfd = users[idx].connfd;addfd(child_epollfd, connfd);int pipefd = users[idx].pipefd[1];addfd(child_epollfd, pipefd);int ret;addsig(SIGTERM, child_term_handler, false);while (!stop_child){int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);if ((number < 0) && (errno != EINTR)){printf("epoll failure\n");break;}for (int i = 0; i < number; i++){int sockfd = events[i].data.fd;if ((sockfd == connfd) && (events[i].events & EPOLLIN)){memset(share_mem + idx * BUFFER_SIZE, '\0', BUFFER_SIZE);ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0);if (ret < 0){if (errno != EAGAIN){stop_child = true;}}else if (ret == 0){stop_child = true;}else{send(pipefd, (char *)&idx, sizeof(idx), 0);}}else if ((sockfd == pipefd) && (events[i].events & EPOLLIN)){int client = 0;ret = recv(sockfd, (char *)&client, sizeof(client), 0);if (ret < 0){if (errno != EAGAIN){stop_child = true;}}else if (ret == 0){stop_child = true;}else{send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0);}}else{continue;}}}close(connfd);close(pipefd);close(child_epollfd);return 0; }int main(int argc, char *argv[]) {if (argc <= 2){printf("usage: %s ip_address port_number\n", basename(argv[0]));return 1;}const char *ip = argv[1];int port = atoi(argv[2]);int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET;inet_pton(AF_INET, ip, &address.sin_addr);address.sin_port = htons(port);listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(listenfd >= 0);ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));assert(ret != -1);ret = listen(listenfd, 5);assert(ret != -1);user_count = 0;users = new client_data[USER_LIMIT + 1];sub_process = new int[PROCESS_LIMIT];for (int i = 0; i < PROCESS_LIMIT; ++i){sub_process[i] = -1;}epoll_event events[MAX_EVENT_NUMBER];epollfd = epoll_create(5);assert(epollfd != -1);addfd(epollfd, listenfd);ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);assert(ret != -1);setnonblocking(sig_pipefd[1]);addfd(epollfd, sig_pipefd[0]);addsig(SIGCHLD, sig_handler);addsig(SIGTERM, sig_handler);addsig(SIGINT, sig_handler);addsig(SIGPIPE, SIG_IGN);bool stop_server = false;bool terminate = false;shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);assert(shmfd != -1);ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE);assert(ret != -1);share_mem = (char *)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);assert(share_mem != MAP_FAILED);close(shmfd);while (!stop_server){int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if ((number < 0) && (errno != EINTR)){printf("epoll failure\n");break;}for (int i = 0; i < number; i++){int sockfd = events[i].data.fd;if (sockfd == listenfd){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);if (connfd < 0){printf("errno is: %d\n", errno);continue;}if (user_count >= USER_LIMIT){const char *info = "too many users\n";printf("%s", info);send(connfd, info, strlen(info), 0);close(connfd);continue;}users[user_count].address = client_address;users[user_count].connfd = connfd;ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);assert(ret != -1);pid_t pid = fork();if (pid < 0){close(connfd);continue;}else if (pid == 0){close(epollfd);close(listenfd);close(users[user_count].pipefd[0]);close(sig_pipefd[0]);close(sig_pipefd[1]);run_child(user_count, users, share_mem);munmap((void *)share_mem, USER_LIMIT * BUFFER_SIZE);exit(0);}else{close(connfd);close(users[user_count].pipefd[1]);addfd(epollfd, users[user_count].pipefd[0]);users[user_count].pid = pid;sub_process[pid] = user_count;user_count++;}}else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)){int sig;char signals[1024];ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);if (ret == -1){continue;}else if (ret == 0){continue;}else{for (int i = 0; i < ret; ++i){switch (signals[i]){case SIGCHLD:{pid_t pid;int stat;while ((pid = waitpid(-1, &stat, WNOHANG)) > 0){int del_user = sub_process[pid];sub_process[pid] = -1;if ((del_user < 0) || (del_user > USER_LIMIT)){printf("the deleted user was not change\n");continue;}epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);close(users[del_user].pipefd[0]);users[del_user] = users[--user_count];sub_process[users[del_user].pid] = del_user;printf("child %d exit, now we have %d users\n", del_user, user_count);}if (terminate && user_count == 0){stop_server = true;}break;}case SIGTERM:case SIGINT:{printf("kill all the clild now\n");//addsig( SIGTERM, SIG_IGN );//addsig( SIGINT, SIG_IGN );if (user_count == 0){stop_server = true;break;}for (int i = 0; i < user_count; ++i){int pid = users[i].pid;kill(pid, SIGTERM);}terminate = true;break;}default:{break;}}}}}else if (events[i].events & EPOLLIN){int child = 0;ret = recv(sockfd, (char *)&child, sizeof(child), 0);printf("read data from child accross pipe\n");if (ret == -1){continue;}else if (ret == 0){continue;}else{for (int j = 0; j < user_count; ++j){if (users[j].pipefd[0] != sockfd){printf("send data to child accross pipe\n");send(users[j].pipefd[0], (char *)&child, sizeof(child), 0);}}}}}}del_resource();return 0; }消息隊列
msgget系統(tǒng)調(diào)用創(chuàng)建一個消 息隊列,或者獲取-一個已有的消息隊列。其定義如下:
#include <sys/msg.h>/* Get messages queue. */ extern int msgget (key_t __key, int __msgflg) __THROW;和semget系統(tǒng)調(diào)用一樣,key 參數(shù)是一個鍵值,用來標(biāo)識一個全局唯一的消息隊列。msgflg參數(shù)的使用和含義與semget系統(tǒng)調(diào)用的sem_flags 參數(shù)相同。msgget成功時返回一個正整數(shù)值,它是消息隊列的標(biāo)識符。msgget 失敗時返回-1,并設(shè)置errno。
如果msgget用于創(chuàng)建消息隊列,則與之關(guān)聯(lián)的內(nèi)核數(shù)據(jù)結(jié)構(gòu)msqid_ds 將被創(chuàng)建并初始化。msqid_ds 結(jié)構(gòu)體的定義如下:
/* Structure of record for one message inside the kernel.The type `struct msg' is opaque. */ struct msqid_ds {struct ipc_perm msg_perm; /* structure describing operation permission */__MSQ_PAD_TIME (msg_stime, 1); /* time of last msgsnd command */__MSQ_PAD_TIME (msg_rtime, 2); /* time of last msgrcv command */__MSQ_PAD_TIME (msg_ctime, 3); /* time of last change */__syscall_ulong_t __msg_cbytes; /* current number of bytes on queue */msgqnum_t msg_qnum; /* number of messages currently on queue */msglen_t msg_qbytes; /* max number of bytes allowed on queue */__pid_t msg_lspid; /* pid of last msgsnd() */__pid_t msg_lrpid; /* pid of last msgrcv() */__syscall_ulong_t __glibc_reserved4;__syscall_ulong_t __glibc_reserved5; };msgsnd
msgsnd系統(tǒng)調(diào)用把- -條消息添加到消息隊列中。其定義如下:
/* Send message to message queue.This function is a cancellation point and therefore not marked with__THROW. */ extern int msgsnd (int __msqid, const void *__msgp, size_t __msgsz,int __msgflg);msqid參數(shù)是由msgget調(diào)用返回的消息隊列標(biāo)識符。
msg_ ptr參數(shù)指向- -個準(zhǔn)備發(fā)送的消息,消息必須被定義為如下類型:
其中,mtype 成員指定消息的類型,它必須是一個正整數(shù)。mtext 是消息數(shù)據(jù)。msg_sz參數(shù)是消息的數(shù)據(jù)部分(mtext) 的長度。這個長度可以為0,表示沒有消息數(shù)據(jù)。
msgfg參數(shù)控制msgsnd的行為。它通常僅支持IPC_NOWAIT標(biāo)志,即以非阻塞的方式發(fā)送消息。默認(rèn)情況下,發(fā)送消息時如果消息隊列滿了,則msgsnd將阻塞。若IPC_NOWAIT標(biāo)志被指定,則msgsnd將立即返回并設(shè)置ermno為EAGAIN。
處于阻塞狀態(tài)的msgsnd調(diào)用可能被如下兩種異常情況所中斷:
- 消息隊列被移除。此時msgsnd調(diào)用將立即返回并設(shè)置ermo為EIDRM。
- 程序接收到信號。此時msgsnd調(diào)用將立即返回并設(shè)置ermo為EINTR。
msgsnd成功時返回0,失敗則返回-1并設(shè)置errmo。msgsnd 成功時將修改內(nèi)核數(shù)據(jù)結(jié)構(gòu)msqid_ds的部分字段,如下所示:
- 將msg_qnum加1。
- 將msg_lspid 設(shè)置為調(diào)用進程的PID。
- 將msg_stime 設(shè)置為當(dāng)前的時間。
msgrcv
msgrcv系統(tǒng)調(diào)用從消息隊列中獲取消息。其定義如下:
/* Receive message from message queue.This function is a cancellation point and therefore not marked with__THROW. */ extern ssize_t msgrcv (int __msqid, void *__msgp, size_t __msgsz,long int __msgtyp, int __msgflg);msqid參數(shù)是由msgget調(diào)用返回的消息隊列標(biāo)識符。
msg_ ptr 參數(shù)用于存儲接收的消息,msg. _sz 參數(shù)指的是消息數(shù)據(jù)部分的長度。
msgtype參數(shù)指定接收何種類型的消息。我們可以使用如下幾種方式來指定消息類型:
- msgtype等于0。讀取消息隊列中的第-一個消息。
- msgtype大于0。讀取消息隊列中第一個類型為msgtype的消息(除非指定了標(biāo)志MSG_ EXCEPT)。
- msgtype小于0。讀取消息隊列中第-一個類型值比msgtype的絕對值小的消息。
參數(shù)msgfg控制msgrev函數(shù)的行為。它可以是如下一-些標(biāo)志的按位或:
- IPC_NOWAIT。如果消息隊列中沒有消息,則msgrcv調(diào)用立即返回并設(shè)置ermno為ENOMSG。
- MSG_EXCEPT.如果msgtype大于0,則接收消息隊列中第-一個非msgtype類型的消息。
- MSG_NOERROR.如果消息數(shù)據(jù)部分的長度超過了msg_sz, 就將它截斷。
處于阻塞狀態(tài)的msgrcv調(diào)用還可能被如下兩種異常情況所中斷:
- 消息隊列被移除。此時msgrcv調(diào)用將立即返回并設(shè)置errmo 為EIDRM。
- 程序接收到信號。此時msgrcv調(diào)用將立即返回并設(shè)置errmo 為EINTR。
msgrcv成功時返回0,失敗則返回-1并設(shè)置errmo。 msgrcv 成功時將修改內(nèi)核數(shù)據(jù)結(jié)構(gòu)msqid_ds 的部分字段,如下所示:
- 將msg_qnum減1。
- 將msg_lrpid 設(shè)置為調(diào)用進程的PID。
- 將msg_rtime 設(shè)置為當(dāng)前的時間。
msgctl
msqid參數(shù)是由msgget調(diào)用返回的共享內(nèi)存標(biāo)識符。__cmd 參數(shù)指定要執(zhí)行的命令。
msgctl支持的所有命令如表所示。
msgctl成功時的返回值取決于__cmd參數(shù),如表所示。msgctl函數(shù)失敗時返回-1
并設(shè)置ermo。
在進程間傳遞文件描述符
由于fork調(diào)用之后,父進程中打開的文件描述符在子進程中仍然保持打開,所以文件描述符可以很方便地從父進程傳遞到子進程。需要注意的是,傳遞一個文件描述符并不是傳遞一個文件描述符的值,而是要在接收進程中創(chuàng)建一個新的文件描述符,并且該文件描述符和發(fā)送進程中被傳遞的文件描述符指向內(nèi)核中相同的文件表項。
#include <sys/socket.h> #include <fcntl.h> #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <assert.h> #include <string.h>static const int CONTROL_LEN = CMSG_LEN(sizeof(int));void send_fd(int fd, int fd_to_send) {struct iovec iov[1];struct msghdr msg;char buf[0];iov[0].iov_base = buf;iov[0].iov_len = 1;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;cmsghdr cm;cm.cmsg_len = CONTROL_LEN;cm.cmsg_level = SOL_SOCKET;cm.cmsg_type = SCM_RIGHTS;*(int *)CMSG_DATA(&cm) = fd_to_send;msg.msg_control = &cm;msg.msg_controllen = CONTROL_LEN;sendmsg(fd, &msg, 0); }int recv_fd(int fd) {struct iovec iov[1];struct msghdr msg;char buf[0];iov[0].iov_base = buf;iov[0].iov_len = 1;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;cmsghdr cm;msg.msg_control = &cm;msg.msg_controllen = CONTROL_LEN;recvmsg(fd, &msg, 0);int fd_to_read = *(int *)CMSG_DATA(&cm);return fd_to_read; }int main() {int pipefd[2];int fd_to_pass = 0;int ret = socketpair(PF_UNIX, SOCK_DGRAM, 0, pipefd);assert(ret != -1);pid_t pid = fork();assert(pid >= 0);if (pid == 0){close(pipefd[0]);fd_to_pass = open("test.txt", O_RDWR, 0666);send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0);close(fd_to_pass);exit(0);}close(pipefd[1]);fd_to_pass = recv_fd(pipefd[0]);char buf[1024];memset(buf, '\0', 1024);read(fd_to_pass, buf, 1024);printf("I got fd %d and data %s\n", fd_to_pass, buf);close(fd_to_pass); }總結(jié)
以上是生活随笔為你收集整理的Linux多进程编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FFmpeg 出现错误 Invalida
- 下一篇: 你不得不熟悉且熟练掌握的前端知识