进程间通信——消息队列(Message queue)
在Linux中,IPC消息隊列是一個雙向通信的全內存設計,即內核保證了讀寫順序和數據同步,并且是性能比較好的先進先出的數據結構。消息隊列的應用場景:比如異步任務處理,搶占式的數據分發,順序緩存區等。
消息隊列的產生原因
消息隊列其實就是消息傳輸過程中保存消息的容器,既然有了管道,為什么要出現消息隊列呢?理由如下:
(1)生命周期:匿名管道和命名管道都是隨進程的,意味著管道的生命周期是隨進程的退出而退出的。
(2)傳送方式:管道傳送數據時以無格式字節流的形式傳送,給程序的開發帶來不便。
(3)信號傳遞量:擔當數據傳送媒介的管道,其緩沖區大小受到較大的限制。
鑒于上面的三種原因:IPC方式下的另一種進程間通信——>消息隊列應運而生。它克服了信號傳遞信息少、管道只能承載無格式字節流以及緩沖區大小受限等缺點。
什么是消息隊列?
[普通定義]:消息隊列提供了一種從一個進程向另一個進程發送一個數據塊的方法,每個數據塊都被認為含有一個類型,接受者接受的數據塊可以有不同的類型值,我們可以通過發送消息來避免命名管道的同步和阻塞問題。
[最佳定義]:內核地址空間中的內部鏈表,消息可以順序地發送到隊列中,并以幾種不同的方式從隊列中獲取,每個消息隊列是由IPC標識符所唯一標識的。
消息隊列與管道的不同在于:消息隊列是基于消息的,而管道是基于字節流的,且消息隊列的讀取不一定是先入先出,而且如果你沒有顯示的刪除它,那么在關機之前它一直存在。
消息隊列的上限
消息隊列與管道也是一樣的不足,就是每個消息隊列的最大長度是有上限的(MSGMAX),每個消息隊列的總的字節數是有上限(MSGMNB),系統上消息隊列的總數也是有一個上限(MSGMNI)。
[root@localhost panpan]# cat /proc/sys/kernel/msgmax 65536 //每個消息的最大長度 [root@localhost panpan]# cat /proc/sys/kernel/msgmnb 65536 //每個消息隊列的總的字節數 [root@localhost panpan]# cat /proc/sys/kernel/msgmni 1735 //系統消息隊列的總數IPC對象數據結構
內核為每個IPC對象維護了一個數據結構(存在于/usr/include/linux/ipc.h)
struct ipc_perm {__kernel_key_t key;//端口號__kernel_uid_t uid;//所有者的用戶ID__kernel_gid_t gid;//所有者組ID__kernel_uid_t cuid;//創建者的用戶ID__kernel_gid_t cgid;//創建者的組ID__kernel_mode_t mode; //訪問模式unsigned short seq;//順序值 };不僅如此,消息隊列,共享內存和信號量都有這樣一個共同的數據結構。
消息隊列結構
消息隊列結構存在于/usr/include/linux/msg.h目錄下:
struct msqid_ds {struct ipc_perm msg_perm; /* IPC對象結構體 */struct msg *msg_first; /*消息隊列頭指針*/struct msg *msg_last; /*消息隊列尾指針*/__kernel_time_t msg_stime; /*最后一次插入消息隊列消息的時間*/__kernel_time_t msg_rtime; /*最后一次接收消息即刪除隊列中一個消息的時間*/__kernel_time_t msg_ctime; /* 最后修改隊列的時間*/unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */unsigned long msg_lqbytes; /* ditto */unsigned short msg_cbytes; /*隊列上所有消息總的字節數 */unsigned short msg_qnum; /*在當前隊列上消息的個數 */unsigned short msg_qbytes; /* 隊列最大的字節數 */__kernel_ipc_pid_t msg_lspid;/* 發送最后一條消息的進程的pid */__kernel_ipc_pid_t msg_lrpid;/* 接收最后一條消息的進程的pid */ };從上圖可以看出,消息隊列結構體中的第一條內容就是IPC結構體,即IPC結構體是共用的,后面的都是消息隊列所有的成員,并且消息隊列是由鏈表來實現的。
消息隊列相關函數接口說明
1.msgget ( )函數(用于創建和訪問一個消息隊列)
int msgget(key_t key, int msgflg);參數[key]:類似于端口號,也可以由ftok函數生成。
參數[msgflg]:存在兩個IPC標志,即IPC_CREAT和IPC_EXCL。
(1)IPC_CREAT:如果IPC不存在,則創建一個IPC資源,否則打開操作。
(2)IPC_EXCL:只有在共享內存不存在的時候,新的共享內存才建立,否則出錯。如果單獨使用IPC_EXCL,xxxget ( )函數要么返回一個已經存在的共享內存的操作符,要么返回一個新建的共享內存的標識符。
(3)如果IPC_CREAT和IPC_EXCL同時使用,xxxget ( ) 函數將返回一個新建的IPC標識符,如果該IPC資源已經存在,則返回-1出錯,這樣就保證了只有是二者同時使用,我們就可以保證所得的對象一定是新建的。
2.msgsnd ( ) 和 msgrcv ( ) 函數(msgrcv從隊列中取出消息,msgsnd將數據放到消息隊列中)。
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);參數[msgid]:由msgget函數返回的消息隊列標識符。
參數[msgp]:指向一個準備發送消息的指針,此位置用來暫時存儲發送和接受的消息:是一個用戶可以定義的通用結構體,形態如下:
struct msgstu {long type; //大于0char mtext[用戶指定大小]; };參數[msgsz]:指msgp指向的消息的長度。
參數[msgtyp]:從消息隊列內讀取的消息形態,如果值為0,則表示消息隊列中的所有消息都會被讀取。
參數[msgflag]:用來執行核心程序在隊列沒有數據的情況下采取的行動,如果msgflg和常數IPC_NOWAIT合用,則在msgsnd ( ) 執行時,若消息隊列已滿,則msgsnd ( )就不會被阻塞,而會立即返回-1;如果執行的是msgrcv ( ) ,則在消息隊列為空時,不做等待馬上返回-1,并設定錯誤碼為ENOMSG;當msgflg為0時,msgsnd ( ) 以及 msgrcv ( ) 在隊列為滿或者為空的情況下,采取阻塞等待的處理模式。
3.msgctl ( ) 函數(控制消息隊列,與共享內存的shmctl類似)
int msgctl(int msqid, int cmd, struct msqid_ds *buf);參數[msgid]:由msgget函數返回的消息隊列標識符。
參數[cmd]:指將要采取的行動,系統定義了3中cmd操作:
IPC_STAT:該命令用來獲取消息隊列對應的msgid_ds數據結構,并將保存到buf指定的地址空間。
IPC_SET:設定消息隊列的屬性,要設置的屬性存儲在buf里面。
IPC_RMID:從內核中刪除msgid表示的消息隊列。
參數[buf]:指向msg_ds結構的指針,它指向消息隊列模式和訪問權限的結構。
msg_ds結構體的內容如下:
struct msgid_ds { uid_t shm_perm.uid; uid_t shm_perm.gid; mode_t shm_perm.mode; };成功時返回0,失敗時返回-1.
key_t鍵
System_v IPC使用key_t鍵作為他們的名字,在CentOS6.5 key_t的值被定義為int類型。
/usr/include/sys/ipc.h
typedef __key_t key_t;/usr/include/bits/types.h
__STD_TYPE __DADDR_T_TYPE __daddr_t; /* The type of a disk address. */ __STD_TYPE __SWBLK_T_TYPE __swblk_t; /* Type of a swap block maybe? */ __STD_TYPE __KEY_T_TYPE __key_t; /* Type of an IPC key. */ /usr/include/bits/typesizes.h#define __KEY_T_TYPE __S32_TYPE
/usr/include/bits/types
# define __S64_TYPE long int # define __U64_TYPE unsigned long intftok ( ) 函數
函數ftok把一個已經存在的路徑名和一個整數標識符轉換成一個key_t值,稱為IPC鍵。
key_t ftok(const char *pathname, int proj_id);參數[pathname]:通常是跟本應用有關的目錄。
參數[proj_id]:指的是本應用所用到的IPC的一個序列號,成功返回IPC鍵,失敗返回-1。
注:兩個進程如在pathname和proj_id上達成一致(即約定好),雙方就都能夠通過調用ftok函數得到同一個IPC鍵。
pathname的實現是組合了三個鍵,分別是:
(1)pathname所在文件系統的信息(stat結構的st_dev成員)。
(2)pathname在本文件系統內的索引節點號(stat結構體st_ino成員)。
(3)id的低序8位(不能為0)。
ftok調用返回的整數IPC鍵由proj_id的低序8位,st_dev成員的低序8位,st_info的低序16位組合而成。
不能保證兩個不同的路徑名與同一個proj_id的組合產生不同的鍵,因為上面所列的三個條目(文件系統標識符、索引節點、proj_id)中的信息位數可能大于一個整數的信息位數。
消息隊列模擬實現客戶端與服務器端通信
Makefile
//Makefile .PHONY:all all:client serverclient:client.c comm.cgcc -o $@ $^server:server.c comm.cgcc -o $@ $^.PHONY:clean clean:rm -f client servercomm.h
comm.c
//comm.c #include"comm.h"//success > 0 failed == -1static int CommMsgQueue(int flags) {key_t key = ftok(PATHNAME,PROJ_ID);if(key < 0){perror("ftok");return -1;}int msgid = msgget(key,flags);if(msgid < 0){perror("msgget");}return msgid; } int CreateMsgQueue() {return CommMsgQueue(IPC_CREAT | IPC_EXCL | 0666); } int GetMsgQueue() {return CommMsgQueue(IPC_CREAT); } int DestroyMsgQueue(int msgid) {if(msgctl(msgid,IPC_RMID,NULL) < 0){perror("msgctl");return -1;}return 0; } int SendMsg(int msgid,int who,char* msg) {struct msgbuf buf;buf.mtype = who;strcpy(buf.mtext,msg);if(msgsnd(msgid,(void*)&buf,sizeof(buf.mtext),0) < 0){perror("msgsnd");return -1;}return 0; } int RecvMsg(int msgid,int recvType,char out[]) {struct msgbuf buf;if(msgrcv(msgid,(void*)&buf,sizeof(buf.mtext),recvType,0) < 0){perror("msgrcv");return -1;}strcpy(out,buf.mtext);return 0; }server.c
//server.c #include"comm.h"int main() {int msgid = CreateMsgQueue();char buf[1024];while(1){buf[0] = 0;RecvMsg(msgid,CLIENT_TYPE,buf);printf("client# %s\n",buf);printf("Please Enter# ");fflush(stdout);ssize_t s = read(0,buf,sizeof(buf));if(s > 0){buf[s-1] = 0;SendMsg(msgid,SERVER_TYPE,buf);printf("send done,wait recv ...\n");}}DestroyMsgQueue(msgid);return 0; }client.c
顯示IPC資源:
[panpan@localhost msg]$ ipcs------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00000000 131072 panpan 600 393216 2 dest 0x00000000 163841 panpan 600 393216 2 dest 0x00000000 196610 panpan 600 393216 2 dest 0x00000000 229379 panpan 600 393216 2 dest 0x00000000 262148 panpan 600 393216 2 dest 0x00000000 294917 panpan 600 393216 2 dest 0x00000000 327686 panpan 600 393216 2 dest 0x00000000 360455 panpan 600 393216 2 dest 0x00000000 393224 panpan 600 393216 2 dest 0x00000000 425993 panpan 600 393216 2 dest 0x00000000 458762 panpan 600 393216 2 dest 0x00000000 491531 panpan 600 393216 2 dest 0x00000000 524300 panpan 600 393216 2 dest 0x00000000 557069 panpan 600 393216 2 dest 0x00000000 589838 panpan 600 393216 2 dest 0x00000000 622607 panpan 600 393216 2 dest ------ Semaphore Arrays -------- key semid owner perms nsems ------ Message Queues -------- key msqid owner perms used-bytes messages 0x6602248a 32768 panpan 666 0 0刪除IPC資源:
[panpan@localhost msg]$ ipcrm -q 32768 [panpan@localhost msg]$ ipcs------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00000000 131072 panpan 600 393216 2 dest 0x00000000 163841 panpan 600 393216 2 dest 0x00000000 196610 panpan 600 393216 2 dest 0x00000000 229379 panpan 600 393216 2 dest 0x00000000 262148 panpan 600 393216 2 dest 0x00000000 294917 panpan 600 393216 2 dest 0x00000000 327686 panpan 600 393216 2 dest 0x00000000 360455 panpan 600 393216 2 dest 0x00000000 393224 panpan 600 393216 2 dest 0x00000000 425993 panpan 600 393216 2 dest 0x00000000 458762 panpan 600 393216 2 dest 0x00000000 491531 panpan 600 393216 2 dest 0x00000000 524300 panpan 600 393216 2 dest 0x00000000 557069 panpan 600 393216 2 dest 0x00000000 589838 panpan 600 393216 2 dest 0x00000000 622607 panpan 600 393216 2 dest ------ Semaphore Arrays -------- key semid owner perms nsems ------ Message Queues -------- key msqid owner perms used-bytes messages運行結果:
總結:
(1)消息隊列可以獨立于發送和接收數據的進程而存在,從而消除了在同步命名管道的打開和關閉可能產生的困難。
(2)可以通過發送消息可以避免命名管道的同步和阻塞問題,不需要由進程自己來提供同步方法。
(3)接受程序可以通過消息類型有選擇的接收數據,而不像命名管道中的那樣,只能默認的接受。
(4)雙向通信,生命周期隨內核。
總結
以上是生活随笔為你收集整理的进程间通信——消息队列(Message queue)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python利用qrcode生成二维码并
- 下一篇: Python3,WIFI 万(破) 能