C++11 并发指南九(综合运用: C++11 多线程下生产者消费者模型详解)
前面八章介紹了 C++11 并發編程的基礎(抱歉哈,第五章-第八章還在草稿中),本文將綜合運用 C++11 中的新的基礎設施(主要是多線程、鎖、條件變量)來闡述一個經典問題——生產者消費者模型,并給出完整的解決方案。
生產者消費者問題是多線程并發中一個非常經典的問題,相信學過操作系統課程的同學都清楚這個問題的根源。本文將就四種情況分析并介紹生產者和消費者問題,它們分別是:單生產者-單消費者模型,單生產者-多消費者模型,多生產者-單消費者模型,多生產者-多消費者模型,我會給出四種情況下的 C++11 并發解決方案,如果文中出現了錯誤或者你對代碼有異議,歡迎交流 ;-)。
單生產者-單消費者模型
顧名思義,單生產者-單消費者模型中只有一個生產者和一個消費者,生產者不停地往產品庫中放入產品,消費者則從產品庫中取走產品,產品庫容積有限制,只能容納一定數目的產品,如果生產者生產產品的速度過快,則需要等待消費者取走產品之后,產品庫不為空才能繼續往產品庫中放置新的產品,相反,如果消費者取走產品的速度過快,則可能面臨產品庫中沒有產品可使用的情況,此時需要等待生產者放入一個產品后,消費者才能繼續工作。C++11實現單生產者單消費者模型的代碼如下:
#include <unistd.h>#include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread>static const int kItemRepositorySize = 10; // Item buffer size. static const int kItemsToProduce = 1000; // How many items we plan to produce.struct ItemRepository {int item_buffer[kItemRepositorySize]; // 產品緩沖區, 配合 read_position 和 write_position 模型環形隊列.size_t read_position; // 消費者讀取產品位置.size_t write_position; // 生產者寫入產品位置.std::mutex mtx; // 互斥量,保護產品緩沖區std::condition_variable repo_not_full; // 條件變量, 指示產品緩沖區不為滿.std::condition_variable repo_not_empty; // 條件變量, 指示產品緩沖區不為空. } gItemRepository; // 產品庫全局變量, 生產者和消費者操作該變量. typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) {std::unique_lock<std::mutex> lock(ir->mtx);while(((ir->write_position + 1) % kItemRepositorySize)== ir->read_position) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->repo_not_full).wait(lock); // 生產者等待"產品庫緩沖區不為滿"這一條件發生. }(ir->item_buffer)[ir->write_position] = item; // 寫入產品.(ir->write_position)++; // 寫入位置后移.if (ir->write_position == kItemRepositorySize) // 寫入位置若是在隊列最后則重新設置為初始位置.ir->write_position = 0;(ir->repo_not_empty).notify_all(); // 通知消費者產品庫不為空.lock.unlock(); // 解鎖. }int ConsumeItem(ItemRepository *ir) {int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while(ir->write_position == ir->read_position) {std::cout << "Consumer is waiting for items...\n";(ir->repo_not_empty).wait(lock); // 消費者等待"產品庫緩沖區不為空"這一條件發生. }data = (ir->item_buffer)[ir->read_position]; // 讀取某一產品(ir->read_position)++; // 讀取位置后移if (ir->read_position >= kItemRepositorySize) // 讀取位置若移到最后,則重新置位.ir->read_position = 0;(ir->repo_not_full).notify_all(); // 通知消費者產品庫不為滿.lock.unlock(); // 解鎖.return data; // 返回產品. }void ProducerTask() // 生產者任務 {for (int i = 1; i <= kItemsToProduce; ++i) {// sleep(1);std::cout << "Produce the " << i << "^th item..." << std::endl;ProduceItem(&gItemRepository, i); // 循環生產 kItemsToProduce 個產品. } }void ConsumerTask() // 消費者任務 {static int cnt = 0;while(1) {sleep(1);int item = ConsumeItem(&gItemRepository); // 消費一個產品.std::cout << "Consume the " << item << "^th item" << std::endl;if (++cnt == kItemsToProduce) break; // 如果產品消費個數為 kItemsToProduce, 則退出. } }void InitItemRepository(ItemRepository *ir) {ir->write_position = 0; // 初始化產品寫入位置.ir->read_position = 0; // 初始化產品讀取位置. }int main() {InitItemRepository(&gItemRepository);std::thread producer(ProducerTask); // 創建生產者線程.std::thread consumer(ConsumerTask); // 創建消費之線程. producer.join();consumer.join(); }?單生產者-多消費者模型
與單生產者和單消費者模型不同的是,單生產者-多消費者模型中可以允許多個消費者同時從產品庫中取走產品。所以除了保護產品庫在多個讀寫線程下互斥之外,還需要維護消費者取走產品的計數器,代碼如下:
#include <unistd.h>#include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread>static const int kItemRepositorySize = 4; // Item buffer size. static const int kItemsToProduce = 10; // How many items we plan to produce.struct ItemRepository {int item_buffer[kItemRepositorySize];size_t read_position;size_t write_position;size_t item_counter;std::mutex mtx;std::mutex item_counter_mtx;std::condition_variable repo_not_full;std::condition_variable repo_not_empty; } gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) {std::unique_lock<std::mutex> lock(ir->mtx);while(((ir->write_position + 1) % kItemRepositorySize)== ir->read_position) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->repo_not_full).wait(lock);}(ir->item_buffer)[ir->write_position] = item;(ir->write_position)++;if (ir->write_position == kItemRepositorySize)ir->write_position = 0;(ir->repo_not_empty).notify_all();lock.unlock(); }int ConsumeItem(ItemRepository *ir) {int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while(ir->write_position == ir->read_position) {std::cout << "Consumer is waiting for items...\n";(ir->repo_not_empty).wait(lock);}data = (ir->item_buffer)[ir->read_position];(ir->read_position)++;if (ir->read_position >= kItemRepositorySize)ir->read_position = 0;(ir->repo_not_full).notify_all();lock.unlock();return data; }void ProducerTask() {for (int i = 1; i <= kItemsToProduce; ++i) {// sleep(1);std::cout << "Producer thread " << std::this_thread::get_id()<< " producing the " << i << "^th item..." << std::endl;ProduceItem(&gItemRepository, i);}std::cout << "Producer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl; }void ConsumerTask() {bool ready_to_exit = false;while(1) {sleep(1);std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);if (gItemRepository.item_counter < kItemsToProduce) {int item = ConsumeItem(&gItemRepository);++(gItemRepository.item_counter);std::cout << "Consumer thread " << std::this_thread::get_id()<< " is consuming the " << item << "^th item" << std::endl;} else ready_to_exit = true;lock.unlock();if (ready_to_exit == true) break;}std::cout << "Consumer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl; }void InitItemRepository(ItemRepository *ir) {ir->write_position = 0;ir->read_position = 0;ir->item_counter = 0; }int main() {InitItemRepository(&gItemRepository);std::thread producer(ProducerTask);std::thread consumer1(ConsumerTask);std::thread consumer2(ConsumerTask);std::thread consumer3(ConsumerTask);std::thread consumer4(ConsumerTask);producer.join();consumer1.join();consumer2.join();consumer3.join();consumer4.join(); }?多生產者-單消費者模型
與單生產者和單消費者模型不同的是,多生產者-單消費者模型中可以允許多個生產者同時向產品庫中放入產品。所以除了保護產品庫在多個讀寫線程下互斥之外,還需要維護生產者放入產品的計數器,代碼如下:
#include <unistd.h>#include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread>static const int kItemRepositorySize = 4; // Item buffer size. static const int kItemsToProduce = 10; // How many items we plan to produce.struct ItemRepository {int item_buffer[kItemRepositorySize];size_t read_position;size_t write_position;size_t item_counter;std::mutex mtx;std::mutex item_counter_mtx;std::condition_variable repo_not_full;std::condition_variable repo_not_empty; } gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) {std::unique_lock<std::mutex> lock(ir->mtx);while(((ir->write_position + 1) % kItemRepositorySize)== ir->read_position) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->repo_not_full).wait(lock);}(ir->item_buffer)[ir->write_position] = item;(ir->write_position)++;if (ir->write_position == kItemRepositorySize)ir->write_position = 0;(ir->repo_not_empty).notify_all();lock.unlock(); }int ConsumeItem(ItemRepository *ir) {int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while(ir->write_position == ir->read_position) {std::cout << "Consumer is waiting for items...\n";(ir->repo_not_empty).wait(lock);}data = (ir->item_buffer)[ir->read_position];(ir->read_position)++;if (ir->read_position >= kItemRepositorySize)ir->read_position = 0;(ir->repo_not_full).notify_all();lock.unlock();return data; }void ProducerTask() {bool ready_to_exit = false;while(1) {sleep(1);std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);if (gItemRepository.item_counter < kItemsToProduce) {++(gItemRepository.item_counter);ProduceItem(&gItemRepository, gItemRepository.item_counter);std::cout << "Producer thread " << std::this_thread::get_id()<< " is producing the " << gItemRepository.item_counter<< "^th item" << std::endl;} else ready_to_exit = true;lock.unlock();if (ready_to_exit == true) break;}std::cout << "Producer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl; }void ConsumerTask() {static int item_consumed = 0;while(1) {sleep(1);++item_consumed;if (item_consumed <= kItemsToProduce) {int item = ConsumeItem(&gItemRepository);std::cout << "Consumer thread " << std::this_thread::get_id()<< " is consuming the " << item << "^th item" << std::endl;} else break;}std::cout << "Consumer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl; }void InitItemRepository(ItemRepository *ir) {ir->write_position = 0;ir->read_position = 0;ir->item_counter = 0; }int main() {InitItemRepository(&gItemRepository);std::thread producer1(ProducerTask);std::thread producer2(ProducerTask);std::thread producer3(ProducerTask);std::thread producer4(ProducerTask);std::thread consumer(ConsumerTask);producer1.join();producer2.join();producer3.join();producer4.join();consumer.join(); }多生產者-多消費者模型
該模型可以說是前面兩種模型的綜合,程序需要維護兩個計數器,分別是生產者已生產產品的數目和消費者已取走產品的數目。另外也需要保護產品庫在多個生產者和多個消費者互斥地訪問。
代碼如下:
#include <unistd.h>#include <cstdlib> #include <condition_variable> #include <iostream> #include <mutex> #include <thread>static const int kItemRepositorySize = 4; // Item buffer size. static const int kItemsToProduce = 10; // How many items we plan to produce.struct ItemRepository {int item_buffer[kItemRepositorySize];size_t read_position;size_t write_position;size_t produced_item_counter;size_t consumed_item_counter;std::mutex mtx;std::mutex produced_item_counter_mtx;std::mutex consumed_item_counter_mtx;std::condition_variable repo_not_full;std::condition_variable repo_not_empty; } gItemRepository;typedef struct ItemRepository ItemRepository;void ProduceItem(ItemRepository *ir, int item) {std::unique_lock<std::mutex> lock(ir->mtx);while(((ir->write_position + 1) % kItemRepositorySize)== ir->read_position) { // item buffer is full, just wait here.std::cout << "Producer is waiting for an empty slot...\n";(ir->repo_not_full).wait(lock);}(ir->item_buffer)[ir->write_position] = item;(ir->write_position)++;if (ir->write_position == kItemRepositorySize)ir->write_position = 0;(ir->repo_not_empty).notify_all();lock.unlock(); }int ConsumeItem(ItemRepository *ir) {int data;std::unique_lock<std::mutex> lock(ir->mtx);// item buffer is empty, just wait here.while(ir->write_position == ir->read_position) {std::cout << "Consumer is waiting for items...\n";(ir->repo_not_empty).wait(lock);}data = (ir->item_buffer)[ir->read_position];(ir->read_position)++;if (ir->read_position >= kItemRepositorySize)ir->read_position = 0;(ir->repo_not_full).notify_all();lock.unlock();return data; }void ProducerTask() {bool ready_to_exit = false;while(1) {sleep(1);std::unique_lock<std::mutex> lock(gItemRepository.produced_item_counter_mtx);if (gItemRepository.produced_item_counter < kItemsToProduce) {++(gItemRepository.produced_item_counter);ProduceItem(&gItemRepository, gItemRepository.produced_item_counter);std::cout << "Producer thread " << std::this_thread::get_id()<< " is producing the " << gItemRepository.produced_item_counter<< "^th item" << std::endl;} else ready_to_exit = true;lock.unlock();if (ready_to_exit == true) break;}std::cout << "Producer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl; }void ConsumerTask() {bool ready_to_exit = false;while(1) {sleep(1);std::unique_lock<std::mutex> lock(gItemRepository.consumed_item_counter_mtx);if (gItemRepository.consumed_item_counter < kItemsToProduce) {int item = ConsumeItem(&gItemRepository);++(gItemRepository.consumed_item_counter);std::cout << "Consumer thread " << std::this_thread::get_id()<< " is consuming the " << item << "^th item" << std::endl;} else ready_to_exit = true;lock.unlock();if (ready_to_exit == true) break;}std::cout << "Consumer thread " << std::this_thread::get_id()<< " is exiting..." << std::endl; }void InitItemRepository(ItemRepository *ir) {ir->write_position = 0;ir->read_position = 0;ir->produced_item_counter = 0;ir->consumed_item_counter = 0; }int main() {InitItemRepository(&gItemRepository);std::thread producer1(ProducerTask);std::thread producer2(ProducerTask);std::thread producer3(ProducerTask);std::thread producer4(ProducerTask);std::thread consumer1(ConsumerTask);std::thread consumer2(ConsumerTask);std::thread consumer3(ConsumerTask);std::thread consumer4(ConsumerTask);producer1.join();producer2.join();producer3.join();producer4.join();consumer1.join();consumer2.join();consumer3.join();consumer4.join(); }?另外,所有例子的代碼(包括前面一些指南的代碼均放在github上),希望對大家學習 C++11 多線程并發有所幫助。
總結
以上是生活随笔為你收集整理的C++11 并发指南九(综合运用: C++11 多线程下生产者消费者模型详解)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: STL之Set
- 下一篇: spring boot 表单的实体提交错