搞定ReentrantReadWriteLock 幾道小小數學題就夠了

| 好看請贊,養成習慣

  • 你有一個思想,我有一個思想,我們交換后,一個人就有兩個思想

  • If you can NOT explain it simply, you do NOT understand it well enough

現陸續將Demo代碼和技術文章整理在一起 Github實踐精選 ,方便大家閱讀查看,本文同樣收錄在此,覺得不錯,還請Star

前言

  • 文章 Java AQS隊列同步器以及ReentrantLock的應用 介紹了AQS獨佔式獲取同步狀態的實現,並以 ReentrantLock 為例說明其是如何自定義同步器實現互斥鎖的
  • 文章 Java AQS共享式獲取同步狀態及Semaphore的應用分析 介紹 AQS 共享式獲取同步狀態的實現,並說明了 Semaphore 是如何自定義同步器實現簡單限流作用的

有了以上兩篇文章的鋪墊,來理解本文要介紹的既有獨佔式,又有共享式獲取同步狀態的 ReadWriteLock,就非常輕鬆了

ReadWriteLock

ReadWriteLock 直譯過來為【讀寫鎖】。現實中,讀多寫少的業務場景是非常普遍的,比如應用緩存

一個線程將數據寫入緩存,其他線程可以直接讀取緩存中的數據,提高數據查詢效率

之前提到的互斥鎖都是排他鎖,也就是說同一時刻只允許一個線程進行訪問,當面對可共享讀的業務場景,互斥鎖顯然是比較低效的一種處理方式。為了提高效率,讀寫鎖模型就誕生了

效率提升是一方面,但併發編程更重要的是在保證準確性的前提下提高效率

一個寫線程改變了緩存中的值,其他讀線程一定是可以 “感知” 到的,否則可能導致查詢到的值不準確

所以關於讀寫鎖模型就了下面這 3 條規定:

  1. 允許多個線程同時讀共享變量
  2. 只允許一個線程寫共享變量
  3. 如果寫線程正在執行寫操作,此時則禁止其他讀線程讀共享變量

ReadWriteLock 是一個接口,其內部只有兩個方法:

public interface ReadWriteLock {
    // 返回用於讀的鎖
    Lock readLock();

    // 返回用於寫的鎖
    Lock writeLock();
}

所以要了解整個讀/寫鎖的整個應用過程,需要從它的實現類 ReentrantReadWriteLock 說起

ReentrantReadWriteLock 類結構

直接對比ReentrantReadWriteLock 與 ReentrantLock的類結構

他們又很相似吧,根據類名稱以及類結構,按照咱們前序文章的分析,你也就能看出 ReentrantReadWriteLock 的基本特性:

其中黃顏色標記的的 鎖降級 是看不出來的, 這裏先有個印象,下面會單獨說明

另外,不知道你是否還記得,Java AQS隊列同步器以及ReentrantLock的應用 說過,Lock 和 AQS 同步器是一種組合形式的存在,既然這裡是讀/寫兩種鎖,他們的組合模式也就分成了兩種:

  1. 讀鎖與自定義同步器的聚合
  2. 寫鎖與自定義同步器的聚合
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

這裏只是提醒大家,模式沒有變,不要被讀/寫兩種鎖迷惑

基本示例

說了這麼多,如果你忘了前序知識,整體理解感覺應該是有斷檔的,所以先來看個示例(模擬使用緩存)讓大家對 ReentrantReadWriteLock 有個直觀的使用印象

public class ReentrantReadWriteLockCache {

	// 定義一個非線程安全的 HashMap 用於緩存對象
	static Map<String, Object> map = new HashMap<String, Object>();
	// 創建讀寫鎖對象
	static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
	// 構建讀鎖
	static Lock rl = readWriteLock.readLock();
	// 構建寫鎖
	static Lock wl = readWriteLock.writeLock();

	public static final Object get(String key) {
		rl.lock();
		try{
			return map.get(key);
		}finally {
			rl.unlock();
		}
	}

	public static final Object put(String key, Object value){
		wl.lock();
		try{
			return map.put(key, value);
		}finally {
			wl.unlock();
		}
	}
}

你瞧,使用就是這麼簡單。但是你知道的,AQS 的核心是鎖的實現,即控制同步狀態 state 的值,ReentrantReadWriteLock 也是應用AQS的 state 來控制同步狀態的,那麼問題來了:

一個 int 類型的 state 怎麼既控制讀的同步狀態,又可以控制寫的同步狀態呢?

顯然需要一點設計了

讀寫狀態設計

如果要在一個 int 類型變量上維護多個狀態,那肯定就需要拆分了。我們知道 int 類型數據佔32位,所以我們就有機會按位切割使用state了。我們將其切割成兩部分:

  1. 高16位表示讀
  2. 低16位表示寫

所以,要想準確的計算讀/寫各自的狀態值,肯定就要應用位運算了,下面代碼是 JDK1.8,ReentrantReadWriteLock 自定義同步器 Sync 的位操作

abstract static class Sync extends AbstractQueuedSynchronizer {
       

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;


        static int sharedCount(int c) { 
          return c >>> SHARED_SHIFT; 
        }

        static int exclusiveCount(int c) { 
          return c & EXCLUSIVE_MASK; 
        }
}

乍一看真是有些複雜的可怕,別慌,咱們通過幾道小小數學題就可以搞定整個位運算過程

整個 ReentrantReadWriteLock 中 讀/寫狀態的計算就是反覆應用這幾道數學題,所以,在閱讀下面內容之前,希望你搞懂這簡單的運算

基礎鋪墊足夠了,我們進入源碼分析吧

源碼分析

寫鎖分析

由於寫鎖是排他的,所以肯定是要重寫 AQS 中 tryAcquire 方法

        protected final boolean tryAcquire(int acquires) {        
            Thread current = Thread.currentThread();
          	// 獲取 state 整體的值
            int c = getState();
            // 獲取寫狀態的值
            int w = exclusiveCount(c);
            if (c != 0) {
                // w=0: 根據推理二,整體狀態不等於零,寫狀態等於零,所以,讀狀態大於0,即存在讀鎖
              	// 或者當前線程不是已獲取寫鎖的線程
              	// 二者之一條件成真,則獲取寫狀態失敗
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // 根據推理一第 1 條,更新寫狀態值
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

上述代碼 第 19 行 writerShouldBlock 也並沒有什麼神秘的,只不過是公平/非公平獲取鎖方式的判斷(是否有前驅節點來判斷)

你瞧,寫鎖獲取方式就是這麼簡單

讀鎖分析

由於讀鎖是共享式的,所以肯定是要重寫 AQS 中 tryAcquireShared 方法

        protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
          	// 寫狀態不等於0,並且鎖的持有者不是當前線程,根據約定 3,則獲取讀鎖失敗
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
          	// 獲取讀狀態值
            int r = sharedCount(c);
          	// 這個地方有點不一樣,我們單獨說明
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
          	// 如果獲取讀鎖失敗則進入自旋獲取
            return fullTryAcquireShared(current);
        }

readerShouldBlockwriterShouldBlock 在公平鎖的實現上都是判斷是否有前驅節點,但是在非公平鎖的實現上,前者是這樣的:

final boolean readerShouldBlock() {
	return apparentlyFirstQueuedIsExclusive();
}

final boolean apparentlyFirstQueuedIsExclusive() {
  Node h, s;
  return (h = head) != null &&
    // 等待隊列頭節點的下一個節點
    (s = h.next)  != null &&
    // 如果是排他式的節點
    !s.isShared()         &&
    s.thread != null;
}

簡單來說,如果請求讀鎖的當前線程發現同步隊列的 head 節點的下一個節點為排他式節點,那麼就說明有一個線程在等待獲取寫鎖(爭搶寫鎖失敗,被放入到同步隊列中),那麼請求讀鎖的線程就要阻塞,畢竟讀多寫少,如果還沒有這點判斷機制,寫鎖可能會發生【飢餓】

上述條件都滿足了,也就會進入 tryAcquireShared 代碼的第 14 行到第 25 行,這段代碼主要是為了記錄線程持有鎖的次數。讀鎖是共享式的,還想記錄每個線程持有讀鎖的次數,就要用到 ThreadLocal 了,因為這不影響同步狀態 state 的值,所以就不分析了, 只把關係放在這吧

到這裏讀鎖的獲取也就結束了,比寫鎖稍稍複雜那麼一丟丟,接下來就說明一下那個可能讓你迷惑的鎖升級/降級問題吧

讀寫鎖的升級與降級

個人理解:讀鎖是可以被多線程共享的,寫鎖是單線程獨佔的,也就是說寫鎖的併發限制比讀鎖高,所以

在真正了解讀寫鎖的升級與降級之前,我們需要完善一下本文開頭 ReentrantReadWriteLock 的例子

	public static final Object get(String key) {
		Object obj = null;
		rl.lock();
		try{
      // 獲取緩存中的值
			obj = map.get(key);
		}finally {
			rl.unlock();
		}
		// 緩存中值不為空,直接返回
		if (obj!= null) {
			return obj;
		}
		
    // 緩存中值為空,則通過寫鎖查詢DB,並將其寫入到緩存中
		wl.lock();
		try{
      // 再次嘗試獲取緩存中的值
			obj = map.get(key);
      // 再次獲取緩存中值還是為空
			if (obj == null) {
        // 查詢DB
				obj = getDataFromDB(key); // 偽代碼:getDataFromDB
        // 將其放入到緩存中
				map.put(key, obj);
			}
		}finally {
			wl.unlock();
		}
		return obj;
	}

有童鞋可能會有疑問

在寫鎖裏面,為什麼代碼第19行還要再次獲取緩存中的值呢?不是多此一舉嗎?

其實這裏再次嘗試獲取緩存中的值是很有必要的,因為可能存在多個線程同時執行 get 方法,並且參數 key 也是相同的,執行到代碼第 16 行 wl.lock() ,比如這樣:

線程 A,B,C 同時執行到臨界區 wl.lock(), 只有線程 A 獲取寫鎖成功,線程B,C只能阻塞,直到線程A 釋放寫鎖。這時,當線程B 或者 C 再次進入臨界區時,線程 A 已經將值更新到緩存中了,所以線程B,C沒必要再查詢一次DB,而是再次嘗試查詢緩存中的值

既然再次獲取緩存很有必要,我能否在讀鎖里直接判斷,如果緩存中沒有值,那就再次獲取寫鎖來查詢DB不就可以了嘛,就像這樣:

	public static final Object getLockUpgrade(String key) {
		Object obj = null;
		rl.lock();
		try{
			obj = map.get(key);
			if (obj == null){
				wl.lock();
				try{
					obj = map.get(key);
					if (obj == null) {
						obj = getDataFromDB(key); // 偽代碼:getDataFromDB
						map.put(key, obj);
					}
				}finally {
					wl.unlock();
				}
			}
		}finally {
			rl.unlock();
		}

		return obj;
	}

這還真是不可以的,因為獲取一個寫入鎖需要先釋放所有的讀取鎖,如果有兩個讀取鎖試圖獲取寫入鎖,且都不釋放讀取鎖時,就會發生死鎖,所以在這裏,鎖的升級是不被允許的

讀寫鎖的升級是不可以的,那麼鎖的降級是可以的嘛?這個是 Oracle 官網關於鎖降級的示例 ,我將代碼粘貼在此處,大家有興趣可以點進去連接看更多內容

 class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // 必須在獲取寫鎖之前釋放讀鎖,因為鎖的升級是不被允許的
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        try {
          // 再次檢查,原因可能是其他線程已經更新過緩存
          if (!cacheValid) {
            data = ...
            cacheValid = true;
          }
					//在釋放寫鎖前,降級為讀鎖
          rwl.readLock().lock();
        } finally {
          //釋放寫鎖,此時持有讀鎖
          rwl.writeLock().unlock(); 
        }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

代碼中聲明了一個 volatile 類型的 cacheValid 變量,保證其可見性。

  1. 首先獲取讀鎖,如果cache不可用,則釋放讀鎖
  2. 然後獲取寫鎖
  3. 在更改數據之前,再檢查一次cacheValid的值,然後修改數據,將cacheValid置為true
  4. 然後在釋放寫鎖前獲取讀鎖 此時
  5. cache中數據可用,處理cache中數據,最後釋放讀鎖

這個過程就是一個完整的鎖降級的過程,目的是保證數據可見性,聽起來很有道理的樣子,那麼問題來了:

上述代碼為什麼在釋放寫鎖之前要獲取讀鎖呢?

如果當前的線程A在修改完cache中的數據后,沒有獲取讀鎖而是直接釋放了寫鎖;假設此時另一個線程B 獲取了寫鎖並修改了數據,那麼線程A無法感知到數據已被修改,但線程A還應用了緩存數據,所以就可能出現數據錯誤

如果遵循鎖降級的步驟,線程A 在釋放寫鎖之前獲取讀鎖,那麼線程B在獲取寫鎖時將被阻塞,直到線程A完成數據處理過程,釋放讀鎖,從而保證數據的可見性

那問題又來了:

使用寫鎖一定要降級嗎?

如果你理解了上面的問題,相信這個問題已經有了答案。假如線程A修改完數據之後, 經過耗時操作后想要再使用數據時,希望使用的是自己修改后的數據,而不是其他線程修改后的數據,這樣的話確實是需要鎖降級;如果只是希望最後使用數據的時候,拿到的是最新的數據,而不一定是自己剛修改過的數據,那麼先釋放寫鎖,再獲取讀鎖,然後使用數據也無妨

在這裏我要額外說明一下你可能存在的誤解:

  • 如果已經釋放了讀鎖再獲取寫鎖不叫鎖的升級

  • 如果已經釋放了寫鎖在獲取讀鎖也不叫鎖的降級

相信你到這裏也理解了鎖的升級與降級過程,以及他們被允許或被禁止的原因了

總結

本文主要說明了 ReentrantReadWriteLock 是如何應用 state 做位拆分實現讀/寫兩種同步狀態的,另外也通過源碼分析了讀/寫鎖獲取同步狀態的過程,最後又了解了讀寫鎖的升級/降級機制,相信到這裏你對讀寫鎖已經有了一定的理解。如果你對文中的哪些地方覺得理解有些困難,強烈建議你回看本文開頭的兩篇文章,那裡鋪墊了非常多的內容。接下來我們就看看在應用AQS的最後一個併發工具類 CountDownLatch 吧

靈魂追問

  1. 讀鎖也沒修改數據,還允許共享式獲取,那還有必要設置讀鎖嗎?
  2. 在分佈式環境中,你是如何保證緩存數據一致性的呢?
  3. 當你打開看ReentrantReadWriteLock源碼時,你會發現,WriteLock 中可以使用 Condition,但是ReadLock 使用Condition卻會拋出UnsupportedOperationException,這是為什麼呢?
// WriteLock
public Condition newCondition() {
	return sync.newCondition();
}

// ReadLock
public Condition newCondition() {
	throw new UnsupportedOperationException();
}

個人博客:https://dayarch.top
加我微信好友, 進群娛樂學習交流,備註「進群」

歡迎持續關注公眾號:「日拱一兵」

  • 前沿 Java 技術乾貨分享
  • 高效工具匯總 | 回復「工具」
  • 面試問題分析與解答
  • 技術資料領取 | 回復「資料」

以讀偵探小說思維輕鬆趣味學習 Java 技術棧相關知識,本着將複雜問題簡單化,抽象問題具體化和圖形化原則逐步分解技術問題,技術持續更新,請持續關注……

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!