缓冲技术之二:缓冲池BufferPool的简单实现
生活随笔
收集整理的這篇文章主要介紹了
缓冲技术之二:缓冲池BufferPool的简单实现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
在文章緩沖技術中提到無論是單緩沖、雙緩沖或循環緩沖,均僅是進程專屬緩沖配備,而一旦考慮到操作系統的分時并行特性,任一時刻只有一個進程的緩沖體系在工作之中,而其他進程的緩沖體系并不在工作(要么是遷移到swap cache外設磁盤上要么是占據物理內存,顯著減少可用物理內存容量),所以采用公用緩沖池架構為多個并發進程提供緩沖服務是當前主流的手段(操作系統、數據庫)。
由于完整地提供操作系統級別的緩沖池涉及到較多的內核知識,復雜度較高,出于演示說明的目的,所以本文通過一個為單個進程中的多線程提供緩沖服務的程序demo進行分析。
0. 可擴展數組實現的隊列模板
對于緩沖池而言,其主要由3種不同類型的緩沖區構成:空閑緩沖區、裝滿輸入數據的緩沖區和裝滿輸出數據的緩沖區,分別通過3個隊列進行管理:空緩沖區隊列emq, 裝滿輸入數據的緩沖區隊列inq, 裝滿輸出數據的緩沖區隊列outq。故而合理的隊列實現是構成緩沖池管理各buffer塊的基礎,這里通過可擴展容量的數組實現隊列模板。
queue.h
#pragma once #include <cassert> #include <cstring>namespace _tool {template <typename T>class queue{protected:int mFront,mEnd,mCapacity; //分別定義該隊列的頭(第一個有效數據Buffer的位置),尾(下一個待插入的空閑位置),mCapacity(當前隊列的最大容量)T *mpData; //指向Buffer塊連續數組的數組指針/*擴展當前buffer塊數組的容量2倍*/virtual void double_resize(void){T *tmp = new T[this->mCapacity*2];int size = this->size();for (int i=0; i<size; i++){tmp[i] = this->mpData[(this->mFront+i)%this->mCapacity];}delete[] mpData;this->mpData = tmp;this->mCapacity*=2;this->mEnd = size;this->mFront = 0;}public:/*默認初始化函數*/queue(void){const int INIT_SIZE = 10;this->mCapacity = INIT_SIZE;this->mFront = this->mEnd = 0;this->mpData = new T[this->mCapacity];}/*“引用傳遞”初始化函數*/queue(const queue &from){this->mCapacity = from.mCapacity;this->mFront = from.mFront;this->mEnd = from.mEnd;this->mpData = new T[this->mCapacity];for (int i=0; i<this->mCapacity; i++) {this->mpData[i] = from.mpData[i];//是否是深拷貝依舊取決于T數據類型的具體實現} }virtual ~queue(void){if (this->mpData){delete[] this->mpData;}this->mpData = 0;}/*往隊列中壓入新數據*/virtual void push_back(T value){if (this->size() >= this->mCapacity-1){this->double_resize();}this->mpData[this->mEnd++] = value;this->mEnd %= this->mCapacity;}/*取出隊列的首部元素*/virtual const T pop_front(void){assert(!this->empty()); //輸出斷言,如果隊列是空,則彈出斷言,如果非空,則彈出隊列首部T tmp = this->mpData[this->mFront++];this->mFront %= this->mCapacity;return tmp;}/*獲取隊列的首部元素,但不彈出*/virtual const T &get_front(void) const{assert(!this->empty());return this->mpData[this->mFront];}/*獲取隊列中當有效數據規模size*/virtual int size(void) const{return (this->mEnd+this->mCapacity-this->mFront) % this->mCapacity;}virtual bool empty(void) const{return this->size()==0;}const queue &operator =(const queue &from) //引用傳遞{if (this->mpData){delete[] this->mpData;}this->mpData = 0;this->mCapacity = from.mCapacity;this->mFront = from.mFront;this->mEnd = from.mEnd;this->mpData = new T[this->mCapacity];for (int i=0; i<this->mCapacity; i++){this->mpData[i] = from.mpData[i]; //此處可以看出隊列的深拷貝還淺拷貝是由子元素自己實現的操作符函數的實現決定的}return *this;}};/*fixed_queue是在queue基礎上繼承而來的子類,fixed_queue子類并不支持queue的動態可擴展*/template <typename T>class fixed_queue : public queue<T>{public:fixed_queue(int size = 10){this->mCapacity = size+1;this->mFront = this->mEnd = 0;this->mpData = new T[this->mCapacity];}/*引用傳遞初始化函數*/fixed_queue(const fixed_queue &from){this->mCapacity = from.mCapacity;this->mFront = from.mFront;this->mEnd = from.mEnd;this->mpData = new T[this->mCapacity];for (int i=0; i<this->mCapacity; i++){this->mpData[i] = from.mpData[i];}}virtual ~fixed_queue(void){if (this->mpData){delete[] this->mpData;}this->mpData = 0;}virtual void push_back(T value){assert(!this->full());this->mpData[this->mEnd++] = value;this->mEnd %= this->mCapacity;}virtual bool full(void){return (this->size() >= this->mCapacity-1);}const fixed_queue &operator =(const fixed_queue &right){if (this->mpData){delete[] this->mpData;}this->mpData = 0;this->mCapacity = right.mCapacity;this->mFront = right.mFront;this->mEnd = right.mEnd;this->mpData = new T[this->mCapacity];for (int i=0; i<this->mCapacity; i++){this->mpData[i] = right.mpData[i];}return *this;}/*如果fixed_queue實在容量不夠,那么必須顯式指明新的size參數*/void resize(int size){if (this->mpData){delete[] this->mpData;}this->mpData = 0;this->mCapacity = size+1;this->mFront = this->mEnd = 0;this->mpData = new T[this->mCapacity];}/*修改從父類queue中繼承的double_resize()動態擴展函數*/protected:void double_resize(void){assert(!"Don't call me!!");}}; }1. BufferPool對外提供的API和具體實現
緩沖池的主要構成是三種隊列:空閑緩沖塊隊列EM、裝滿輸入數據的緩沖塊隊列IN、裝滿輸出數據待處理的緩沖塊隊列OUT
緩沖池的工作方式主要是4種:收容輸入、提取輸入、收容輸出、提取輸出;
BufferPool.h #pragma once #include "queue.h" #include <windows.h>class IOBuffer { public: /*通過枚舉數據結構事先聲明三種不同的隊列,分別是空閑緩沖塊隊列、輸入數據緩沖塊隊列、輸出數據緩沖塊隊列*/enum eBuffType{BT_EM = 0,BT_IN = 1,BT_OUT = 2,};IOBuffer(int buffer_pool_capacity, int each_buffer_size);~IOBuffer(void);//4個對外的函數int accept_in(void);//為了方便實現,只返回一個緩沖區,需要fill_in裝入輸入緩沖隊列,和fill_in一起構成收容輸入int accept_out(void);//為了方便實現,只返回一個緩沖區,需要fill_out裝入輸出緩沖隊列,和fill_out一起構成收容輸出void fill_in(int number);//將緩沖塊buffer裝載輸入數據后放入輸入緩沖區隊列void fill_out(int number);//將緩沖區buffer裝載要輸出的數據然后放入輸出緩沖區隊列//2個提取裝滿數據的緩沖塊的函數void distill_in(void);//對應提取輸入void distill_out(void);//對應提取輸出_tool::fixed_queue<char> &thebuffer(int number); //返回指定index的buffer的引用private:IOBuffer(const IOBuffer &from);int get_buff(int type);//已處理互斥問題void put_buf(int type, int work_buf);//已處理互斥問題/*為了簡化操作,直接在當前進程空間的堆中申明一塊buffer_pool_capacity * each_buffer_size大小的空間作為緩沖池的物理空間使用,并用mpBuffers指針指向該空間的首地址*/_tool::fixed_queue<char> *mpBuffers;//3個隊列,之所以模板類采用int具象化,是因為和后續的設計配合在一起,我們是通過數組批量劃分緩沖空間的,//故而每個緩沖塊都對應緩沖空間數組中的具體索引Index,出于簡化管理,之所以管理各緩沖塊的Index即可_tool::queue<int> mqEM; _tool::queue<int> mqIN;_tool::queue<int> mqOUT;HANDLE mhEM;//隊列容量的信號量HANDLE mhIN;//隊列容量的信號量HANDLE mhOUT;//隊列容量的信號量HANDLE mhoEM;//隊列訪問權的信號量HANDLE mhoIN;//隊列訪問權的信號量HANDLE mhoOUT;//隊列訪問權的信號量int mBufferID;static int mstaTotalBuffers; };
BufferPool.cpp
#include "BufferPool.h" #include <sstream> #include <iostream>using namespace _tool;extern HANDLE OUTPUT_MUTEX; int IOBuffer::mstaTotalBuffers = 0;IOBuffer::IOBuffer(int buffer_pool_capacity, int each_buffer_size) {/*初始化緩沖池,緩沖池中buffer緩沖塊數目總共有buffer_pool_capacity參數指定,每個緩沖塊buffer含有each_buffer_size個字節*/this->mpBuffers = new fixed_queue<char>[buffer_pool_capacity];for (int i=0; i<buffer_pool_capacity; i++){this->mpBuffers[i].resize(each_buffer_size);}//記錄下該緩沖池的編號this->mBufferID = ++IOBuffer::mstaTotalBuffers;std::stringstream ss;//設置string對象,用于后續提供內核的唯一性命名ss << "BufferPool NO." << IOBuffer::mstaTotalBuffers;/*設置以下3個互斥量,3各信號量,每個隊列對應一個Mutex和semaphore,其中Mutex對應隊列的具體操作權限,比如彈出隊列首,或壓入新元素,顯然每個時刻只能有一個線程獲取具體的操作權限。而semaphore對應具體隊列中的當前可用緩沖塊數目,故而應該先獲取semaphore,然后再或獲取操作權限,雙層內核加鎖保證最后的線程互斥*/this->mhEM = CreateSemaphore(NULL, buffer_capacity, buffer_capacity, (ss.str() + "EM").c_str());this->mhIN = CreateSemaphore(NULL, 0, buffer_capacity, (ss.str() + "IN").c_str());this->mhOUT = CreateSemaphore(NULL, 0, buffer_capacity, (ss.str() + "OUT").c_str());this->mhoEM = CreateMutex(NULL, false, (ss.str() + "oEM").c_str());this->mhoIN = CreateMutex(NULL, false, (ss.str() + "oIN").c_str());this->mhoOUT = CreateMutex(NULL, false, (ss.str() + "oOUT").c_str());//初始化空閑緩沖區隊列,將當前所有的空閑緩沖塊的索引按照順序依次加入到空閑隊列中,這也是for (int i=0; i<buffer_capacity; i++){this->mqEM.push_back(i);} }/*引用傳遞的初始化函數設計為無操作,你也可以自己實現,不過在這里不是很有意義*/ IOBuffer::IOBuffer(const IOBuffer &from) { }IOBuffer::~IOBuffer(void) {//刪除緩沖區delete[] this->mpBuffers;//清除信號量CloseHandle(this->mhEM);CloseHandle(this->mhIN);CloseHandle(this->mhOUT);CloseHandle(this->mhoEM);CloseHandle(this->mhoIN);CloseHandle(this->mhoOUT); }/*返回指定index的緩沖塊buffer的引用*/ fixed_queue<char> &IOBuffer::thebuffer(int number) {return this->mpBuffers[number]; }int IOBuffer::get_buff(int type) {queue<int> *pq; //臨時隊列指針,指向將要操作的緩沖塊的首地址HANDLE handle=NULL,ohandle=NULL;//臨時內核對象,用來獲取對應隊列的mutex和semaphore//根據類型選擇不同的隊列和控制句柄switch (type){case BT_EM:pq = &this->mqEM;handle = this->mhEM; //獲取空閑隊列的準入權限ohandle = this->mhoEM;//獲取空閑隊列的操作權限,break;case BT_IN:pq = &this->mqIN;handle = this->mhIN;ohandle = this->mhoIN;break;case BT_OUT:pq = &this->mqOUT;handle = this->mhOUT;ohandle = this->mhoOUT;break;default:assert(!"unknown type");break;}//申請一個空間,等待信號量WaitForSingleObject(handle, INFINITE); //先獲取具體隊列的準入權限,該操作會將該隊列的可用資源數減1int tmp;//隊列操作要互斥WaitForSingleObject(ohandle, INFINITE); //獲取具體隊列的操作權限tmp = pq->pop_front();//將該隊列的可用緩沖塊的Index彈出ReleaseMutex(ohandle);//釋放具體隊列的操作權限return tmp; }void IOBuffer::put_buf(int type, int work_buf) {queue<int> *pq;HANDLE handle=NULL,ohandle=NULL;//根據類型選擇不同的隊列和控制句柄switch (type){case BT_EM:pq = &this->mqEM;handle = this->mhEM;ohandle = this->mhoEM;break;case BT_IN:pq = &this->mqIN;handle = this->mhIN;ohandle = this->mhoIN;break;case BT_OUT:pq = &this->mqOUT;handle = this->mhOUT;ohandle = this->mhoOUT;break;default:assert(!"unknown type");break;}//釋放一個空間,釋放信號量WaitForSingleObject(ohandle, INFINITE);//獲取該隊列的操作權限pq->push_back(number);//將number指代的緩沖塊加入到該隊列的尾部ReleaseMutex(ohandle);ReleaseSemaphore(handle, 1, NULL);//將該隊列的可用資源計數加1 }/*為inq隊列裝入新的數據,先從空閑緩沖塊隊列中提取一個可用的空閑緩沖塊*/ int IOBuffer::accept_in(void) {return this->get_buff(BT_EM); }int IOBuffer::accept_out(void) {return this->get_buff(BT_EM); } /*將number號裝載號了有效數據的輸入緩沖塊加入到inq輸入緩沖塊等待隊列中,等待distill_in()函數提取處理*/ void IOBuffer::fill_in(int number) {this->put_buf(BT_IN, number); }void IOBuffer::fill_out(int number) {this->put_buf(BT_OUT, number); }/*提取輸入數據緩沖隊列的函數*/ void IOBuffer::distill_in(void) {//拿到一個已滿的輸入bufferint number = this->get_buff(BT_IN); //從輸入緩沖塊隊列中提取首部緩沖塊,get_buff()函數已經設計好了互斥機制WaitForSingleObject(OUTPUT_MUTEX, INFINITE); //獲取stdout標準輸出的輸出權限std::cout << "*****flushing buffer " << number << " to input*****" << std::endl;std::cout << "##INPUT> ";//依次將number號緩沖塊中存儲的字節序列數據依次輸出到stdoutwhile (!this->mpBuffers[number].empty()){std::cout << this->mpBuffers[number].pop_front();}std::cout << std::endl;ReleaseMutex(OUTPUT_MUTEX);//釋放掉bufferthis->put_buf(BT_EM, number); //將number號緩沖塊加入到空閑隊列中,put_buff()函數已經設計號了互斥機制 }/*提取輸出數據緩沖隊列的函數*/ void IOBuffer::distill_out(void) {//拿到一個已滿的輸出bufferint number = this->get_buff(BT_OUT);WaitForSingleObject(OUTPUT_MUTEX, INFINITE);std::cout << "flushing buffer " << number << " to output" << std::endl;std::cout << "##OUTPUT";while (!this->mpBuffers[number].empty()){std::cout << this->mpBuffers[number].pop_front();}std::cout << std::endl;ReleaseMutex(OUTPUT_MUTEX);//釋放掉bufferthis->put_buf(BT_EM, number); }2. BufferPool測試程序
test_main.cpp #include "queue.h" #include <iostream> #include "IOBuffer.h" #include <windows.h>HANDLE OUTPUT_MUTEX;//使用標準輸出設備stdout的互斥鎖,任一時刻只有一個線程可以輸出,防止把輸出搞亂 IOBuffer afxBuff(10, 10);//全局的一個緩沖池,一共有10個buffer,每個buffer空間為10 using namespace _tool;//使用定義在_tool命名空間中的queue,不然回合std中的queue沖突DWORD WINAPI distill_in(void*);//提取輸入,出于顯示目的,只設計一個提取輸入線程 DWORD WINAPI distill_out(void*);//提取輸出,出于顯示目的,只設計一個提取輸入線程 DWORD WINAPI accept_in(void *para);//收容輸入 DWORD WINAPI accept_out(void *para);//收容輸出int main(void) {OUTPUT_MUTEX = CreateMutex(NULL, false, "OUTPUT");//先創建一個MUTEX控制輸出,FALSE意味著該mutex并不為創建的宿主線程占用,一開始便處于激活狀態,可介紹任何線程的聲明占用HANDLE hfuncs[22];//創建22個線程,1各提取輸入線程,1個提取輸出線程,收容輸入和收容輸出線程各10個hfuncs[0] = CreateThread(NULL, 0, distill_in, NULL, 0, NULL);//創建一個提取輸入進程hfuncs[1] = CreateThread(NULL, 0, distill_out, NULL, 0, NULL);//創建一個提取輸出進程//創建10個收容輸入進程for (int i=0; i<10; i++){char c[11]="abcdefghij";hfuncs[2+i] = CreateThread(NULL, 0, accept_in, c+i, 0, NULL);}//創建10個收容輸出進程,為了區別收容輸入,輸出數據全部是用大寫for (int i=0; i<10; i++){char c[11]="ABCDEFGHIJ";hfuncs[12+i] = CreateThread(NULL, 0, accept_out, c+i, 0, NULL);}WaitForMultipleObjects(22, hfuncs, true, 10000);//10s自動結束阻塞等待,否則該程序將一直輸出return 0; }DWORD WINAPI distill_in(void*)//不斷地檢測輸入緩沖區隊列是否非空,有則輸入 {while (1){Sleep(700);afxBuff.distill_in();} }DWORD WINAPI distill_out(void*)//不斷地檢測輸出緩沖區隊列是否非空,有則輸出 {while (1){Sleep(800);afxBuff.distill_out();} }DWORD WINAPI accept_in(void *para)//不斷檢測是否有緩沖區可用,有則輸入 {char c = *((char*)para); //提取輸入的單個字符while (1){Sleep(900);int number = afxBuff.accept_in();WaitForSingleObject(OUTPUT_MUTEX,INFINITE);std::cout << "filling input buffer " << number << " with " << c << std::endl;ReleaseMutex(OUTPUT_MUTEX);//將該緩沖塊用該字符填滿while (!afxBuff.thebuffer(number).full()){afxBuff.thebuffer(number).push_back(c);}WaitForSingleObject(OUTPUT_MUTEX,INFINITE);std::cout << "put buffer " << number << " into input buffer queue" << std::endl;ReleaseMutex(OUTPUT_MUTEX);afxBuff.fill_in(number); //將裝滿的輸入緩沖塊裝入in輸入隊列中} }DWORD WINAPI accept_out(void *para)//不斷檢測是否有緩沖區可用,有則輸出 {char c = *((char*)para);while (1){Sleep(600);int number = afxBuff.accept_out();WaitForSingleObject(OUTPUT_MUTEX,INFINITE);std::cout << "filling output buffer " << number << " with " << c << std::endl;ReleaseMutex(OUTPUT_MUTEX);while (!afxBuff.thebuffer(number).full()){afxBuff.thebuffer(number).push_back(c);}WaitForSingleObject(OUTPUT_MUTEX,INFINITE);std::cout << "put buffer " << number << " into output buffer queue" << std::endl;ReleaseMutex(OUTPUT_MUTEX);afxBuff.fill_out(number);} }
在Windows下VC編譯器運行該程序,得到結果如下
這里的緩沖池技術實現的較為簡單,實際的緩沖池實現涉及到諸多的內核知識,諸如系統中斷、內存管理等,本文只是出于演示的目的,力求通過分析該簡單的程序講解緩沖池技術實現的重點。
總結
以上是生活随笔為你收集整理的缓冲技术之二:缓冲池BufferPool的简单实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Zynq7000 USB2.0协议解析及
- 下一篇: 学习LLC谐振变换电路的工作原理