阻塞隊列一——java中的阻塞隊列

目錄

  • 阻塞隊列簡介:介紹阻塞隊列的特性與應用場景
  • java中的阻塞隊列:介紹java中實現的供開發者使用的阻塞隊列
  • BlockQueue中方法:介紹阻塞隊列的API接口
  • 阻塞隊列的實現原理:具體的例子說明阻塞隊列的實現原理
  • 總結

阻塞隊列簡介

阻塞隊列(BlockingQueue)首先是一個支持先進先出的隊列,與普通的隊列完全相同;
其次是一個支持阻塞操作的隊列,即:

  • 當隊列滿時,會阻塞執行插入操作的線程,直到隊列不滿。
  • 當隊列為空時,會阻塞執行獲取操作的線程,直到隊列不為空。

阻塞隊列用在多線程的場景下,因此阻塞隊列使用了鎖機制來保證同步,這裏使用的可重入鎖;
而對於阻塞與喚醒機制則有與鎖綁定的Condition實現

應用場景:生產者消費者模式

java中的阻塞隊列

java中的阻塞隊列根據容量可以分為有界隊列和無界隊列:

  • 有界隊列:隊列中只能存儲有限個元素,超出后存放元素線程會被阻塞或者失敗。
  • 無界隊列:隊列中可以存儲無限個元素。

java8中提供了7種阻塞隊列阻塞隊列供開發者使用,如下錶:

類名 描述
ArrayBlockingQueue 一個由數組結構組成的有界阻塞隊列
LinkedBlockingQueue 由鏈表結構組成的有界阻塞隊列(默認大小Integer.MAX_VALUE)
PriorityBlockingQueue 支持優先級排序的無界阻塞隊列
DelayQueue 使用優先級隊列實現的延遲無界阻塞隊列
SynchronousQueue 不存儲元素的阻塞隊列,即單個元素的隊列
LinkedTransferQueue 由鏈表結構組成的無界阻塞隊列
LinkedBlockingDeque 由鏈表結構組成的雙向阻塞隊列

另外還有一個在ScheduledThreadPoolExecutor中實現的DelayedWorkQueue阻塞隊列,
但這個阻塞隊列開發者不能使用。它們之間的UML類圖如下圖:

BlockingQueue接口是阻塞隊列對外的訪問接口,所有的阻塞隊列都實現了BlockQueue中的方法

BlockQueue中方法

作為一個隊列的核心方法就是入隊和出隊。由於存在阻塞策略,BlockQueue將出隊入隊的情況分為了四組,每組提供不同的方法:

  • 拋出異常:當隊列滿時,如果再往隊列中插入元素,則拋出IllegalStateException異常;
    當隊列為空時,從隊列中獲取元素則拋出NoSuchElementException異常。

  • 返回特定值(布爾值):當隊列滿時,如果再往隊列中插入元素,則返回false;當隊列為空時,從隊列中獲取元素則返回null。

  • 一直阻塞:當隊列滿時,如果再往隊列中插入元素,阻塞當前線程直到隊列中至少一個被移除或者響應中斷退出;
    當隊列為空時,則阻塞當前線程直到至少一個元素元素入隊或者響應中斷退出。

  • 超時退出:當隊列滿時,如果再往隊列中插入元素,阻塞當前線程直到隊列中至少一個被移除或者達到指定的等待時間退出或者響應中斷退出;
    當隊列為空時,則阻塞當前線程直到至少一個元素元素入隊或者達到指定的等待時間退出或者響應中斷退出。

對於每種情況BlockingQueue提供的方法如下錶:

方法\處理方式 拋出異常 返回特定值(布爾值) 一直阻塞 超時退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time.unit)
檢查 element() peek() 不可用 不可用

上述方法一般用於生產者-消費者模型中,是其中的生產和消費操作隊列的核心方法。
除了這些方法,BlockingQueue還提供了一些其他的方法如下錶:

方法名稱 描述
remove(Object o) 從隊列中移除一個指定值
size() 獲取隊列中元素的個數
contains(Object o) 判斷隊列是否包含指定的元素,但是這個元素在這次判斷完可能就會被消費
drainTo(Collection<? super E> c) 將隊列中元素放在給定的集合中,並返回添加的元素個數
drainTo(Collection<? super E> c, int maxElements) 將隊列中元素取maxElements(不超過隊列中元素個數)個放在給定的集合中,並返回添加的元素個數
remainingCapacity() 計算隊列中還可以存放的元素個數
toArray() 以objetc數組的形式獲取隊列中所有的元素
toArray(T[] a) 以給定類型數組的方式獲取隊列中所有的元素
clear() 清空隊列,危險的操作

阻塞隊列的實現原理

阻塞隊列的實現依靠通知模式實現:當生產者向滿了的隊列中添加元素時,會阻塞住生產者,
直到消費者消費了一個隊列中的元素後會通知消費者隊列可用,此時再由生產者向隊列中添加元素。反之亦然。

阻塞隊列的阻塞喚醒依靠Condition——條件隊列來實現。

ArrayBlockingQueue為例說明:

ArrayBlockingQueue的定義:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
   
    /** The queued items */
    //以數組的結構存儲隊列的元素,採用的是循環數組
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    //隊列的隊頭索引
    int takeIndex;

    /** items index for next put, offer, or add */
    //隊列的隊尾索引
    int putIndex;

    /** Number of elements in the queue */
    //隊列中元素的個數
    int count;

    /** Main lock guarding all access */
    //對於ArrayBlockingQueue所有的操作都需要加鎖,
    final ReentrantLock lock;

    /** Condition for waiting takes */
    //條件隊列,當隊列為空時阻塞消費者並在生產者生產後喚醒消費者
    private final Condition notEmpty;

    /** Condition for waiting puts */
    //條件隊列,當隊列滿時阻塞生產者,並在消費者消費隊列后喚醒生產者
    private final Condition notFull;
}

根據類的定義字段可以看到,有兩個Condition條件隊列,猜測以下過程

  • 當隊列為空,消費者試圖消費時應該調用notEmpty.await()方法阻塞,並在生產者生產後調用notEmpty.single()方法
  • 當隊列已滿,生產者試圖放入元素應調用notFull.await()方法阻塞,並在消費者消費隊列后調用notFull.single()方法

向隊列中添加元素put()方法的添加過程。

    /**
    * 向隊列中添加元素
    * 當隊列已滿時需要阻塞當前線程
    * 放入元素后喚醒因隊列為空阻塞的消費者
    */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //當隊列已滿時需要notFull.await()阻塞當前線程
            //offer(e,time,unit)方法就是阻塞的時候加了超時設定
            while (count == items.length)
                notFull.await();
            //放入元素的過程
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    /**enqueue實際添加元素的方法*/
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //如果條件隊列中存在等待的線程
        //喚醒
        notEmpty.signal();
    }

從隊列中獲取元素take()方法的獲取過程。

    /**
    * 從隊列中獲取元素
    * 當隊列已空時阻塞當前線程
    * 從隊列中消費元素后喚醒等待的生產線程
    */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //隊列為空需要阻塞當前線程
            while (count == 0)
                notEmpty.await();
            //獲取元素的過程
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    /**dequeue實際消費元素的方法*/
    private E dequeue() {
       // assert lock.getHoldCount() == 1;
       // assert items[takeIndex] != null;
       final Object[] items = this.items;
       @SuppressWarnings("unchecked")
       E x = (E) items[takeIndex];
       items[takeIndex] = null;
       if (++takeIndex == items.length)
           takeIndex = 0;
       count--;
       if (itrs != null)
           itrs.elementDequeued();
       //消費元素后從喚醒阻塞的生產者線程
       notFull.signal();
       return x;
    }

總結

阻塞隊列提供了不同於普通隊列的增加、刪除元素的方法,核心在與隊列滿時阻塞生產者和隊列空時阻塞消費者。
這一阻塞過程依靠與鎖綁定的Condition對象實現。Condition接口的實現在AQS中實現,具體的實現類是
ConditionObject

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

※產品缺大量曝光嗎?你需要的是一流包裝設計!