当前位置:网站首页>快進來!花幾分鐘看一下 ReentrantReadWriteLock 的原理!

快進來!花幾分鐘看一下 ReentrantReadWriteLock 的原理!

2020-11-07 16:46:57 itread01

> **前言** > > > 在看完 ReentrantLock 之後,在高併發場景下 ReentrantLock 已經足夠使用,但是因為 ReentrantLock 是獨佔鎖,同時只有一個執行緒可以獲取該鎖,而很多應用場景都是讀多寫少,這時候使用 ReentrantLock 就不太合適了。讀多寫少的場景該如何使用?在 JUC 包下同樣提供了讀寫鎖 ReentrantReadWriteLock 來應對讀多寫少的場景。 > > > 公眾號:『 劉志航 』,記錄工作學習中的技術、開發及原始碼筆記;時不時分享一些生活中的見聞感悟。歡迎大佬來指導! ### 介紹 支援類似 ReentrantLock 語義的 ReadWriteLock 的實現。 具有以下屬性: - **獲取順序** 此類不會將讀取優先或寫入優先強加給鎖訪問的排序。但是,它確實支援可選的*公平* 策略。 支援**公平模式**和**非公平模式**,預設為**非公平模式**。 - **重入** 允許 reader 和 writer 按照 `ReentrantLock` 的樣式重新獲取讀鎖或寫鎖。在寫執行緒釋放持有的所有寫鎖後,reader 才允許重入使用它們。此外,writer 可以獲取讀鎖,但反過來則不成立。 - **鎖降級** 重入還允許從寫鎖降級為讀鎖,通過先獲取寫鎖,然後獲取讀鎖,最後釋放寫鎖的方式降級。但是,從讀鎖升級到寫鎖是**不可能的**。 - **鎖獲取的中斷** 讀鎖和寫鎖都支援鎖獲取期間的中斷。 - **`Condition` 支援** 寫鎖提供了一個 `Condition` 實現,對於寫鎖來說,該實現的方式與 `ReentrantLock.newCondition()` 提供的 `Condition` 實現對 `ReentrantLock` 所做的行為相同。當然,此 `Condition` 只能用於寫鎖。讀鎖不支援 `Condition`。 - **監測** 此類支援一些確定是保持鎖還是爭用鎖的方法。這些方法設計用於監視系統狀態,而不是同步控制。 鎖最多支援 65535 個遞迴寫鎖和 65535 個讀鎖 以上為 *Java Api 官方文件* [1] 的解釋,總結一下內容如下: 1. 支援非公平和公平模式,預設為非公平模式。 2. 支援重入,讀鎖可以重入獲取讀鎖,寫鎖可以重入獲取寫鎖,寫鎖可以獲取讀鎖,讀鎖不可以獲取寫鎖。 3. 鎖可以降級,從寫鎖降級為讀鎖,但是不可能從讀鎖升級到寫鎖。 #### 基本使用 ```java class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { // 讀鎖加鎖 rwl.readLock().lock(); if (!cacheValid) { // 獲取寫鎖之前必須釋放讀鎖 rwl.readLock().unlock(); // 寫鎖加鎖 rwl.writeLock().lock(); try { // 重新檢查狀態,因為另一個執行緒可能 // 在執行操作之前獲取了寫鎖定並更改了狀態 if (!cacheValid) { data = ... cacheValid = true; } // 通過在釋放寫鎖之前獲取讀鎖來降級 rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read } } try { use(data); } finally { rwl.readLock().unlock(); } } } ``` 上面只是官方文件提供的一個 demo。 #### 問題疑問 1. 在 ReentrantReadWriteLock 中 state 代表什麼? 2. 執行緒獲取鎖的流程是怎麼樣的? 3. 讀鎖和寫鎖的可重入性是如何實現的? 4. 當前執行緒獲取鎖失敗,被阻塞的後續操作是什麼? 5. 鎖降級是怎麼降級的? ### 原始碼分析 #### 程式碼結構
```java public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L; /** 提供讀鎖的內部類 */ private final ReentrantReadWriteLock.ReadLock readerLock; /** 提供寫鎖的內部類 */ private final ReentrantReadWriteLock.WriteLock writerLock; /** 執行所有同步機制 */ final Sync sync; } ``` #### state 之前在閱讀 ReentrantLock 原始碼的時候 state 代表了鎖的狀態,0 表示沒有執行緒持有鎖,大於 1 表示已經有執行緒持有鎖及其重入的次數。而在 ReentrantReadWriteLock 是讀寫鎖,那就需要儲存**讀鎖**和**寫鎖**兩種狀態的,那是怎麼樣表示的呢? 在 ReentrantReadWriteLock 中同樣存在一個 Sync 繼承了 AbstractQueuedSynchronizer,也是 FairSync、NonfairSync 的父類。內部定義了 state 的一些操作。 ```java abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 6317671515068378041L; // 移位數 static final int SHARED_SHIFT = 16; // 單位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 最大數量 1 << 16 -> 65536 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 計算獨佔數使用 1 << 16 -> 65536 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 返回共享保留數 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 返回獨佔保留數 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } } ``` 在 AQS 中定義 state 為 int 型別,而在 ReentrantReadWriteLock 中,將 state 的 高 16 位和低 16 位拆開表示讀寫鎖。其中高 16 位表示讀鎖,低 16 位表示寫鎖。分別使用 sharedCount 和 exclusiveCount 方法獲取讀鎖和寫鎖的當前狀態。
下面分別從讀鎖和寫鎖的角度來看如何進行加鎖和釋放鎖的? #### ReadLock.lock ```java public static class ReadLock implements Lock, java.io.Serializable { /** * 獲取讀取鎖。 * 如果寫鎖沒有被另一個執行緒持有,則獲取讀鎖並立即返回。 * 如果寫鎖由另一個執行緒持有,則出於執行緒排程目的, * 當前執行緒將被禁用,並處於休眠狀態,直到獲取讀鎖為止。 */ public void lock() { // 呼叫 AQS 獲取共享資源 sync.acquireShared(1); } } ``` ![ReentrantReadWriteLock-AQS-Share-gTrD2e](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/ReentrantReadWriteLock-AQS-Share-gTrD2e.png) 獲取共享資源,這塊使用的 AQS 的邏輯,其中 tryAcquireShared(arg) 是在 ReentrantReadWriteLock.Sync 中實現的。並且 AQS 中有規定,tryAcquireShared 分為三種返回值: 1. 小於 0: 表示失敗; 2. 等於 0: 表示共享模式獲取資源成功,但後續的節點不能以共享模式獲取成功; 3. 大於 0: 表示共享模式獲取資源成功,後續節點在共享模式獲取也可能會成功,在這種情況下,後續等待執行緒必須檢查可用性。 ```java abstract static class Sync extends AbstractQueuedSynchronizer { protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); // 獲取 state 值 int c = getState(); // 獨佔計數不為 0 且 不是當前執行緒, 說明已經有寫鎖 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 獲取共享計數(讀鎖計數) int r = sharedCount(c); // 不需要阻塞讀鎖 && 共享計數小於最大值 && state 更新成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 當前讀鎖計數為 0 // firstReader是獲得讀鎖的第一個執行緒 // firstReaderHoldCount是firstReader的保持計數 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 讀鎖重入 firstReaderHoldCount++; } else { // 當前快取計數 HoldCounter rh = cachedHoldCounter; // 當前執行緒沒有計數 或者 沒有建立計數器 if (rh == null || rh.tid != getThreadId(current)) // 建立計數,基於 ThreadLocal cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); // 計數累加 rh.count++; } return 1; } // 完整地獲取共享鎖方法,作為tryAcquireShared方法因CAS獲取鎖失敗後的處理。 // 因為前面可能失敗 CAS 失敗, 佇列策略失敗等原因。 return fullTryAcquireShared(current); } } ``` 1. 先獲取 state ,通過 exclusiveCount 方法獲取到寫鎖的計數值,不為 0 且 不是當前執行緒, 說明已經有寫鎖。返回 -1 失敗。 2. 通過 sharedCount 獲取讀鎖計數,判斷是否需要阻塞以及是否超過上限後,使用 CAS 更新 讀鎖計數。 3. 設定或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。 4. 最後會進行完整的獲取共享鎖方法,作為之前獲取失敗的後續處理方法。 firstReader:firstReader是獲得讀鎖的第一個執行緒; firstReaderHoldCount:firstReaderHoldCount是firstReader的保持計數。即獲得讀鎖的第一個執行緒的重入次數。 cachedHoldCounter:最後一個獲得讀鎖的執行緒獲得讀鎖的重入次數。 ```java final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; // 無限迴圈 for (;;) { int c = getState(); // 是否有寫鎖 if (exclusiveCount(c) != 0) { // 有寫鎖,但是不是當前執行緒,直接返回失敗 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 需要阻塞 // 沒有寫鎖,確保沒有重新獲取讀鎖 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 當前執行緒的讀鎖計數 ThreadLocal 中 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); // 計數結束,remove 掉 if (rh.count == 0) readHolds.remove(); } } // 為 0 直接失敗 if (rh.count == 0) return -1; } } // 到達上限 丟擲異常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS 設定讀鎖 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } } ``` 1. 首先會一直迴圈 2. 有寫鎖,但是不是當前執行緒,直接返回失敗。**但是,有寫鎖,如果是當前執行緒,是會繼續執行的。** 3. 設定或更新 firstReader、firstReaderHoldCount、 cachedHoldCounter。 當存在寫鎖(獨佔鎖)時,方法會返回 -1 失敗,後續會呼叫 AQS 的 doAcquireShared 方法,迴圈獲取資源。doAcquireShared 方法會不斷迴圈,嘗試獲取讀鎖,一旦獲取到讀鎖,當前節點會立即喚醒後續節點,後續節點開始嘗試獲取讀鎖,依次傳播。 ![ReentrantReadWriteLock-1-rl0DjC](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/ReentrantReadWriteLock-1-rl0DjC.png) #### ReadLock.unlock ```java public static class ReadLock implements Lock, java.io.Serializable { public void unlock() { sync.releaseShared(1); } } ``` 呼叫 AQS 的 releaseShared 釋放共享資源方法。 ![ReadLock-unlock-LE7vUH](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/ReadLock-unlock-LE7vUH.png) 其中 tryReleaseShared 有 ReadLock 實現。 ```java protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // 第一個執行緒是當前執行緒 if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { // 第一個執行緒不是當前執行緒,更新自己的 ThreadLocal 裡面的計數 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 迴圈 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 使用 CAS 更新 state if (compareAndSetState(c, nextc)) // 但是如果現在讀和寫鎖都已釋放, // 它可能允許等待的寫程式繼續進行。 return nextc == 0; } } ``` 1. 如果是第一個執行緒,直接更新技術,不是則更新自己 ThreadLocal 裡面儲存的計數。 2. 迴圈,使用 CAS 更新 state 的值。 3. 如果 state 更新後的值為 0,說明沒有執行緒持有讀鎖或者寫鎖了。 4. 當 state 為 0,此時會呼叫 AQS 的 doReleaseShared 方法。此時佇列如果有寫鎖,那就會被寫鎖獲取的鎖。 #### WriteLock.lock ```java public static class WriteLock implements Lock, java.io.Serializable { /** * 獲取寫入鎖。 * 如果沒有其他執行緒持有讀鎖或寫鎖,會直接返回,並將寫鎖計數設定為1。 * 如果當前執行緒持有寫鎖,則將寫鎖計數 +1,然後返回。 * 如果鎖正在被其他執行緒持有,則當前執行緒用於執行緒排程目的, * 當前執行緒將被禁用,並處於休眠狀態,直到獲取讀鎖並將寫鎖計數設定為1。 */ public void lock() { sync.acquire(1); } } ``` ![WriteLock.lock-wBuvUA](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/WriteLock.lock-wBuvUA.png) tryAcquire 方法由 Write 自己實現,方式和 ReentrantLock 類似。 ```java protected final boolean tryAcquire(int acquires) { // 如果讀鎖計數為非零或寫鎖計數為非零,並且所有者是另一個執行緒,則失敗。 // 如果計數飽和,則失敗。只有在count不為零時,才可能發生這種情況。 // 否則,如果該執行緒是可重入獲取或佇列策略允許的話,則有資格進行鎖定。 // 如果是這樣,請更新狀態並設定所有者。 Thread current = Thread.currentThread(); int c = getState(); // 寫鎖計數 int w = exclusiveCount(c); // c != 0 說明有有執行緒獲取鎖了 if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) // 判斷是不是自己,不是自己 返回 false if (w == 0 || current != getExclusiveOwnerThread()) return false; // 判斷有沒有超過上限 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 重入 setState(c + acquires); return true; } // 不需要阻塞,或者 CAS 更新 state 失敗 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } ``` 1. 獲取 state , 如果 state 不為 0 則判斷是否為當前執行緒重入獲取。 2. state 為 0 ,則當前執行緒 CAS 更新 state,獲取鎖。 3. 更新成功之後綁定當前執行緒。 4. 如果失敗會繼續呼叫 AQS 的 acquireQueued,將當前阻塞放在 AQS 佇列中。AQS 會不斷迴圈,等待上一個鎖釋放後,嘗試獲得鎖。 ![ReentrantReadWriteLock-2-mQAgGL](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/ReentrantReadWriteLock-2-mQAgGL.png) #### WriteLock.unlock ```java public static class WriteLock implements Lock, java.io.Serializable { // 如果當前執行緒是此鎖的持有者,則保持計數遞減。 // 如果保持現在的計數為零,則解除鎖定。 // 如果當前執行緒不是此鎖的持有者則IllegalMonitorStateException異常。 public void unlock() { sync.release(1); } } ``` ![Write-unlock-bwHAcw](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/Write-unlock-bwHAcw.png) 同樣這塊程式碼是使用 AQS 的邏輯,tryRelease 部分由 WriteLock 自己實現。 ```java protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); setState(nextc); return free; } ``` 1. 如果是當前執行緒重入,扣減重入次數。 2. 扣減後如果為 0,則設定鎖持有執行緒為 null,更新 state 值。AQS 會喚醒後續節點獲取鎖。 ### 總結 #### 問題 **Q:**在 ReentrantReadWriteLock 中 state 代表什麼? **A:**state 代表鎖的狀態。state 為 0 ,沒有執行緒持有鎖,state 的高 16 為代表讀鎖狀態,低 16 為代表寫鎖狀態。通過位運算可以獲取讀寫鎖的實際值。 **Q:**執行緒獲取鎖的流程是怎麼樣的? **A:**可以參考上面的原始碼筆記,以及後面的流程圖。 **Q:**讀鎖和寫鎖的可重入性是如何實現的? **A:**在加鎖的時候,判斷是否為當前執行緒,如果是當前執行緒,則直接累加計數。值得注意的是:讀鎖重入計數使用的 ThreadLocal 線上程中快取計數,而寫鎖則直接用的 state 進行累加(其實和 state 低 16 位進行累加一樣)。 **Q:**當前執行緒獲取鎖失敗,被阻塞的後續操作是什麼? **A:**獲取失敗,會放到 AQS 等待佇列中,在佇列中不斷迴圈,監視前一個節點是否為 head ,是的話,會重新嘗試獲取鎖。 **Q:**鎖降級是怎麼降級的? **A:** ![write-to-read-koAuqm](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/write-to-read-koAuqm.png) 如圖,在圈出部分 fullTryAcquireShared 程式碼中,可以看出來,在獲取讀鎖的時候,如果當前執行緒持有寫鎖,是可以獲取讀鎖的。這塊就是指鎖降級,比如執行緒 A 獲取到了寫鎖,當執行緒 A 執行完畢時,它需要獲取當前資料,假設不支援鎖降級,就會導致 A 釋放寫鎖,然後再次請求讀鎖。而在這中間是有可能被其他阻塞的執行緒獲取到寫鎖的。從而導致執行緒 A 在一次執行過程中資料不一致。 ### 小結 1. ReentrantReadWriteLock 讀寫鎖,內部實現是 ReadLock 讀鎖 和 WriteLock 寫鎖。讀鎖,允許共享;寫鎖,是獨佔鎖。 2. 讀寫鎖都支援重入,讀鎖的重入次數記錄線上程維護的 ThreadLocal 中,寫鎖維護在 state 上(低 16 位)。 3. 支援鎖降級,從寫鎖降級為讀鎖,防止髒讀。 4. ReadLock 和 WriteLock 都是通過 AQS 來實現的。獲取鎖失敗後會放到 AQS 等待佇列中,後續不斷嘗試獲取鎖。區別在讀鎖只有存在寫鎖的時候才放到等待佇列,而寫鎖是隻要存在非當前執行緒鎖(無論寫鎖還是讀鎖)都會放到等待佇列。!![read-write-different-oI9wB1](https://cdn.jsdelivr.net/gh/liuzhihang/oss/pic/article/read-write-different-oI9wB1.png) 5. 通過原始碼分析,可以得出讀寫鎖適合在**讀多寫少**的場景中使用。 #### 相關資料 [1] Java Api:https://docs.oracle.com/javase/8/docs/api/overview-summ

版权声明
本文为[itread01]所创,转载请带上原文链接,感谢
https://www.itread01.com/content/1604736185.html