鎖是用來控制多個線程訪問共享資源的方式,一般來說,一個鎖能防止多個線程同時訪問共享資源(但是有些鎖可以允許多個線程并發的訪問共享資源,如讀寫鎖)。在以前,Java程序是靠synchronized來實現鎖功能的,而在Java SE 5之后,并發包中新增了Lock接口(以及相關實現類)用來實現鎖功能,他提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯式的獲取鎖和釋放鎖,雖然它缺少了synchronized提供的隱式獲取釋放鎖的便捷性,但是卻擁有了鎖獲取和釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等多種synchronized關鍵字不具備的同步特性。很多鎖都通過實現Lock接口來完成對鎖的操作,比如可重入鎖(ReentrantLock)、前一張講的Redisson分布式鎖等,而Lock接口的實現,基本是都是通過聚合了一個同步器的子類來完成線程訪問控制的,而同步器,就是我們常說的AQS(AbstractQueuedSynchronizer),也是今天要記錄的內容。
/*** 非公平鎖*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {// nonfairTryAcquire方法和下面的公平鎖方法除了判斷是否在隊列首位之外沒有不同return nonfairTryAcquire(acquires);}}/*** 公平鎖*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();// 獲取當前同步狀態int c = getState();// 判斷是否符合預期if (c == 0) {// 判斷是否在隊列首位并且CAS設置當前狀態成功if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}
protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}
以下代碼為讀寫鎖繼承同步器后重寫的方法:
protected int tryAcquireShared(int arg):共享式獲取同步狀態,返回大于0的值,表示獲取鎖成功,反之獲取鎖失敗。
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 獲取當前同步狀態int c = getState();// 如果有線程持有寫鎖,則返回-1if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 獲取持有鎖的線程數int r = sharedCount(c);// 判斷是否阻塞,判斷線程數量,判斷CAS設置是否成功if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 當前線程是第一個獲取到鎖的線程if (r == 0) {firstReader = current;firstReaderHoldCount = 1;// 否則就++} else if (firstReader == current) {firstReaderHoldCount++;} else { HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}// 死循環去獲取鎖return fullTryAcquireShared(current);}
public final void acquire(int arg) {// 如果獲取鎖失敗,則加入同步隊列,如果加入同步隊列成功則自旋阻塞喚醒來不斷的嘗試獲取鎖,直到線程被中斷或獲取到鎖if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
public final void acquireInterruptibly(int arg) throws InterruptedException {// 線程中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// 如果沒有獲取到鎖if (!tryAcquire(arg))// 不斷自旋嘗試獲取鎖doAcquireInterruptibly(arg);}
boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly()方法的基礎上增加了超時時間,如果在超時時間內獲取到了鎖,則返回true,否則返回false。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {// 中斷拋異常if (Thread.interrupted())throw new InterruptedException();// 相應時間內不斷獲取鎖,超時返回falsereturn tryAcquire(arg) ||doAcquireNanos(arg, nanosTimeout);}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// 如果獲取鎖失敗,則不斷自旋嘗試獲取鎖if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly方法的基礎上增加了超時時間。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {// 中斷拋出異常if (Thread.interrupted())throw new InterruptedException();// 在超時時間內如果獲取鎖失敗,則不斷自旋嘗試獲取鎖return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}
public final boolean release(int arg) {// 如果獲取鎖成功if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)// 喚醒第一個節點的線程unparkSuccessor(h);return true;}return false;}
boolean releaseShared(int arg):共享式釋放同步狀態。
public final boolean releaseShared(int arg) {// 如果釋放鎖成功if (tryReleaseShared(arg)) {// 喚醒線程doReleaseShared();return true;}return false;}
Collection getQueuedThreads():獲取等待在同步隊列上的線程集合。
public final Collection<Thread> getQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {Thread t = p.thread;if (t != null)list.add(t);}return list;}