无锁编程实战演练
版權(quán)聲明:本文為博主原創(chuàng)文章。歡迎轉(zhuǎn)載。 https://blog.csdn.net/hzhsan/article/details/25837189
前段時間研究過一陣子無鎖化編程。
剛寫了幾個簡單的程序,來驗證了下自己學(xué)到的一些概念。
測試場景:假設(shè)有一個應(yīng)用:如今有一個全局變量,用來計數(shù),再創(chuàng)建10個線程并發(fā)運行,每個線程中循環(huán)對這個全局變量進(jìn)行++操作(i++)。循環(huán)加2000000次。
所以非常easy知道,這必定會涉及到并發(fā)相互排斥操作。
以下通過三種方式來實現(xiàn)這樣的并發(fā)操作。并對照出其在效率上的不同之處。
這里先貼上代碼。共5個文件:2個用于做時間統(tǒng)計的文件:timer.h timer.cpp。這兩個文件是暫時封裝的,僅僅用來計時。能夠不必細(xì)看。
timer.h
#ifndef TIMER_H
#define TIMER_H
#include <sys/time.h>
class Timer
{
public:
Timer();
// 開始計時時間
void Start();
// 終止計時時間
void Stop();
// 又一次設(shè)定
void Reset();
// 耗時時間
void Cost_time();
private:
struct timeval t1;
struct timeval t2;
bool b1,b2;
};
#endif
timer.cpp
#include "timer.h"
#include <stdio.h>
Timer::Timer()
{
b1 = false;
b2 = false;
}
void Timer::Start()
{
gettimeofday(&t1,NULL);
b1 = true;
b2 = false;
}
void Timer::Stop()
{
if (b1 == true)
{
gettimeofday(&t2,NULL);
b2 = true;
}
}
void Timer::Reset()
{
b1 = false;
b2 = false;
}
void Timer::Cost_time()
{
if (b1 == false)
{
printf("計時出錯,應(yīng)該先運行Start(),然后運行Stop(),再來運行Cost_time()");
return ;
}
else if (b2 == false)
{
printf("計時出錯。應(yīng)該運行完Stop(),再來運行Cost_time()");
return ;
}
else
{
int usec,sec;
bool borrow = false;
if (t2.tv_usec > t1.tv_usec)
{
usec = t2.tv_usec - t1.tv_usec;
}
else
{
borrow = true;
usec = t2.tv_usec+1000000 - t1.tv_usec;
}
if (borrow)
{
sec = t2.tv_sec-1 - t1.tv_sec;
}
else
{
sec = t2.tv_sec - t1.tv_sec;
}
printf("花費時間:%d秒 %d微秒
",sec,usec);
}
}
傳統(tǒng)相互排斥量加鎖方式 lock.cpp
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <time.h> #include "timer.h" pthread_mutex_t mutex_lock; static volatile int count = 0; void *test_func(void *arg) { int i = 0; for(i = 0; i < 2000000; i++) { pthread_mutex_lock(&mutex_lock); count++; pthread_mutex_unlock(&mutex_lock); } return NULL; } int main(int argc, const char *argv[]) { Timer timer; // 為了計時,暫時封裝的一個類Timer。
timer.Start(); // 計時開始 pthread_mutex_init(&mutex_lock, NULL); pthread_t thread_ids[10]; int i = 0; for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_create(&thread_ids[i], NULL, test_func, NULL); } for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++) { pthread_join(thread_ids[i], NULL); } timer.Stop();// 計時結(jié)束 timer.Cost_time();// 打印花費時間 printf("結(jié)果:count = %d ",count); return 0; }
no lock 不加鎖的形式 nolock.cpp
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include "timer.h"
int mutex = 0;
int lock = 0;
int unlock = 1;
static volatile int count = 0;
void *test_func(void *arg)
{
int i = 0;
for(i = 0; i < 2000000; i++)
{
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);
count++;
__sync_bool_compare_and_swap (&mutex, unlock, 0);
}
return NULL;
}
int main(int argc, const char *argv[])
{
Timer timer;
timer.Start();
pthread_t thread_ids[10];
int i = 0;
for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
{
pthread_create(&thread_ids[i], NULL, test_func, NULL);
}
for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++)
{
pthread_join(thread_ids[i], NULL);
}
timer.Stop();
timer.Cost_time();
printf("結(jié)果:count = %d
",count);
return 0;
}
原子函數(shù)進(jìn)行統(tǒng)計方式 atomic.cpp
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include "timer.h"
static volatile int count = 0;
void *test_func(void *arg)
{
int i = 0;
for(i = 0; i < 2000000; i++)
{
__sync_fetch_and_add(&count, 1);
}
return NULL;
}
int main(int argc, const char *argv[])
{
Timer timer;
timer.Start();
pthread_t thread_ids[10];
int i = 0;
for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
pthread_create(&thread_ids[i], NULL, test_func, NULL);
}
for(i = 0; i < sizeof(thread_ids)/sizeof(pthread_t); i++){
pthread_join(thread_ids[i], NULL);
}
timer.Stop();
timer.Cost_time();
printf("結(jié)果:count = %d
",count);
return 0;
}
#################################################################3
好,代碼粘貼完成。以下進(jìn)入測試環(huán)節(jié):
編譯:
[adapter@ZHEJIANG test3]$ g++ lock.cpp ./timer.cpp -lpthread -o lock ;
[adapter@ZHEJIANG test3]$ g++ nolock.cpp ./timer.cpp -lpthread -o nolock ;
[adapter@ZHEJIANG test3]$ g++ atomic.cpp ./timer.cpp -lpthread -o atomic ;
每個線程循環(huán)加2000000次。
第一組測驗
[adapter@ZHEJIANG test3]$ ./lock
花費時間:3秒 109807微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:7秒 595784微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./atomic
花費時間:0秒 381022微秒
結(jié)果:count = 20000000
結(jié)論:
能夠看出,原子操作函數(shù)的速度是最快的,其它兩種方式根本就沒法比。
而無鎖操作是在原子操作函數(shù)的基礎(chǔ)上形成的。
為什么無鎖操作的效率會這么低?
假設(shè)效率低的話。那還有什么意義,為什么如今大家都提倡無鎖編程呢?為什么?咱先不
解釋。先用數(shù)據(jù)說話。
第二組測驗:
原子操作代碼不變,加鎖操作代碼不變。修改一下無鎖操作的代碼。
將例如以下代碼更改
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ));
更改后:while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) )) usleep(1);
讓他睡一微秒。
為什么要這樣改代碼?這樣啟不是會更慢?你的推測是不無道理的,可是一個不歇息的人干的活未必比有歇息的人干的活多。
[adapter@ZHEJIANG test3]$ ./lock
花費時間:2秒 970773微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:0秒 685404微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./atomic
花費時間:0秒 380675微秒
結(jié)果:count = 20000000
結(jié)論:
不用明說。大家看到的結(jié)果是不是非常詫異?是不是!有木有!怎么會是這樣。無鎖加上usleep(1),睡一會,反而會變得這么快。
雖和原子操作相比次了一點。但已經(jīng)甩開有鎖同步好幾條街了,無鎖比有鎖快是應(yīng)該的,但為什么睡一會會更快,不睡就比有鎖
還慢那么多呢?怎么回事。
是不是這個測試的時候cpu出現(xiàn)了不穩(wěn)定的事情。
那好,那再測試幾次。
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:0秒 684938微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:0秒 686039微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:0秒 685928微秒
結(jié)果:count = 20000000
如今總沒話可說了,這是事實。但為什么,我也不會解釋。
非常好奇,為什么越歇息,效率越高。電腦是機(jī)器,它可不是人。
怎么會這樣?
那我就讓它多歇息一會:
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10);//之前是1,如今改成10了。
以下就再單獨對照一個nolock無鎖方式。
[adapter@ZHEJIANG test3]$ ./nolock//usleep(1);
花費時間:0秒 686039微秒
結(jié)果:count = 20000000
[adapter@ZHEJIANG test3]$ ./nolock//usleep(10);
花費時間:0秒 680307微秒
結(jié)果:count = 20000000
nolock,結(jié)果usleep(10)竟然比uleep(1)還要快一點。
那么這樣呢:
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100);//之前是10,如今改成100了。
[adapter@ZHEJIANG test3]$ ./nolock//usleep(100)
花費時間:0秒 661935微秒
結(jié)果:count = 20000000
還是睡的越久,效率越高。
那我再試一下usleep(1000)
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(1000);//之前是100,如今改成1000了。
[adapter@ZHEJIANG test3]$ ./nolock// usleep(1000);
花費時間:0秒 652411微秒
結(jié)果:count = 20000000
還是睡的越久,效率越高。
那我再試一下usleep(10000)
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(10000);//之前是1000,如今改成10000了。
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:0秒 626267微秒
結(jié)果:count = 20000000
還是睡的越久,效率越高。
那我再試一下usleep(100000)
while (!(__sync_bool_compare_and_swap (&mutex,lock, 1) ))usleep(100000);//之前是10000。如今改成100000了,也就是0.1秒。
[adapter@ZHEJIANG test3]$ ./nolock
花費時間:0秒 942445微秒
結(jié)果:count = 20000000
哦,如今開始速度慢了。
運行環(huán)境:
gcc版本號信息:
[adapter@ZHEJIANG test3]$ g++ -v
Using built-in specs.
Target: x86_64-redhat-linux
gcc version 4.4.5 20110214 (Red Hat 4.4.5-6) (GCC)
cpu信息:
[adapter@ZHEJIANG test3]$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
4 Intel(R) Core(TM) i5-3470 CPU @ 3.20GHz
通過編程測試及測試得出結(jié)論:
1、假設(shè)是想用全局變量來做統(tǒng)計操作。而又不得不考慮多線程間的相互排斥訪問的話,最好使用編譯器支持的原子操作函數(shù)。
再滿足相互排斥訪問的前提下,編程最簡單,效率最高。
2、lock-free。無鎖編程方式確實能夠比傳統(tǒng)加鎖方式效率高,經(jīng)上面測試能夠發(fā)現(xiàn),能夠快到5倍左右。
所以在高并發(fā)程序中
採用無鎖編程的方式能夠進(jìn)一步提高程序效率。
3、可是,得對無鎖方式有足夠熟悉的了解,不然效率反而會更低。并且easy出錯。
4、沒想明確的疑問:為什么上面的循環(huán)檢測時。加uleep比不加。效率更高。為什么在一定程度上。usleep越久效率越高?
請高手路過的時候,為小弟解答一下。謝謝。
總結(jié)
- 上一篇: Metasploit的基本使用教程
- 下一篇: 脊柱侧弯患者挂什么科