操作系统实验报告13:线程池简单实现
操作系統實驗報告13
實驗內容
- 實驗內容:設計實現一個線程池 (Thread Pool)
- 使用 Pthread API
- FIFO
- 先不考慮互斥問題
- 編譯、運行、測試用例
實驗環境
- 架構:Intel x86_64 (虛擬機)
- 操作系統:Ubuntu 20.04
- 匯編器:gas (GNU Assembler) in AT&T mode
- 編譯器:gcc
技術日志
實驗內容原理
- 線程池
- 解決問題:
- 數量上沒有限制的線程可能耗盡系統資源,如CPU時間或內存。
- 解決方案:
- 在進程啟動時創建多個線程,并將它們放入線程池中,它們進入阻塞狀態等待工作。
- 當服務器收到一個請求時,會從這個池中喚醒一個可用的線程,并將服務請求傳遞給這個線程。
- 一旦線程完成其服務,它就會返回到池并等待更多的工作。如果池中不包含可用線程,服務器將等待一個線程空閑。
- 例子:
- IA-32中每個進程的最大線程數。
- 這個數字大約是300,3G地址空間中每個線程的默認堆棧大小為10M。
- 可以創建少于255個線程的池。
- IA-32中每個進程的最大線程數。
- 線程池的好處
- 使用現有線程為請求提供服務通常比等待創建新線程要快一些。
- 線程池限制任意時刻時存在的線程數。對于不能支持大量并發線程的系統,這一點尤為重要。
- 將要執行的任務與創建任務的機制分離,允許我們使用不同的策略來運行任務。
- 線程池的大小
- 池中的線程數可以根據系統中cpu的數量、物理內存的數量和預期的并發客戶機請求的數量等因素進行啟發式設置。
- 更為復雜的線程池架構(如蘋果的Grand Central Dispatch)可以根據使用模式動態調整池中的線程數。
- 解決問題:
設計報告
線程池設計圖
代碼設計
測試代碼:
//threadpools.c文件 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sched.h> #include <pthread.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/wait.h> #include <sys/ipc.h> #include <sys/time.h> #include <sys/msg.h> #include <sys/syscall.h> #include <fcntl.h> #include <unistd.h>#define gettid() syscall(__NR_gettid) /* wrap the system call syscall(__NR_gettid), __NR_gettid = 224 */ #define gettidv2() syscall(SYS_gettid) /* a traditional wrapper */#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 50 // 要執行的任務總數// 線程池中每個線程執行的任務的結構體 typedef struct {void *(*function)(void *); // 執行函數void *arg; // 參數 } Task;// 任務循環隊列的數據結構 typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務隊列數組int front; // 隊首下標int rear; // 隊尾下標 } TaskQueue;// 線程池數據結構 typedef struct { pthread_t threads[THREADS_NUM]; // 線程數組 TaskQueue taskQueue; // 任務隊列 int taskSum; // 剩余任務總數,結束程序用 } Threadpools;// 線程池中每個線程執行的任務 static void *executeTask(void *arg) {// 向每個線程傳入的參數是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 當任務隊列為空時while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經沒有剩余任務要處理,那么退出線程if (pools->taskSum == 0) {printf("Thread %ld exits.\n", gettid());pthread_exit(NULL);}// 否則等待任務隊列中有任務后再取任務進行執行printf("Thread %ld is waiting for a task.\n", gettid());sleep(1); }// 剩余任務總數減一pools->taskSum--;// 獲取任務隊列隊首的任務Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環隊列隊首下標加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 執行任務(*(task.function))(task.arg);} }// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務隊列的隊首和隊尾的坐標都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;// 線程池中剩余的任務總數設置為總任務數pools->taskSum = TASK_NUM;// 創建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }// 向任務隊列中添加任務 void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當任務隊列為滿時,等待有任務被取出任務隊列不為滿再加入隊列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {printf("Task %d is waiting to be added to the task queue.\n", *(int *)arg);sleep(1);}// 向任務隊列的隊尾加入任務Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務隊列隊尾下標加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }// 任務函數 void *taskFunction(void *arg) {// 獲取每個任務的任務號int *numptr = (int *)arg;int taskId = *numptr;// 打印線程池中的哪個線程正在處理此任務printf("Thread tid = %ld is dealing with task %d\n", gettid(), taskId);// 每個任務休眠1s后繼續執行printf("Task %d is sleeping for 1s.\n", taskId);sleep(1);// 打印任務完成信息和線程被復用printf("\t\t\t\tTask %d is finished and Thread tid = %ld is reused\n", taskId, gettid());return 0; }int main() {int ret;// 創建并初始化線程池Threadpools pools;initThreadpools(&pools);// 休眠1s測試線程池中的線程在任務隊列為空時是否會等待sleep(1);// 傳入參數數組int num[TASK_NUM];for(int i = 0; i < TASK_NUM; ++i) {num[i] = i + 1;}// 向任務隊列中連續添加任務for(int i = 0; i < TASK_NUM; ++i) {addTask(&pools, taskFunction, (void *)&num[i]);}// 主線程等待線程池中的線程全部結束后再繼續for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_join(pools.threads[i], NULL);if(ret != 0) {fprintf(stderr, "pthread_join error: %s\n", strerror(ret));exit(1);}}// 所有任務都執行完,線程池也退出printf("\nAll %d tasks have been finished.\n", TASK_NUM); }首先進行宏定義:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 50 // 要執行的任務總數為了方便測試,這里線程個數和任務隊列長度設置的較小,要執行的任務總數相對線程個數較多。
然后定義使用到的數據結構:
任務:
// 線程池中每個線程執行的任務的結構體 typedef struct {void *(*function)(void *); // 執行函數void *arg; // 參數 } Task;任務隊列和線程池:
// 任務循環隊列的數據結構 typedef struct {Task tasks[TASK_QUEUE_MAX_SIZE]; // 任務隊列數組int front; // 隊首下標int rear; // 隊尾下標 } TaskQueue;// 線程池數據結構 typedef struct { pthread_t threads[THREADS_NUM]; // 線程數組 TaskQueue taskQueue; // 任務隊列 int taskSum; // 剩余任務總數,結束程序用 } Threadpools;線程池初始化函數:
// 初始化線程池 void initThreadpools(Threadpools *pools) {int ret;// 任務隊列的隊首和隊尾的坐標都為0pools->taskQueue.front = 0;pools->taskQueue.rear = 0;// 線程池中剩余的任務總數設置為總任務數pools->taskSum = TASK_NUM;// 創建線程池中的線程for(int i = 0; i < THREADS_NUM; ++i) {ret = pthread_create(&pools->threads[i], NULL, executeTask, (void *)pools);if(ret != 0) {fprintf(stderr, "pthread_create error: %s\n", strerror(ret));exit(1);}} }創建線程池中的線程時,可以看到每個線程執行的函數都為executeTask()任務執行函數。
對應設計圖中的初始化線程池部分:
接著實現函數部分:
線程執行函數:
// 線程池中每個線程執行的任務 static void *executeTask(void *arg) {// 向每個線程傳入的參數是線程池Threadpools *pools = (Threadpools *)arg;while (1) {// 當任務隊列為空時while (pools->taskQueue.front == pools->taskQueue.rear) {// 如果已經沒有剩余任務要處理,那么退出線程if (pools->taskSum == 0) {printf("Thread %ld exits.\n", gettid());pthread_exit(NULL);}// 否則等待任務隊列中有任務后再取任務進行執行printf("Thread %ld is waiting for a task.\n", gettid());sleep(1); }// 剩余任務總數減一pools->taskSum--;// 獲取任務隊列隊首的任務Task task;int front = pools->taskQueue.front;task.function = pools->taskQueue.tasks[front].function;task.arg = pools->taskQueue.tasks[front].arg;// 循環隊列隊首下標加一pools->taskQueue.front = (front + 1) % TASK_QUEUE_MAX_SIZE;// 執行任務(*(task.function))(task.arg);} }可以看到,每個線程執行完任務后,若還有剩余任務且任務隊列不為空,線程會自動從任務隊列中獲取任務,繼續執行任務,而不用手動為每一個任務指定一個空閑線程進行執行,任務隊列為循環隊列,每次從任務隊列的隊首獲取任務,保證了FIFO。
對應設計圖中的每個線程獲取任務的箭頭部分:
將任務添加到任務隊列函數:
// 向任務隊列中添加任務 void addTask(Threadpools *pools, void *(*function)(void *arg), void *arg) {// 當任務隊列為滿時,等待有任務被取出任務隊列不為滿再加入隊列while ((pools->taskQueue.rear + TASK_QUEUE_MAX_SIZE + 1 - pools->taskQueue.front) % TASK_QUEUE_MAX_SIZE == 0) {printf("Task %d is waiting to be added to the task queue.\n", *(int *)arg);sleep(1);}// 向任務隊列的隊尾加入任務Task task;task.function = function;task.arg = arg;int rear = pools->taskQueue.rear;pools->taskQueue.tasks[rear] = task;// 任務隊列隊尾下標加一pools->taskQueue.rear = (rear + 1) % (TASK_QUEUE_MAX_SIZE); }可以看到,任務隊列為循環隊列,每次向任務隊列的隊尾添加任務,保證了FIFO。
對應設計圖中的將任務添加到任務隊列的箭頭部分:
每個任務執行的函數:
// 任務函數 void *taskFunction(void *arg) {// 獲取每個任務的任務號int *numptr = (int *)arg;int taskId = *numptr;// 打印線程池中的哪個線程正在處理此任務printf("Thread tid = %ld is dealing with task %d\n", gettid(), taskId);// 每個任務休眠1s后繼續執行printf("Task %d is sleeping for 1s.\n", taskId);sleep(1);// 打印任務完成信息和線程被復用printf("\t\t\t\tTask %d is finished and Thread tid = %ld is reused\n", taskId, gettid());return 0; }對應設計圖中的每個任務執行的內容部分:
主函數中:
int main() {// 創建并初始化線程池Threadpools pools;initThreadpools(&pools);// 休眠1s測試線程池中的線程在任務隊列為空時是否會等待sleep(1);// 傳入參數數組int num[TASK_NUM];for(int i = 0; i < TASK_NUM; ++i) {num[i] = i + 1;}// 向任務隊列中連續添加任務for(int i = 0; i < TASK_NUM; ++i) {addTask(&pools, taskFunction, (void *)&num[i]);}// 主線程等待線程池中的線程全部結束后再繼續for(int i = 0; i < THREADS_NUM; ++i) {pthread_join(pools.threads[i], NULL);if(ret != 0) {fprintf(stderr, "pthread_join error: %s\n", strerror(ret));exit(1);}}// 所有任務都執行完,線程池也退出printf("\nAll %d tasks have been finished.\n", TASK_NUM); }主函數中,先創建線程池,此時線程處在等待狀態,然后再添加任務,線程池中的線程執行完所有的任務后,再退出程序。
執行命令:
gcc threadpools.c -pthread ./a.out分析:
可以看到,一開始當任務隊列中還沒有任務時,線程池中的線程會等待任務隊列中有任務后再取出任務接著執行。
可以看到,每個線程按照FIFO從任務隊列中取出任務進行執行,每個任務會休眠1s,如果任務隊列已滿,新的任務會等待任務隊列有任務被取出后再加入任務隊列。
可以看到,任務執行完成之后,線程池中的線程會被復用,同一個tid的線程會自動從任務隊列中獲取任務,可以執行不同的任務。
可以看到,當所有的任務都被執行完后,線程池中所有線程退出,回到主線程之后繼續,程序正常退出。
測試用例:
在宏定義處,改變線程池中的線程個數,任務隊列的最大長度和要執行的認為總數,可以進行測試程序:
測試用例1:
線程個數為10,任務隊列最大長度為12(最大任務個數為11),任務總數為50:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 50 // 要執行的任務總數執行截圖:
任務總數稍大于線程個數和任務隊列長度時,可以看到,線程池可以正常運行。
測試用例2:
線程個數為10,任務隊列最大長度為12(最大任務個數為11),任務總數為5:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 5 // 要執行的任務總數執行截圖:
任務總數小于線程個數和任務隊列長度時,可以看到,線程池可以正常運行。
測試用例3:
線程個數為10,任務隊列最大長度為12(最大任務個數為11),任務總數為1000:
#define THREADS_NUM 10 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 12 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 1000 // 要執行的任務總數執行截圖:
任務總數遠遠大于線程個數和任務隊列長度時,可以看到,線程池可以正常運行。
測試用例4:
線程個數為500,任務隊列最大長度為550(最大任務個數為549),任務總數為1000:
#define THREADS_NUM 500 // 線程池中的線程個數 #define TASK_QUEUE_MAX_SIZE 550 // 任務的等待隊列的最大長度,等待隊列中的最大任務個數為長度減一 #define TASK_NUM 1000 // 要執行的任務總數執行截圖:
相比之前一個測試樣例,這個樣例的線程個數較大,線程池可以正常運行,同時因為同時運行的線程較多,所以運行速度相比之前一個樣例快了很多。
總結
以上是生活随笔為你收集整理的操作系统实验报告13:线程池简单实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 操作系统实验报告12:线程2
- 下一篇: 操作系统实验报告14:Peterson