鎖的機制
鎖和人很像,有的人樂觀,總會想到好的一方面,所以只要越努力,就會越幸運;有的人悲觀,總會想到不好的一方面,患得患失,所以經(jīng)常會做不好事。我一直把前一個當作為我前進的動力和方向,快樂充實的過好每一天。
常用的鎖機制也有兩種: 1、樂觀鎖:假設(shè)不會發(fā)生并發(fā)沖突,每次不加鎖而去完成某項操作,只在提交操作時,檢查是否違反數(shù)據(jù)完整性。如果因為沖突失敗就繼續(xù)重試,直到成功為止。而樂觀鎖用到的機制就是CAS。
樂觀鎖大多是基于數(shù)據(jù)版本記錄機制實現(xiàn)。 為數(shù)據(jù)增加一個版本標識,比如在基于數(shù)據(jù)庫表的版本解決方案中,一般是通過微數(shù)據(jù)庫表增加一個“version”字段來實現(xiàn)。讀取數(shù)據(jù)時,將此版本號一同讀出,之后更新時,對此版本號加一。此時,將提交數(shù)據(jù)的版本數(shù)據(jù)與數(shù)據(jù)庫表對應記錄的當前版本信息進行比對,如果提交的數(shù)據(jù)版本號大于數(shù)據(jù)庫表當前版本號,則予以更新,否則認為是過期數(shù)據(jù)。* 樂觀鎖的缺點是不能解決臟讀的問題*。
注意: 在實際生產(chǎn)環(huán)境里邊,如果并發(fā)量不大且不允許臟讀,可以使用悲觀鎖解決并發(fā)問題。但如果系統(tǒng)的并發(fā)非常大的話,悲觀鎖定會帶來非常大的性能問題,所以我們就要選擇樂觀鎖。
2、悲觀鎖:假定會發(fā)生并發(fā)沖突,屏蔽一切可能違反數(shù)據(jù)完整性的操作。悲觀鎖的實現(xiàn),往往依靠底層提供的鎖機制。悲觀鎖會導致其他所有需要鎖的線程掛起,等待持有鎖的線程釋放鎖。如果所有線程都在等待其他線程釋放鎖,而不能主動釋放鎖資源,那么也會造成死鎖問題。
鎖的機制存在以下問題: (1)在多線程競爭下,加鎖、釋放鎖會導致比較多的上下文切換和調(diào)度延時,引起性能問題。 (2)一個線程持有鎖會導致其他所有需要次所的線程掛起。 (3)如果一個優(yōu)先級搞得線程等待一個優(yōu)先級低的線程釋放鎖會導致優(yōu)先級倒置,引起性能風險。
CAS操作
Compare And Set(或Compare And Swap),CAS是解決多線程并行情況下使用鎖造成性能損耗的一種機制,CAS操作包含三個操作數(shù)——內(nèi)存位置(V)、預期原值(A)、新值(B) 。如果內(nèi)存位置的值與預期原值相同,那么處理器會自動將內(nèi)存的值更新為新值。否則,處理器不做任何操作。無論哪種情況,處理器都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A(chǔ);如果包含該值,則將B放到這個位置;否則,不要更新該位置,只告訴我這個位置現(xiàn)在的值即可。” 現(xiàn)在幾乎所有的CPU指令都支持CAS的原子操作,X86下對應的是CMPXCHG 匯編指令。有了這個操作,我們就可以用其來實現(xiàn)各種無鎖的數(shù)據(jù)結(jié)構(gòu)。
這個操作可以用以下的例子來描述: 意思是,看一看內(nèi)存*reg里的值是不是oldval,如果是的話,則對其賦值newval,并返回true,表示更新成功,如果返回false,則表示修改失敗。
bool compare_and_swap(
int *reg,
int oldval,
int newval)
{
int reg_val = *reg;
if (reg_val == oldval){*reg = newval;
return true ;}
return false ;
}
CAS操作無鎖隊列的實現(xiàn)(參考)
Q:CAS的實現(xiàn) A:gcc提供了兩個函數(shù)
bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval,
... );
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval,
... );
這兩個函數(shù)提供原子的比較和交換,如果*ptr == oldval,就將newval寫入*ptr, 第一個函數(shù)在相等并寫入的情況下返回true,這個函數(shù)比第二個好在,返回bool值可以知道有沒有更新成功。 第二個函數(shù)在返回操作之前的值。
第二個函數(shù)用c語言描述:
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval,
... )
{type cur = *ptr;
if (cur == oldval){*ptr = newval;}
return cur;// 返回操作之前的值
}
type只能是1,2,4或8字節(jié)長度的int類型,否則會發(fā)生下面的錯誤 Q: 操作系統(tǒng)級別是如何實現(xiàn)的 A: X86中有一個CMPXCHG的匯編指令
Q: CAS指令有什么缺點 A: 1.存在ABA問題因為CAS需要在操作值的時候檢查下值有沒有發(fā)生變化,如果沒有發(fā)生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那么使用CAS進行檢查時會發(fā)現(xiàn)它的值沒有發(fā)生變化,但是實際上卻變化了。ABA問題的解決思路就是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那么A-B-A 就會變成1A-2B-3A。 2.循環(huán)時間長開銷大自旋CAS如果長時間不成功,會給CPU帶來非常大的執(zhí)行開銷。 3.只能保證一個共享變量的原子操作對一個共享變量執(zhí)行操作時,我們可以使用循環(huán)CAS的方式來保證原子操作,但是對多個共享變量操作時,循環(huán)CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合并成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合并一下ij=2a,然后用CAS來操作ij。
gcc從4.1.2提供了__sync_*系列的built-in函數(shù),用于提供加減和邏輯運算的原子操作。
其聲明如下:
原子操作的后置加加type __sync_fetch_and_add (type *ptr, type value, …) 原子操作的前置加加type __sync_add_and_fetch (type *ptr, type value, …) 其他類比 type __sync_fetch_and_sub (type *ptr, type value, …) type __sync_fetch_and_or (type *ptr, type value, …) type __sync_fetch_and_and (type *ptr, type value, …) type __sync_fetch_and_xor (type *ptr, type value, …) type __sync_fetch_and_nand (type *ptr, type value, …)
type __sync_sub_and_fetch (type *ptr, type value, …) type __sync_or_and_fetch (type *ptr, type value, …) type __sync_and_and_fetch (type *ptr, type value, …) type __sync_xor_and_fetch (type *ptr, type value, …) type __sync_nand_and_fetch (type *ptr, type value, …) 這兩組函數(shù)的區(qū)別在于第一組返回更新前的值,第二組返回更新后的值。
關(guān)于CAS函數(shù),參考: http://blog.csdn.net/youfuchen/article/details/23179799
在多線程環(huán)境下有以下情景:
比如對同一鏈隊列進行入隊操作時一個線程正在將新的隊列節(jié)點 掛載到 隊尾節(jié)點的next上,可是還沒來的及更新隊尾節(jié)點 但同一時刻另一個線程也在進行入隊操作將新的隊列節(jié)點也掛在到了沒更新的隊尾節(jié)點那么先掛載的節(jié)點就丟失了。
為了解決多線程環(huán)境下的這些問題,我們第一時間肯定想到了加上互斥鎖控制同一時刻只能有一個線程可以對隊列進行寫操作,但是加鎖的操作太消耗系統(tǒng)資源了很繁重 。
因為對臨界區(qū)的操作只有一步 就是對隊列的尾節(jié)點進行更新,只要讓這一步進行的是原子操作就可以了,所以使用到了CAS操作。
為了有一個對比 寫了一份thread_queue.c是用鎖對臨界區(qū)進行控制訪問的 另一份是lock_free_queue.c是用CAS確保對臨界區(qū)的操作是原子操作
“
queue .h”
#ifndef QUEUE_H_
#define QUEUE_H_ #include <stdio.h>
#include <stdlib.h>
typedef struct QNode
{
int data;
struct QNode *next;
}QNode, *QueuePtr;
typedef struct LinkQueue
{QueuePtr front;QueuePtr rear;
}LinkQueue;
void init_Queue(LinkQueue *q);
void push_Queue(LinkQueue *q,
int e);
int pop_Queue(LinkQueue *q,
int *e);
int is_Empty(LinkQueue *q);
void show(LinkQueue *q);
#endif /* QUEUE_H_ */ “
queue .c”
#include "queue.h"
void init_Queue(LinkQueue *q)
{q->front = q->rear = (QNode *)
malloc (
sizeof (QNode));q->front->next = NULL;
}
void push_Queue(LinkQueue *q,
int e)
{QueuePtr newNode = (QueuePtr)
malloc (
sizeof (QNode));newNode->data = e;newNode->next = NULL;q->rear->next = newNode;q->rear = newNode;
}
void cas_push(LinkQueue *q,
int e)
{QueuePtr newNode = (QueuePtr)
malloc (
sizeof (QNode));newNode->data = e;newNode->next = NULL;QueuePtr tmp;
do {tmp = q->rear;}
while (!__sync_bool_compare_and_swap((
long *)(&(tmp->next)), NULL, (
long )newNode));q->rear = newNode;
}
int is_Empty(LinkQueue *q)
{
if (q->front->next == NULL){
return (
1 );}
return (
0 );
}
int pop_Queue(LinkQueue *q,
int *e)
{
if (is_Empty(q)){
return (
0 );}QueuePtr tmp;tmp = q->front->next;q->front->next = tmp->next;*e = tmp->data;
free (tmp);
return (
1 );
}
int cas_pop(LinkQueue *q,
int *e)
{QueuePtr tmp;
do {
if (is_Empty(q)){
return (
0 );}tmp = q->front->next;}
while (!__sync_bool_compare_and_swap((
long *)(&(q->front->next)), (
long )tmp, (
long )tmp->next));*e = tmp->data;
free (tmp);
return (
1 );
}
void show(LinkQueue *q)
{
printf (
"void show(LinkQueue *q)\n" );QueuePtr tmp = q->front->next;
while (tmp){
printf (
"%d " , tmp->data);tmp = tmp->next;}
printf (
"\n" );
}“thread_queue.c”
#include "queue.h"
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
#include <assert.h> #define THREAD_NUMBER 4
pthread_mutex_t mutex;
void *thread_push(
void *arg);
void *thread_pop(
void *arg);
int main()
{LinkQueue que;init_Queue(&que);
int i;pthread_t threadArr[THREAD_NUMBER];
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_create(&threadArr[i], NULL, thread_push, (
void *)&que);}
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_join(threadArr[i], NULL);}show(&que);
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_create(&threadArr[i], NULL, thread_pop, (
void *)&que);}
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_join(threadArr[i], NULL);}
exit (EXIT_SUCCESS);
}
void *thread_push(
void *arg)
{
printf (
"start push\n" );LinkQueue * quePtr = (LinkQueue *)arg;
int i;
for (i =
0 ; i <
20 ; ++i){pthread_mutex_lock(&mutex);push_Queue(quePtr, i);pthread_mutex_unlock(&mutex);}
printf (
"finish push\n" );pthread_exit(NULL);
}
void *thread_pop(
void *arg)
{
printf (
"start pop\n" );LinkQueue * quePtr = (LinkQueue *)arg;
int tmp;
int res;
while (
1 ){pthread_mutex_lock(&mutex);res = pop_Queue(quePtr, &tmp);pthread_mutex_unlock(&mutex);
if (!res){
break ;}
printf (
"%d " , tmp);}
printf (
"finish pop\n" );pthread_exit(NULL);
}“l(fā)ock_free_queue.c”
#include "queue.h"
#include <pthread.h>
#include <unistd.h>
#include <assert.h> #define THREAD_NUMBER 4 void *thread_push(
void *arg);
void *thread_pop(
void *arg);
int main()
{LinkQueue que;init_Queue(&que);
int i;pthread_t threadArr[THREAD_NUMBER];
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_create(&threadArr[i], NULL, thread_push, (
void *)&que);}
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_join(threadArr[i], NULL);}show(&que);
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_create(&threadArr[i], NULL, thread_pop, (
void *)&que);}
for (i =
0 ; i < THREAD_NUMBER; ++i){pthread_join(threadArr[i], NULL);}
exit (EXIT_SUCCESS);
}
void *thread_push(
void *arg)
{
printf (
"start push\n" );LinkQueue * quePtr = (LinkQueue *)arg;
int i;
for (i =
0 ; i <
20 ; ++i){cas_push(quePtr, i);}
printf (
"finish push\n" );pthread_exit(NULL);
}
void *thread_pop(
void *arg)
{
printf (
"start pop\n" );LinkQueue * quePtr = (LinkQueue *)arg;
int tmp;
int res;
while (
1 ){res = cas_pop(quePtr, &tmp);
if (!res){
break ;}
printf (
"%d " , tmp);}
printf (
"finish pop\n" );pthread_exit(NULL);
}
總結(jié)
以上是生活随笔 為你收集整理的锁、CAS操作和无锁队列的实现 的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔 推薦給好友。