操作系统实验报告15:进程同步与互斥线程池
操作系統實驗報告15
實驗內容
- 實驗內容:進程同步。
- 內容1:編譯運行課件 Lecture18 例程代碼。
- Algorithms 18-1 ~ 18-9.
- 內容2:在 Lab Week 13 的基礎上用信號量解決線程池分配的互斥問題。
- 編譯、運行、測試用例。
- 提交新的設計報告
- 內容1:編譯運行課件 Lecture18 例程代碼。
實驗環境
- 架構:Intel x86_64 (虛擬機)
- 操作系統:Ubuntu 20.04
- 匯編器:gas (GNU Assembler) in AT&T mode
- 編譯器:gcc
技術日志
內容1:編譯運行課件 Lecture18 例程代碼
實驗內容原理:
- Linux版本
- 在版本2.6之前,Linux為非搶占內核,即使有一個更高優先級的進程能夠運行,它也不能搶占在內核模式下運行的其它進程。
- 版本2.6及更高版本,Linux內核是完全可搶占的。這樣在內核態下運行的任務也能被搶占。
- Linux在內核中提供了幾種不同的同步機制:
- __sync_fetch_類型
- 自旋鎖
- 互斥鎖
- 信號量
- 自旋鎖和信號量的讀者-寫者版本。
- 在單CPU系統上,自旋鎖被啟用和禁用內核搶占取代。
gcc __sync_系列原子操作函數
// 將value加到*ptr上,結果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_add (type *ptr, type value); // 從*ptr減去value,結果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_sub (type *ptr, type value, ...) // 將*ptr與value相或,結果更新到*ptr, 并返回操作之前*ptr的值 type __sync_fetch_and_or (type *ptr, type value, ...) // 將*ptr與value相與,結果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_and (type *ptr, type value, ...) // 將*ptr與value異或,結果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_xor (type *ptr, type value, ...) // 將*ptr取反后,與value相與,結果更新到*ptr,并返回操作之前*ptr的值 type __sync_fetch_and_nand (type *ptr, type value, ...) // 將value加到*ptr上,結果更新到*ptr,并返回操作之后新*ptr的值 type __sync_add_and_fetch (type *ptr, type value, ...) // 從*ptr減去value,結果更新到*ptr,并返回操作之后新*ptr的值 type __sync_sub_and_fetch (type *ptr, type value, ...) // 將*ptr與value相或, 結果更新到*ptr,并返回操作之后新*ptr的值 type __sync_or_and_fetch (type *ptr, type value, ...) // 將*ptr與value相與,結果更新到*ptr,并返回操作之后新*ptr的值 type __sync_and_and_fetch (type *ptr, type value, ...) // 將*ptr與value異或,結果更新到*ptr,并返回操作之后新*ptr的值 type __sync_xor_and_fetch (type *ptr, type value, ...)// 將*ptr取反后,與value相與,結果更新到*ptr,并返回操作之后新*ptr的值 type __sync_nand_and_fetch (type *ptr, type value, ...) // 比較*ptr與oldval的值,如果兩者相等,則將newval更新到*ptr并返回true bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)// 比較*ptr與oldval的值,如果兩者相等,則將newval更新到*ptr并返回操作之前*ptr的值 type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...) // 發出完整內存柵欄 __sync_synchronize (...) // 將value寫入ptr,對ptr加鎖,并返回操作之前ptr的值。 type __sync_lock_test_and_set (type ptr, type value, ...)// 將0寫入到ptr,并對*ptr解鎖。 void __sync_lock_release (type ptr, ...)其中type可以是類型uint8_t, unt16_t, uint32_t, unt64_t。
- 驗證實驗alg.18-1-syn-fetch-1.c
執行程序命令:
gcc alg.18-1-syn-fetch-1.c ./a.out分析:
實現細節解釋:
一開始先讓變量i等于10,然后在同一條打印語句中打印函數__sync_fetch_and_add(&i, 20)的返回值和i的值,__sync_fetch_and_add(&i, 20)是無鎖化原子操作語句,實現的是先取值再加第二個參數即20的操作,返回操作前i的值,所以語句執行后獲取到的值還是原來i的值,即是10,而在同一條打印語句中的i的值與函數__sync_fetch_and_add(&i, 20)無關,所以i的值還是10,下一條語句還是打印i的值,此時已經執行完函數__sync_fetch_and_add(&i, 20),所以可以看到此時i的值為30。
接著繼續讓變量i等于10,然后在同一條打印語句中打印函數__sync_add_and_fetch(&i, 20)的返回值和i的值,__sync_add_and_fetch(&i, 20)是無鎖化原子操作語句,實現的是先加第二個參數即20再取值的操作,返回操作后i的值,所以語句執行后獲取到的值還是加上20后i的值,即是30,而在同一條打印語句中的i的值與函數__sync_add_and_fetch(&i, 20)無關,所以i的值還是10,下一條語句還是打印i的值,此時已經執行完函數__sync_add_and_fetch(&i, 20),所以可以看到此時i的值為30。
- 驗證實驗alg.18-1-syn-fetch-2.c
執行程序命令:
gcc alg.18-1-syn-fetch-2.c -pthread ./a.out分析:
可以看到,每個線程在加1的時候,因為使用的是__sync_fetch_and_add()函數,是原子化操作,所以沒有發生條件沖突而產生錯誤的值,值為40*20000=800000,計算結果正確。
實現細節解釋:
一開始使用語句pthread_create(&ptid[i], NULL, &test_func, NULL)創建MAX_N即40個線程,每個線程的線程執行函數都為:
void *test_func(void *arg) {for (int i = 0; i < 20000; ++i)__sync_fetch_and_add(&count, 1);/* count++; gave a wrong result */ return NULL; }線程執行函數的作用為使用__sync_fetch_and_add(&count, 1)語句使全局靜態變量count加1加20000次。
回到主線程中,使用pthread_join(ptid[i], NULL)語句使主線程等待MAX_N即40個線程結束后再繼續運行,最后打印count的值,值為40*20000=800000,因為__sync_fetch_and_add()函數是原子化操作,避免了每個線程在count加1時發生條件沖突,這樣得到的結果也是無誤的,程序運行正確。
- 驗證實驗alg.18-1-syn-fetch-3.cc
執行程序命令:
gcc alg.18-1-syn-fetch-3.c -pthread ./a.out分析:
可以看到,每個線程在加1的時候,因為使用的是count++語句,不是原子操作語句,所以產生了條件沖突而產生錯誤的值,值不為40*20000=800000,而是694845, 計算結果錯誤。
實現細節解釋:
和之前一個程序相比,這個程序在線程執行函數中使用的是count++語句使全局靜態變量count加1加20000次,這樣因為使用的不是原子操作語句,分成從緩存取到寄存器中,寄存器加一,再存入緩存三步進行,所以各個線程會很容易發生條件沖突,最后產生的是一個錯的結果。
- 驗證實驗alg.18-2-syn-compare-test.c
執行程序命令:
gcc alg.18-2-syn-compare-test.c ./a.out分析:
實現細節解釋:
第一個代碼片段中,value值為200000,oldval值為123456,newval值為654321,執行語句__sync_bool_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因為不相等,所以value保持原值,并返回false給ret,所以最后打印結果,ret為0,value為200000,oldval為123456,newval為654321。
第二個代碼片段中,value值為200000,oldval值為200000,newval值為654321,執行語句__sync_bool_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因為相等,所以newval更新到value,并返回true給ret,所以最后打印結果,ret為1,value為654321,oldval為123456,newval為654321。
第三個代碼片段中,value值為200000,oldval值為123456,newval值為654321,執行語句__sync_val_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因為不相等,所以value保持原值,并返回操作之前value的值給ret,所以最后打印結果,ret為200000,value為200000,oldval為123456,newval為654321。
第四個代碼片段中,value值為200000,oldval值為200000,newval值為654321,執行語句__sync_val_compare_and_swap(&value, oldval, newval),比較value與oldval的值,因為相等,所以newval的值更新到value,并返回操作之前value的值給ret,所以最后打印結果,ret為200000,value為654321,oldval為200000,newval為654321。
第五個代碼片段中,value值為200000,newval值為654321,執行語句__sync_lock_test_and_set(&value, newval),將newval寫入value,對value加鎖,并返回操作之前value的值,所以最后打印結果,ret為200000,value為654321,newval為654321。
第六個代碼片段中,value值為200000,執行語句__sync_lock_release(&value),將0寫入到value,并對&value解鎖,所以最后打印結果,value為0。
POSIX互斥鎖
-
互斥鎖用于保護代碼的臨界區,即線程在進入臨界區之前獲取鎖,并在退出臨界區時釋放鎖。
-
Pthreads互斥鎖采用數據類型pthread_mutex_t。一個互斥鎖可以使用pthread_mutex_init()函數創建。
#include <pthread.h>pthread_mutex_t mutex;/* 創建并初始化這個互斥鎖 */ pthread_mutex_init(&mutex, NULL);- 第一個參數是指向互斥鎖的指針。第二個參數是NULL,表示將互斥鎖按照其默認屬性初始化。
-
互斥鎖是通過pthread_mutex_lock()和pthread_mutex_unlock()函數來獲取和釋放的。如果調用pthread_mutex_lock()時互斥鎖不可用,則調用線程將被阻塞在等待隊列中,直到互斥鎖的所有者調用pthread_mutex_unlock()釋放互斥鎖為止。
-
以下代碼說明了如何使用互斥鎖保護臨界區:
所有互斥函數當操作正確時返回值為0,如果發生錯誤,這些函數將返回非零錯誤代碼。
- 驗證實驗alg.18-3-syn-pthread-mutex.c
執行程序命令:
gcc alg.18-3-syn-pthread-mutex.c -pthread ./a.out ./a.out syn分析:
可以看到,當編譯命令中沒有參數時,得到的加法結果是一個錯誤的結果;當編譯命令中有參數syn時,得到的加法結果是正確的結果800000。
實現細節解釋:
首先在全局中,使用pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER將pthread_mutex_t類型變量mutex使用宏定義PTHREAD_MUTEX_INITIALIZER進行靜態初始化,或者在主函數中使用語句pthread_mutex_init (&mutex, NULL)進行初始化。主函數最后會等待創建的線程都執行完后再繼續進行,然后使用pthread_mutex_destroy(&mutex)語句釋放互斥鎖,最后打印count結果。
當程序的編譯命令參數是syn時,程序創建MAX_N即40個線程,每個線程的執行函數都為:
void *test_func_syn(void *arg) {for (int i = 0; i < 20000; ++i) {pthread_mutex_lock(&mutex);count++;pthread_mutex_unlock(&mutex);}pthread_exit(NULL); }在線程執行函數中,有一個執行20000次的for循環,里面每次使count自增前得到一個互斥鎖,然后再令count自增,最后再釋放互斥鎖,這樣保證了線程之間不會出現競爭條件沖突,count的自增操作有序進行,最后得到的也是正確結果800000。
當程序的編譯命令沒有參數或參數不是syn時,程序創建MAX_N即40個線程,每個線程的執行函數都為:
void *test_func_asy(void *arg) {for (int i = 0; i < 20000; ++i) {count++;}pthread_exit(NULL); }在線程執行函數中,有一個執行20000次的for循環,里面沒有使用互斥鎖而是直接讓count進行自增,這樣容易發生條件沖突,最后得到的結果也并不正確613245。
POSIX信號量
- POSIX SEM 擴展指定了兩種類型的信號量:命名信號量和無名信號量。從內核的版本2.6開始,Linux系統提供對這兩種類型的支持。
- POSIX命名信號量
-
函數sem_open()用于創建新的或打開已經存在的信號量:
#include <fcntl.h> #include <sys/stat.h> #include <semaphore.h> sem_t *sem_open(const char *name, int oflag); sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value); -
例如:
sem_t *sem; sem = sem_open("MYSEM", O_CREAT, 0666, 1);- 命名信號量MYSEM被創建并初始化為1。它對其他進程具有讀寫訪問權限。
-
多個不相關的進程可以簡單地通過引用信號量的名稱,使用一個通用的命名信號量作為同步機制。
-
在上面的示例中,一旦創建了信號量MYSEM,其他進程隨后使用相同參數調用sem_open()時,會將描述符sem返回給現有的信號量。POSIX分別聲明這些操作為sem_wait(sem)和sem_post(sem)。
-
下面說明如何使用上面創建的命名信號量保護臨界區:
sem_wait(sem); /* 獲取信號量 */ 臨界區 sem_post(sem); /* 釋放信號量 */ ... sem_close(sem);
-
- POSIX無名信號量
- 無名信號量是通過sem_init()函數進行創建和初始化的,該函數傳遞了三個參數:
(1)信號量的指針
(2)表示共享級別的標志
(3)信號量的初始值int sem_init(sem_t *sem, int pshared, unsigned int value) - 例如:#include <semaphore.h> sem_t sem; sem_init(&sem, 0, 1); /* 創建信號量并將其初始化為1 */
- pshared = 0表示此信號量只能由屬于創建該信號量的同一進程的線程共享。
- 信號量設置為值1。
- POSIX無名信號量對描述符sem也使用了與命名信號量相同的sem_wait(sem)和sem_post(sem)操作。
- 下面說明如何使用上面創建的無名信號量保護臨界區:sem_wait(&sem); /* 獲取信號量 */ 臨界區 sem_post(&sem); /* 釋放信號量 */ ... sem_destroy(&sem);
- 無名信號量是通過sem_init()函數進行創建和初始化的,該函數傳遞了三個參數:
通常在進程間同步中使用命名信號量,而無名信號量用于線程間通信。
- 驗證實驗alg.18-4-syn-pthread-sem-unnamed.c
執行程序命令:
gcc alg.18-4-syn-pthread-sem-unnamed.c -pthread ./a.out syn ./a.out分析:
可以看到,當編譯命令中有參數syn時,得到的加法結果是正確的結果800000;當編譯命令中沒有參數時,得到的加法結果是一個錯誤的結果。
實現細節解釋:
首先在全局中,聲明一個信號量標識符類型sem_t變量unnamed_sem,然后在主函數中使用語句sem_init(&unnamed_sem, 0, 1)創建無名信號量unnamed_sem并初始化為1。主函數最后會等待創建的線程都執行完后再繼續進行,然后打印count結果,最后使用sem_destroy(&unnamed_sem)語句銷毀信號量。
當程序的編譯命令參數是syn時,程序創建MAX_N即40個線程,每個線程的執行函數都為:
void *test_func_syn(void *arg) {for (int i = 0; i < 20000; ++i) {sem_wait(&unnamed_sem);count++;sem_post(&unnamed_sem);}pthread_exit(NULL); }在線程執行函數中,有一個執行20000次的for循環,里面每次使count自增前得到一個信號量,然后再令count自增,最后再釋放信號量,這樣保證了線程之間不會出現競爭條件沖突,count的自增操作有序進行,最后得到的也是正確結果800000。
當程序的編譯命令沒有參數或參數不是syn時,程序創建MAX_N即40個線程,每個線程的執行函數都為:
void *test_func_asy(void *arg) {for (int i = 0; i < 20000; ++i) {count++;}pthread_exit(NULL); }在線程執行函數中,有一個執行20000次的for循環,里面沒有使用信號量而是直接讓count進行自增,這樣容易發生條件沖突,最后得到的結果也并不正確632537。
- 驗證實驗alg.18-5-syn-pthread-sem-named.c
執行程序命令:
gcc alg.18-5-syn-pthread-sem-named.c -pthread ./a.out syn ./a.out分析:
可以看到,當編譯命令中有參數syn時,得到的加法結果是正確的結果800000;當編譯命令中沒有參數時,得到的加法結果是一個錯誤的結果。
實現細節解釋:
首先在全局中,聲明一個信號量標識符類型sem_t *指針變量named_sem,然后在主函數中使用語句named_sem = sem_open("MYSEM", O_CREAT, 0666, 1)創建命名信號量MYSEM并初始化為1,并返回信號量標識符給變量named_sem,這時一個名為sem.MYSEM的文件將會在/dev/shm/目錄下被創建,任何知道這個文件名的進程和線程都可以共享這個信號量。
主函數最后會等待創建的線程都執行完后再繼續進行,然后打印count結果,接著使用sem_close(named_sem)語句關閉命名信號量,最后使用語句sem_unlink("MYSEM")從/dev/shm/目錄下移除sem.MYSEM文件當其標識符為0時。
當程序的編譯命令參數是syn時,程序創建MAX_N即40個線程,每個線程的執行函數都為:
void *test_func_syn(void *arg) {for (int i = 0; i < 20000; ++i) {sem_wait(&unnamed_sem);count++;sem_post(&unnamed_sem);}pthread_exit(NULL); }在線程執行函數中,有一個執行20000次的for循環,里面每次使count自增前得到一個信號量,然后再令count自增,最后再釋放信號量,這樣保證了線程之間不會出現競爭條件沖突,count的自增操作有序進行,最后得到的也是正確結果800000。
當程序的編譯命令沒有參數或參數不是syn時,程序創建MAX_N即40個線程,每個線程的執行函數都為:
void *test_func_asy(void *arg) {for (int i = 0; i < 20000; ++i) {count++;}pthread_exit(NULL); }在線程執行函數中,有一個執行20000次的for循環,里面沒有使用信號量而是直接讓count進行自增,這樣容易發生條件沖突,最后得到的結果也并不正確704064。
- 驗證實驗多生產者-多消費者問題
執行程序命令:
gcc alg.18-6-syn-pc-con-6.c -pthread gcc alg.18-7-syn-pc-producer-6.c -o alg.18-7-syn-pc-producer-6.o -pthread gcc alg.18-8-syn-pc-consumer-6.c -o alg.18-8-syn-pc-consumer-6.o -pthread ./a.out myshm 4 8 2 3分析:
緩沖區大小為4,生產項目數量為8,生產者數目為2,消費者數量為3時,生產和消費的過程有序進行,直到8個項目被從循環隊列中全部取出消費,程序結束。
實現細節解釋:
在頭文件alg.18-6-syn-pc-con-6.h中定義了必要的數據和結構:
#define BASE_ADDR 10 /* 共享內存的前十個單位保留給控制結構體ctln_pc_st,數據從下標為10的單位開始循環數據隊列由(enqueue | dequeue) % buffer_size + BASE_ADDR表示 */struct ctln_pc_st {int BUFFER_SIZE; // 緩沖區大小,共享內存中數據單元的數目int MAX_ITEM_NUM; // 要生產的項目數目int THREAD_PRO; // 生產者數目int THREAD_CONS; // 消費者數目sem_t sem_mutex; // 表示互斥信號量sem_t stock; // 表示緩沖區中存儲數量的信號量sem_t emptyslot; // 表示緩沖區中空閑單元數目的信號量int item_num; // 已經生產了的項目的總數目int consume_num; // 已經消費了的項目的總數目int enqueue; // 當前生產者在循環隊列中的位置int dequeue; // 當前消費者在循環隊列中的位置int END_FLAG; // 生產者生產完所有項目完成工作后,置為1,否則置為0,表示生產者還未完成完工作 }; /* 60 bytes */struct data_pc_st {int item_no; // 生產項目時的項目序號int pro_no; // 生產者序號long int pro_tid; // 生產該項目的生產者的線程號 }; /* 16 bytes */首先,進程syn-pc-con會先創建一個共享內存區,然后使用execv()函數引發兩個子進程,分別為syn-pc-producer生產者進程和syn-pc-consumer消費者進程,兩個子進程異步執行,并將共享內存標識符作為參數傳遞給子進程,父進程等待子進程執行完后再接著執行,最后結束。
alg.18-6-syn-pc-con-6.c:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/stat.h> #include <sys/shm.h> #include <semaphore.h> #include <wait.h> #include "alg.18-6-syn-pc-con-6.h"int shmid; void *shm = NULL; int detachshm(void);int main(int argc, char *argv[]) {pid_t childpid, pro_pid, cons_pid;struct stat statbuf;int buffer_size, max_item_num, thread_pro, thread_cons;// 需要在編譯命令中提供共享對象的文件名或路徑if (argc < 2) {printf("\nshared file object undeclared!\nUsage: syn-pc-con-6.o /home/myshm\n");return EXIT_FAILURE;}// 共享對象的文件應該要存在if (stat(argv[1], &statbuf) == -1) {perror("stat()");return EXIT_FAILURE;}while (1) {// 輸入緩沖區大小printf("Pls input the buffer size(1-100, 0 quit): ");scanf("%d", &buffer_size);if (buffer_size <= 0) return 0;if (buffer_size > 100) continue;// 輸入要生產的項目的最大個數printf("Pls input the max number of items to be produced(1-10000, 0 quit): ");scanf("%d", &max_item_num);if (max_item_num <= 0) return 0;if (max_item_num > 10000) continue;// 輸入生產者的個數printf("Pls input the number of producers(1-500, 0 quit): ");scanf("%d", &thread_pro);if (thread_pro <= 0) return 0;if (thread_pro < 0) continue;// 輸入消費者的個數printf("Pls input the number of consumers(1-500, 0 quit): ");scanf("%d", &thread_cons);if (thread_cons <= 0) return 0;if (thread_cons < 0) continue;break;}struct ctln_pc_st *ctln = NULL;struct data_pc_st *data = NULL;key_t key;int ret;// 獲取IPC鍵值if ((key = ftok(argv[1], 0x28)) < 0) { perror("ftok()");exit(EXIT_FAILURE);}// 獲取共享內存標識符shmid = shmget((key_t)key, (buffer_size + BASE_ADDR)*sizeof(struct data_pc_st), 0666 | IPC_CREAT);if (shmid == -1) {perror("shmget()");exit(EXIT_FAILURE);}// 把共享內存區對象映射到調用進程的地址空間,允許本進程訪問共享內存shm = shmat(shmid, 0, 0);if (shm == (void *)-1) {perror("shmat()");exit(EXIT_FAILURE);}// 設置共享內存,分別設置控制結構體ctln和數據結構體datactln = (struct ctln_pc_st *)shm;data = (struct data_pc_st *)shm;// 初始化所有的控制參數,共享內存的前十個單位保留給控制參數,數據從下標為10的單位開始ctln->BUFFER_SIZE = buffer_size;ctln->MAX_ITEM_NUM = max_item_num;ctln->THREAD_PRO = thread_pro;ctln->THREAD_CONS = thread_cons; ctln->item_num = 0;ctln->consume_num = 0;// 循環數據隊列由(enqueue | dequeue) % buffer_size + BASE_ADDR表示ctln->enqueue = 0;ctln->dequeue = 0;ctln->END_FLAG = 0;// 初始化互斥信號量為1,對于進程間共享,sem_init()的第二個參數必須設置為非零ret = sem_init(&ctln->sem_mutex, 1, 1);if (ret == -1) {perror("sem_init-mutex");return detachshm();}// 將表示緩沖區存儲數量的信號量ctln->stock初始化為0ret = sem_init(&ctln->stock, 1, 0);if (ret == -1) {perror("sem_init-stock");return detachshm();}// 將表示緩沖區中空閑單元數目的信號量ctln->emptyslot初始化為BUFFER_SIZEret = sem_init(&ctln->emptyslot, 1, ctln->BUFFER_SIZE);if (ret == -1) {perror("sem_init-emptyslot");return detachshm();}// 打印進程進程號printf("\nsyn-pc-con console pid = %d\n", getpid());// 將共享內存標識符作為參數傳遞給生產者進程和消費者進程char *argv1[3];char execname[] = "./";char shmidstring[10];sprintf(shmidstring, "%d", shmid);argv1[0] = execname;argv1[1] = shmidstring;argv1[2] = NULL;childpid = vfork();if (childpid < 0) {perror("first fork");return detachshm();} // 調用生產者進程else if (childpid == 0) {pro_pid = getpid();printf("producer pid = %d, shmid = %s\n", pro_pid, argv1[1]);execv("./alg.18-7-syn-pc-producer-6.o", argv1);}else {childpid = vfork();if (childpid < 0) {perror("second fork");return detachshm();} // 調用消費者進程else if (childpid == 0) {cons_pid = getpid();printf("consumer pid = %d, shmid = %s\n", cons_pid, argv1[1]);execv("./alg.18-8-syn-pc-consumer-6.o", argv1);}}// 等待生產者進程和消費者進程結束后父進程再執行if (waitpid(pro_pid, 0, 0) != pro_pid)perror("wait pro");elseprintf("waiting pro_pid %d success.\n", pro_pid);if (waitpid(cons_pid, 0, 0) != cons_pid)perror("wait cons");elseprintf("waiting cons_pid %d success.\n", cons_pid);// 銷毀互斥信號量ctln->sem_mutexret = sem_destroy(&ctln->sem_mutex);if (ret == -1)perror("sem_destroy sem_mutex");// 銷毀表示緩沖區存儲數量的信號量ctln->sem_stockret = sem_destroy(&ctln->stock);if (ret == -1)perror("sem_destroy stock");// 銷毀表示緩沖區中空閑單元數目的信號量ctln->emptyslotret = sem_destroy(&ctln->emptyslot);if (ret == -1)perror("sem_destroy empty_slot");return detachshm(); }// 斷開進程與共享內存附加點的地址,釋放共享內存區 int detachshm(void) {if (shmdt(shm) == -1) {perror("shmdt()");exit(EXIT_FAILURE);}if (shmctl(shmid, IPC_RMID, 0) == -1) {perror("shmctl(IPC_RMID)");exit(EXIT_FAILURE);} }生產者進程syn-pc-producer會創建THREAD_PRO個生產者線程,異步進行生產。只有當已經生產的產品數量小于要生產的產品數量時,才會執行循環生產代碼,生產的產品插入到循環隊列中,當已經生產的產品數量等于要生產的產品數量時,完成工作,生產者的進程結束。
alg.18-7-syn-pc-producer-6.c:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/shm.h> #include <semaphore.h> #include <unistd.h> #include <sys/syscall.h> #include "alg.18-6-syn-pc-con-6.h"#define gettid() syscall(__NR_gettid)void *producer(void *arg) {// 獲取共享內存結構體,分別為控制結構體和數據結構體struct ctln_pc_st *ctln = (struct ctln_pc_st *)arg;struct data_pc_st *data = (struct data_pc_st *)arg;// 當生產者已經制造的項目數量小于要生產的項目時while (ctln->item_num < ctln->MAX_ITEM_NUM) {// 等待緩沖區空閑單元數目信號量大于0,表示有空閑單元可以生產后存放項目,然后將空閑單元數目信號量減一,繼續執行sem_wait(&ctln->emptyslot);// 等待互斥信號量大于0,防止臨界沖突,然后將互斥鎖信號量減一,繼續執行sem_wait(&ctln->sem_mutex);// 當生產者已經制造的項目數量小于要生產的項目時if (ctln->item_num < ctln->MAX_ITEM_NUM) {// 生產者已經制造的項目數量加一,并將制造的項目設置好項目序列號和制造該項目的線程號后,放入循環隊列ctln->item_num++; ctln->enqueue = (ctln->enqueue + 1) % ctln->BUFFER_SIZE;(data + ctln->enqueue + BASE_ADDR)->item_no = ctln->item_num;(data + ctln->enqueue + BASE_ADDR)->pro_tid = gettid();printf("producer tid %ld prepared item no %d, now enqueue = %d\n", (data + ctln->enqueue + BASE_ADDR)->pro_tid, (data + ctln->enqueue + BASE_ADDR)->item_no, ctln->enqueue);// 當生產者已經制造的項目數量等于要生產的項目時,說明完成工作,設置ctln->END_FLAG為1if (ctln->item_num == ctln->MAX_ITEM_NUM)ctln->END_FLAG = 1;// 將表示緩沖區中存儲數量的信號量加一,繼續執行sem_post(&ctln->stock);} // 當生產者已經制造的項目數量不小于要生產的項目時,將表示緩沖區空閑單元數目的信號量加一else {sem_post(&ctln->emptyslot);}// 然后將互斥鎖信號量加一,允許其它線程執行sem_post(&ctln->sem_mutex);sleep(1);}pthread_exit(0); }int main(int argc, char *argv[]) {struct ctln_pc_st *ctln = NULL;struct data_pc_st *data = NULL;int shmid;void *shm = NULL;// 獲取共享內存標識符shmid = strtol(argv[1], NULL, 10);// 把共享內存區對象映射到調用進程的地址空間,允許本進程訪問共享內存shm = shmat(shmid, 0, 0);if (shm == (void *)-1) {perror("\nproducer shmat()");exit(EXIT_FAILURE);}// 獲取共享內存結構體,分別為控制結構體ctln和數據結構體datactln = (struct ctln_pc_st *)shm;data = (struct data_pc_st *)shm;pthread_t ptid[ctln->THREAD_PRO];int i, ret;// 創建ctln->THREAD_PRO個生產者線程for (i = 0; i < ctln->THREAD_PRO; ++i) {// 線程執行函數為producerret = pthread_create(&ptid[i], NULL, &producer, shm);if (ret != 0) {perror("producer pthread_create()");break;}} // 主線程等待子線程都執行完后再繼續執行for (i = 0; i < ctln->THREAD_PRO; ++i) {pthread_join(ptid[i], NULL);}// 所有生產者都停止工作,以防止有些消費者會拿走最后的項目,不超過THREAD_CON-1個消費者會停留在sem_wait(&stock)的等待隊列中for (i = 0; i < ctln->THREAD_CONS - 1; ++i)sem_post(&ctln->stock);// 斷開進程與共享內存附加點的地址if (shmdt(shm) == -1) {perror("producer shmdt()");exit(EXIT_FAILURE);}return 0; }消費者進程syn-pc-consumer會創建THREAD_CONS個消費者線程,異步進行消費。只有當消費者已經消費的項目數量小于生產者已經生產的項目數量,或生產者還沒完成工作時,才會執行循環消費代碼,消費的產品從循環隊列中取出。
alg.18-8-syn-pc-consumer-6.c:
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <sys/shm.h> #include <semaphore.h> #include <unistd.h> #include <sys/syscall.h> #include "alg.18-6-syn-pc-con-6.h"#define gettid() syscall(__NR_gettid)void *consumer(void *arg) {// 獲取共享內存結構體,分別為控制結構體和數據結構體struct ctln_pc_st *ctln = (struct ctln_pc_st *)arg;struct data_pc_st *data = (struct data_pc_st *)arg;// 當消費者已經消費的項目數量小于生產者已經生產的項目數量,或生產者還沒完成工作時while ((ctln->consume_num < ctln->item_num) || (ctln->END_FLAG == 0)) { // 等待表示緩沖區中存儲數量的信號量大于0,表示緩沖區中有項目可以消費,然后將存儲數量信號量減一,繼續執行。如果存儲數量是空的,且所有的生產者都停止工作,那么一個或多個消費者可能會永遠等待sem_wait(&ctln->stock);// 等待互斥信號量大于0,防止臨界沖突,然后將互斥鎖信號量減一,繼續執行sem_wait(&ctln->sem_mutex);// 當消費者已經消費的項目數量小于生產者已經生產的項目數量if (ctln->consume_num < ctln->item_num) { // 從循環隊列中取出項目消費,打印取出項目的相關信息ctln->dequeue = (ctln->dequeue + 1) % ctln->BUFFER_SIZE;printf("\t\t\t\tconsumer tid %ld taken item no %d by pro %ld, now dequeue = %d\n", gettid(), (data + ctln->dequeue + BASE_ADDR)->item_no, (data + ctln->dequeue + BASE_ADDR)->pro_tid, ctln->dequeue);ctln->consume_num++;// 將表示緩沖區空閑單元數目的信號量加一,繼續執行sem_post(&ctln->emptyslot);}// 當消費者已經消費的項目數量不小于生產者已經生產的項目數量,將表示緩沖區中存儲數量的信號量加一else {sem_post(&ctln->stock);}// 然后將互斥鎖信號量加一,允許其它線程執行sem_post(&ctln->sem_mutex);}pthread_exit(0); }int main(int argc, char *argv[]) {struct ctln_pc_st *ctln = NULL;struct data_pc_st *data = NULL;int shmid;void *shm = NULL;// 獲取共享內存標識符shmid = strtol(argv[1], NULL, 10);// 把共享內存區對象映射到調用進程的地址空間,允許本進程訪問共享內存shm = shmat(shmid, 0, 0);if (shm == (void *)-1) {perror("consumer shmat()");exit(EXIT_FAILURE);}// 獲取共享內存結構體,分別為控制結構體ctln和數據結構體datactln = (struct ctln_pc_st *)shm;data = (struct data_pc_st *)shm;pthread_t ptid[ctln->THREAD_CONS];int i, ret;// 創建ctln->THREAD_CONS個消費者線程for (i = 0; i < ctln->THREAD_CONS; ++i) {// 線程執行函數為consumerret = pthread_create(&ptid[i], NULL, &consumer, shm); if (ret != 0) {perror("consumer pthread_create()");break;}} // 主線程等待子線程都執行完后再繼續執行for (i = 0; i < ctln->THREAD_CONS; ++i)pthread_join(ptid[i], NULL);// 斷開進程與共享內存附加點的地址if (shmdt(shm) == -1) {perror("consumer shmdt()");exit(EXIT_FAILURE);} return 0; }POSIX條件變量
-
Pthreads中的條件變量的行為類似于監視器上下文中使用的條件變量,后者提供了一種鎖定機制來確保數據完整性。
-
Pthreads通常用于C程序中。由于C語言沒有監視器,互斥鎖與條件變量相關聯以完成鎖定。
-
Pthreads中的條件變量使用pthread_cond_t數據類型,并由pthread_cond_init()初始化。以下代碼創建并初始化條件變量及其關聯的互斥鎖:
pthread_mutex_t mutex; pthread_cond_t cond_var;pthread_mutex_init(&mutex, NULL); pthread_cond_init(&cond_var, NULL); -
例子:
- 線程可以使用Pthread條件變量等待條件子句(a == b)變為true:pthread_mutex_lock(&mutex); while (a != b)pthread_cond_wait(&cond_var, &mutex); 臨界區 pthread_mutex_unlock(&mutex);
-
在調用pthread_cond_wait()函數之前,必須鎖定與cond_var關聯的互斥鎖,因為它用于保護條件子句中的數據不受可能的競爭條件的影響。
-
pthread_cond_wait()函數用于等待條件變量。
-
一旦獲得了這個鎖,線程就會檢查條件并調用pthread_cond_wait(),當(a != b)時,將互斥鎖和cond_var作為參數傳遞,條件不正確。
-
pthread_cond_wait()將調用線程放在條件等待隊列的末尾,釋放互斥鎖以允許另一個線程訪問共享數據,并可能更新其值,以便條件子句(a == b)的判斷結果為true。當調用線程被激活時,它將鎖定互斥鎖并再次檢查條件。
- 這一點很重要,因為當條件子句為true時,條件等待隊列中調用線程之前的另一個線程可能會被調度。
-
例子:
- 線程可以調用pthread_cond_signal()函數,從而發出一個線程在等待條件變量的信號。pthread_mutex_lock(&mutex); if (a == b)pthread_cond_signal(&cond_var); pthread_mutex_unlock(&mutex);
-
需要注意的是:
- pthread_cond_signal()不會釋放互斥鎖。
- pthread_mutex_unlock()釋放互斥鎖。
- 一旦釋放互斥鎖,發出信號的線程就成為互斥鎖的所有者,并從pthread_cond_wait()調用返回控制。
-
驗證實驗alg.18-9-pthread-cond-wait.c
執行程序命令:
gcc alg.18-9-pthread-cond-wait.c -pthread ./a.out syn分析:
可以看到,變量count的自增和自減有序進行,沒有發生競爭條件導致count的值錯亂的情況。
實現細節解釋:
首先在全局中,將pthread_mutex_t互斥鎖標識符類型變量mutex使用宏定義PTHREAD_MUTEX_INITIALIZER進行靜態初始化,將pthread_cond_t條件變量類型變量cond使用宏定義PTHREAD_COND_INITIALIZER進行初始化,
主函數最后會等待創建的線程都執行完后再繼續進行,然后使用pthread_mutex_destroy(&mutex)語句銷毀互斥鎖,使用語句pthread_cond_destroy(&cond)銷毀條件變量,結束程序。
主函數中會創建兩個線程,兩個線程異步執行,其中一個線程的執行函數為:
void *decrement(void *arg) { for (int i = 0; i < 4; i++) {pthread_mutex_lock(&mutex); while (count <= 0) /* wait until count > 0 */pthread_cond_wait(&cond, &mutex); count--; printf("\t\t\t\tcount = %d.\n", count); printf("\t\t\t\tUnlock decrement.\n"); pthread_mutex_unlock(&mutex); }return NULL; }在線程執行函數中,有一個執行4次的for循環,里面每次循環首先獲取一個互斥鎖,以防止多個線程同時請求pthread_cond_wait()的競爭條件,當變量count小于等于0時,pthread_cond_wait()會先解除互斥鎖,然后在等待隊列中休眠,直到變量count大于0且等待條件成立被喚醒后才繼續執行,先鎖定互斥鎖,然后count自減,打印此時count的值并釋放互斥鎖。
另一個線程的執行函數為:
void *increment(void *arg) {for (int i = 0; i < 4; i++) {for (int j = 0; j < 10000; j++) ; /* sleep for a while */pthread_mutex_lock(&mutex); count++; printf("count = %d.\n", count);if (count > 0) pthread_cond_signal(&cond); printf("Unlock increment.\n"); pthread_mutex_unlock(&mutex); }return NULL; }在線程執行函數中,有一個執行4次的for循環,里面每次循環首先利用for循環等待一段時間,然后獲取一個互斥鎖,接著使count自增,如果此時count大于0時,使用語句pthread_cond_signal(&cond)激活一個正在等待該條件的線程,最后釋放互斥鎖。
內容2:在 Lab Week 13 的基礎上用信號量解決線程池分配的互斥問題。
設計報告
線程池設計圖
代碼設計
測試代碼:
//threadpools.c文件 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sched.h> #include <pthread.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> #include <sys/ipc.h> #include <sys/time.h> #include <sys/msg.h> #include <sys/syscall.h> #include <semaphore.h> #include <fcntl.h> #include <unistd.h>#define gettid() syscall(__NR_gettid)/* wrap the system call syscall(__NR_gettid), __NR_gettid = 224 */ #define gettidv2() syscall(SYS_gettid) /* a traditional wrapper */#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 100 // 要執行的任務總數// 線程池中每個線程執行的任務的結構體 typedef struct {void *(*function)(void *); // 執行函數void *arg; // 參數 } Task;// 任務循環隊列的數據結構 typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務隊列數組int front; // 隊首下標int rear; // 隊尾下標 } TaskQueue;// 線程池數據結構 typedef struct {pthread_t threads[THREADS_NUM]; // 線程數組TaskQueue taskQueue; // 任務隊列int taskSum; // 剩余任務總數,結束程序用sem_t sem_mutex; // 互斥信號量 } Threadpools;// 線程池中每個線程執行的任務 static void *executeTask(void *arg) {// 向每個線程傳入的參數是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 等待互斥信號量大于0,防止臨界沖突,然后將互斥鎖信號量減一,繼續執行sem_wait(&pools->sem_mutex);// 當任務隊列為空時while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經沒有剩余任務要處理,那么退出線程if (pools->taskSum == 0) {printf("Thread %ld exits.\n", gettid());sem_post(&pools->sem_mutex);pthread_exit(NULL);}// 否則等待任務隊列中有任務后再取任務進行執行printf("Thread %ld is waiting for a task.\n", gettid());sleep(1); }// 剩余任務總數減一pools->taskSum--;// 獲取任務隊列隊首的任務Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環隊列隊首下標加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 將互斥鎖信號量加一,允許其它線程執行sem_post(&pools->sem_mutex);// 執行任務(*(task.function))(task.arg);} }// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務隊列的隊首和隊尾的坐標都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;// 線程池中剩余的任務總數設置為總任務數pools->taskSum = TASK_NUM;// 初始化互斥信號量為1ret = sem_init(&pools->sem_mutex, 1, 1);if (ret == -1) {perror("sem_init-mutex");exit(1);}// 創建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }// 向任務隊列中添加任務 void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當任務隊列為滿時,等待有任務被取出任務隊列不為滿再加入隊列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {printf("Task %d is waiting to be added to the task queue.\n", *(int *)arg);sleep(1);}// 向任務隊列的隊尾加入任務Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務隊列隊尾下標加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }// 任務函數 void *taskFunction(void *arg) {// 獲取每個任務的任務號int *numptr = (int *)arg;int taskId = *numptr;// 打印線程池中的哪個線程正在處理此任務printf("Thread tid = %ld is dealing with task %d\n", gettid(), taskId);// 每個任務休眠1s后繼續執行printf("Task %d is sleeping for 1s.\n", taskId);sleep(1);// 打印任務完成信息和線程被復用printf("\t\t\t\tTask %d is finished and Thread tid = %ld is reused\n", taskId, gettid());return 0; }int main() {int ret;// 創建并初始化線程池Threadpools pools;initThreadpools(&pools);// 傳入參數數組int num[TASK_NUM];for(int i = 0; i < TASK_NUM; ++i) {num[i] = i + 1;}// 向任務隊列中連續添加任務for(int i = 0; i < TASK_NUM; ++i) {addTask(&pools, taskFunction, (void *)&num[i]);}// 主線程等待線程池中的線程全部結束后再繼續for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_join(pools.threads[i], NULL);if(ret != 0) {fprintf(stderr, "pthread_join error: %s\n", strerror(ret));exit(1);}}// 所有任務都執行完,線程池也退出printf("\nAll %d tasks have been finished.\n", TASK_NUM);// 銷毀互斥信號量ret = sem_destroy(&pools.sem_mutex);if (ret == -1) {perror("sem_destroy sem_mutex");} }首先進行宏定義:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 100 // 要執行的任務總數然后定義使用到的數據結構:
任務:
// 線程池中每個線程執行的任務的結構體 typedef struct {void *(*function)(void *); // 執行函數void *arg; // 參數 } Task;任務隊列和線程池:
// 任務循環隊列的數據結構 typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務隊列數組int front; // 隊首下標int rear; // 隊尾下標 } TaskQueue;// 線程池數據結構 typedef struct {pthread_t threads[THREADS_NUM]; // 線程數組TaskQueue taskQueue; // 任務隊列int taskSum; // 剩余任務總數,結束程序用sem_t sem_mutex; // 互斥信號量 } Threadpools;線程池初始化函數:
// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務隊列的隊首和隊尾的坐標都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;// 線程池中剩余的任務總數設置為總任務數pools->taskSum = TASK_NUM;// 初始化互斥信號量為1ret = sem_init(&pools->sem_mutex, 1, 1);if (ret == -1) {perror("sem_init-mutex");exit(1);}// 創建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }創建線程池中的線程時,可以看到每個線程執行的函數都為executeTask()任務執行函數。
對應設計圖中的初始化線程池部分:
接著實現函數部分:
線程執行函數:
// 線程池中每個線程執行的任務 static void *executeTask(void *arg) {// 向每個線程傳入的參數是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 等待互斥信號量大于0,防止臨界沖突,然后將互斥鎖信號量減一,繼續執行sem_wait(&pools->sem_mutex);// 當任務隊列為空時while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經沒有剩余任務要處理,那么退出線程if (pools->taskSum == 0) {printf("Thread %ld exits.\n", gettid());sem_post(&pools->sem_mutex);pthread_exit(NULL);}// 否則等待任務隊列中有任務后再取任務進行執行printf("Thread %ld is waiting for a task.\n", gettid());sleep(1); }// 剩余任務總數減一pools->taskSum--;// 獲取任務隊列隊首的任務Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環隊列隊首下標加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 將互斥鎖信號量加一,允許其它線程執行sem_post(&pools->sem_mutex);// 執行任務(*(task.function))(task.arg);} }當線程從任務隊列中獲取任務執行時,有可能發生條件競爭,多個線程同時取同一個任務進行執行,所以要在線程執行函數處用信號量避免這種沖突,使線程取任務執行有序進行。
可以看到,每個線程執行完任務后,若還有剩余任務且任務隊列不為空,線程會自動從任務隊列中獲取任務,繼續執行任務,而不用手動為每一個任務指定一個空閑線程進行執行,任務隊列為循環隊列,每次從任務隊列的隊首獲取任務,保證了FIFO。
對應設計圖中的每個線程獲取任務的箭頭部分:
將任務添加到任務隊列函數:
// 向任務隊列中添加任務 void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當任務隊列為滿時,等待有任務被取出任務隊列不為滿再加入隊列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {printf("Task %d is waiting to be added to the task queue.\n", *(int *)arg);sleep(1);}// 向任務隊列的隊尾加入任務Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務隊列隊尾下標加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }可以看到,任務隊列為循環隊列,每次向任務隊列的隊尾添加任務,保證了FIFO。
對應設計圖中的將任務添加到任務隊列的箭頭部分:
每個任務執行的函數:
// 任務函數 void *taskFunction(void *arg) {// 獲取每個任務的任務號int *numptr = (int *)arg;int taskId = *numptr;// 打印線程池中的哪個線程正在處理此任務printf("Thread tid = %ld is dealing with task %d\n", gettid(), taskId);// 每個任務休眠1s后繼續執行printf("Task %d is sleeping for 1s.\n", taskId);sleep(1);// 打印任務完成信息和線程被復用printf("\t\t\t\tTask %d is finished and Thread tid = %ld is reused\n", taskId, gettid());return 0; }對應設計圖中的每個任務執行的內容部分:
主函數中:
int main() {int ret;// 創建并初始化線程池Threadpools pools;initThreadpools(&pools);// 傳入參數數組int num[TASK_NUM];for(int i = 0; i < TASK_NUM; ++i) {num[i] = i + 1;}// 向任務隊列中連續添加任務for(int i = 0; i < TASK_NUM; ++i) {addTask(&pools, taskFunction, (void *)&num[i]);}// 主線程等待線程池中的線程全部結束后再繼續for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_join(pools.threads[i], NULL);if(ret != 0) {fprintf(stderr, "pthread_join error: %s\n", strerror(ret));exit(1);}}// 所有任務都執行完,線程池也退出printf("\nAll %d tasks have been finished.\n", TASK_NUM);// 銷毀互斥信號量ret = sem_destroy(&pools.sem_mutex);if (ret == -1) {perror("sem_destroy sem_mutex");} }主函數中,先創建線程池,此時線程處在等待狀態,然后再添加任務,線程池中的線程執行完所有的任務后,再退出程序。
執行命令:
gcc threadpools.c -pthread ./a.out分析:
可以看到,一開始當任務隊列中還沒有任務時,線程池中的線程會等待任務隊列中有任務后再取出任務接著執行。
可以看到,每個線程按照FIFO從任務隊列中取出任務進行執行,每個任務會休眠1s,如果任務隊列已滿,新的任務會等待任務隊列有任務被取出后再加入任務隊列。
可以看到,任務執行完成之后,線程池中的線程會被復用,同一個tid的線程會自動從任務隊列中獲取任務,可以執行不同的任務。
可以看到,當所有的任務都被執行完后,線程池中所有線程退出,回到主線程之后繼續,程序正常退出。
測試用例:
在宏定義處,改變線程池中的線程個數,任務隊列的最大長度和要執行的認為總數,可以進行測試程序:
測試用例1:
線程個數為10,任務隊列最大長度為12(最大任務個數為11),任務總數為50:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 50 // 要執行的任務總數執行截圖:
任務總數稍大于線程個數和任務隊列長度時,可以看到,線程池可以正常運行。
測試用例2:
線程個數為10,任務隊列最大長度為12(最大任務個數為11),任務總數為5:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 5 // 要執行的任務總數執行截圖:
任務總數小于線程個數和任務隊列長度時,可以看到,線程池可以正常運行。
測試用例3:
線程個數為10,任務隊列最大長度為12(最大任務個數為11),任務總數為10000:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 10000 // 要執行的任務總數執行截圖:
當任務總數遠遠多于線程個數時,線程池可以正常運行。
測試用例4:
線程個數為500,任務隊列最大長度為500(最大任務個數為499),任務總數為10000:
#define THREADS_NUM 500 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 500 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 10000 // 要執行的任務總數執行截圖:
可以看到,當線程個數較多時,線程池可以正常運行,由于使用了信號量,所以多個線程在獲取任務時不會發生條件競爭,導致沖突
使用之前沒有用信號量的程序進行運行相同參數的程序時,可以看到,由于發生條件競爭,出現了無限阻塞現象,線程之間獲取任務時有沖突。
測試用例5:
線程個數為3000,任務隊列最大長度為4000(最大任務個數為3999),任務總數為500000:
#define THREADS_NUM 3000 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 4000 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 500000 // 要執行的任務總數執行截圖:
進行多線程高并發測試,可以看到,線程池可以正常運行。
總結
以上是生活随笔為你收集整理的操作系统实验报告15:进程同步与互斥线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 操作系统实验报告14:Peterson
- 下一篇: 操作系统实验报告16:CPU 调度