RocketMQ(1)—架構原理及環境搭建

一、架構簡述

RocketMQ阿里開源的一個分佈式消息傳遞和流媒體平台,具有低延遲,高性能和可靠性, 萬億級容量和靈活的可伸縮性。跟其它中間件相比,RocketMQ的特點是JAVA實現,在發生宕機和其它故障時消息丟失率更低

它由四個部分組成:nameserver,broker,生產者和消費者。它們中的每一個都可以水平擴展,而沒有單個故障點。  

Nameserver:提供輕量級的服務發現和路由。生產者和消費者通過nameserver獲取broker信息。它幾乎是無狀態的,nameserver結點之間沒有任何的數據同步,broker註冊信息時會註冊到每一個nameserver結點上面,所以每個nameserver節點都記錄了完整broker信息,提供相應的讀寫服務,並支持快速的存儲擴展。

Broker:通過提供輕量級的topic和queue機制來存儲消息。與nameserver中的每個節點建立長連接,定時註冊topic等信息到nameserver上面。broker一般都是主從模式,因為消息是真實存儲消息的地方,避免一個結點掛了,導致這個節點數據全部丟失。

Producer:與nameserver集群中的一個結點建立長連接,定期的拉取broker 的topic路由信息,再將消息發送到對應broker的topic上面

Consumer:與nameserver集群中的一個結點建立長連接,定期的拉取broker 的topic路由信息,再去消費對應broker的topic信息

 

 

二、環境搭建

 1.官網下載:http://rocketmq.apache.org/release_notes/release-notes-4.7.0/

2.解壓 unzip rocketmq-all-4.7.0-bin-release.zip

3.修改啟動參數配置。默認的jvm參數內存設置特別大,如果自己機器不行的話需要手動改下bin目錄下的啟動參數文件:runbroker.sh 和runserver.sh文件 我的虛擬機內存分配不大,改成256m 256m 128m

   這是默認的

 

 

4.啟動nameserver: nohup sh mqnamesrv ‐n 192.168.0.67:9876 &   (將日誌輸出當前目錄的nohub.out文件,方便查看啟動日誌,ip是當前機器的ip)

 

 

 

 5.啟動broker:nohup sh mqbroker ‐n 192.168.0.67:9876 autoCreateTopicEnable=true &  (autoCreateTopicEnable=true 自動創建topic,如果不設置true的話,生產者發送消息的時候如果沒有topic就會發送失敗,需要提前把topic創建好,設置true會在發送時自動創建topic,192.168.0.67:9876 是name server)

也可以使用配置文件啟動broker:nohup sh mqbroker ‐n 192.168.0.67:9876 ‐c conf/broker.conf &

簡單看下默認的配置文件中的一些參數:

#集群名字
brokerClusterName = DefaultCluster
#broker名字,集群中主從都要用這個名字,才會組成一個集群
brokerName = broker-a
#id為0的是master  非0的slava
brokerId = 0
#消息處理時間,凌晨4點
deleteWhen = 04
#消息保存時間默認48小時,48小時之後的凌晨4點就會清理
fileReservedTime = 48
#集群主從之間數據同步方式 
#異步只需要發到master成功就返回客戶端段成功,性能高,但是如果master掛了 slave還未同步就會丟失消息。根據自身業務場景選擇合適方式
brokerRole = ASYNC_MASTER
#消息刷盤機制,和主從數據同步類似,同步就是說需要寫進磁盤了才返回成功。異步就是寫進內存了就返回成功,後面再去落盤。
flushDiskType = ASYNC_FLUSH
#自動創建topic
autoCreateTopicEnable=true

使用配置文件啟動:nohup sh bin/mqbroker ‐n 192.168.0.67:9876 -c conf/broker.conf &

broker 192.168.0.67:10911關聯的nameserver是192.168.0.67:9876

 

 

至此一個單機的rocketMQ的環境就搭建好了   正常退出: sh mqshutdown broker  和  sh mqshutdown namesrv

 

測試下消息發送,使用rocketMQ提供的測試腳本:

export NAMESRV_ADDR=192.168.0.67:9876

生產者腳本

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 消費者腳本

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 發送消息:  

 

 消費消息:

 

三、控制台搭建

1.下載rocketMQ的擴展包,master分支:https://github.com/apache/rocketmq-external 

 2.啟動rocketmq-console-ng模塊

 

 3.修改此模塊配置:

  3.1 maven依賴rocketMQ版本改成自己部署版本對應的,我部署的MQ是最新的4.7.0版本

 

 

  3.2 配置文件中配置namserver地址和控制台數據存放地址

 

 正常來說改完這兩個地方就可以直接啟動控制台的這個springboot程序了。

但是因為我用的MQ是最新的4.7版本,控制台對應的還沒有更新到最新的。編譯都有會報錯的地方

1.DefaultMQPullConsumer這個類已經不推薦使用了,並且4.7.0中有兩個類似的構造器,原來代碼直接傳了一個null,第二個參數無法識別是哪個構造器的。修改下把第二參數強轉String或者RPCHook

 

 

2.MQAdminExt這個接口中加了新方法,但是控制台中MQAdminExtImpl還沒有實現對應的方法。這個問題在github上幾天前已經有人提了Issues了,我這裡是自己添加一下默認實現然後服務就可以正常啟動了,還不確定後續後面有什麼影響沒有,至少可以啟動了

 

 

 

 

之前使用測試腳本發送的消息 以及topic都可以在控制台看到了

 

 

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

磨皮美顏算法 附完整C代碼

前言

2017年底時候寫了這篇《集 降噪 美顏 虛化 增強 為一體的極速圖像潤色算法 附Demo程序

這也算是學習過程中比較有成就感的一個算法。

自2015年做算法開始到今天,還有個把月,就滿五年了。

歲月匆匆,人生能有多少個五年。

這五年裡,從音頻圖像到視頻,從傳統算法到深度學習,從2D到3D各種算法幾乎都走了一個遍。

好在,不論在哪個領域都能有些許建樹,這是博主我自身很欣慰的事情。

雖然有所間斷但是仍然堅持寫博客,並且堅持完整開源分享。

目的就是為了幫助那些一開始跟我一樣,想要學習算法的萌新,

一起踏入算法領域去跟大家“排排坐,吃果果”。

引子

在這個特別的時間點,就想做點特別的事情。

那就是開源當時寫的這個“美顏算法”,開源代碼和當時的版本有些許出入,但是思路是一樣的。

早些年的時候大家發現採用保邊濾波的思路可以做到降噪,進而衍生出來針對皮膚的降噪,簡稱磨皮或者美顏。

從此百家爭鳴,而這個課題到今天也還在發展,當然日新月異了。

故此,想談談針對美顏磨皮的一些算法思路,為後續想學習並改進的萌新提供一些養分。

概述美顏磨皮方法

1.基於保邊降噪

這類算法有很多方法,但不外乎2種基礎思路,

基於空間和基於頻率,當然再展開的話,還可以細分為紋理和顏色。

例如通過膚色或紋理區域做針對性的處理。

這類算法的優點是計算簡單,通用型強,但缺點就是不夠細膩完美。

2.基於人臉檢測貼圖

這種嚴格意義上來說,是易容術,就是基於人臉檢測出的關鍵數據。

例如人臉關鍵點,將人臉皮膚區域提取出來,重新貼上一張事先準備的皮膚圖,進行皮膚貼合融合。

臉已經被置換了,效果很贊。有點繆修斯之船的味道。

這類算法優點是效果極其驚艷,但是算法複雜通用性差,一般只能針對少數角度表情的人臉。

3.結合1和2的深度學習方法

前兩者的思路早期大行其道,如今到了數據時代,

基於深度學習的工具方案,可以非常好地結合前兩者的思路,進行訓練,求一個數據解。

很多人將深度學習等同於AI,這個做法有點激進。

基於深度學習的做法,仍然存在前兩者一樣的問題,簡單的不夠細膩,細膩的不夠簡單,

而如果要設計一個優秀的模型,其實跟設計一個傳統算法一樣困難。

基於數據驅動的算法,驗證成本非常高,可控性比較差,當然在金錢的驅動下確實能產出還不錯的算法模型。

這類算法的優點,往往能求出很不錯的局部最優解,甚至以假亂真,缺點就是需要大量金錢和數據的驅動。

總結來說的話,不付出代價,就別想有好的結果,非常的現實。

 

據我所知目前使用最多的方案是第一種和第三種,第二種可操作性不強,只有少數公司掌握了這方面的核心技術。

但是不管是哪種方案,無非就是以下幾個步驟。

1.確定人臉的皮膚區域

2.定位人臉的雜質(痘痘,斑點,痣,膚色不均等)

3.根據定位到雜質進行填補修復或濾除

 

這就是圖像處理經典三部曲

1.定位 2.檢測 3.處理

每一個細分展開,都非常宏大且複雜的算法。

 

以上,僅以磨皮美顏為例子,闡述圖像方面的算法想要或正在解決什麼樣的問題。

我們在工作中碰到的圖像問題無非以上幾個核心問題,問題都是類似的,只是不同場景和需求下各有難處。

本次開源的算法思路

本次開源的算法是基於保邊降噪的思路,

當然這個思路可以通過改寫,參數化后可以集成到深度學習中,作為一個先驗層輔助訓練。

算法步驟如下:

1.  檢測皮膚顏色,確定皮膚占圖像的比率

2. 根據皮膚比率進行邊緣檢測,產出細節映射圖

3. 基於細節映射圖和磨皮強度進行保邊降噪

4. 對降噪好的圖進行再一次膚色檢測,保留膚色區域的降噪,其他區域還原為原圖

步驟比較簡單,但是要同時兼顧效果性能,是很不容易的。

當然這個算法膚色檢測那一部分可以採用深度學習“語義分割”方面的思路進而改進效果。

做得好,將本算法改良到准商用,驚艷的程度是沒有問題的。

深度學習相關技術就不展開細說了,有能力的朋友,感興趣的話,可以自行實操。

 

完整源代碼開源地址:

https://github.com/cpuimage/skin_smoothing

項目沒有第三方依賴,完整純c代碼。

有編譯問題的同學自行參考《Windows下C,C++開發環境搭建指南》搭建編譯環境。

附上算法效果的示例:

 

 

以上,權當拋磚引玉之用。

授人以魚不如授人以漁。

 

2020年,疫情之下,

願大家都能事業有成,身體健康。

世界和平,人們皆友愛。

 

若有其他相關問題或者需求也可以郵件聯繫俺探討。

郵箱地址是: gaozhihan@vip.qq.com

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

Kafka源碼解析(二)—Log分析

上一篇文章講了LogSegment和Log的初始化,這篇來講講Log的主要操作有哪些。

一般來說Log 的常見操作分為 4 大部分。

  1. 高水位管理操作
  2. 日誌段管理
  3. 關鍵位移值管理
  4. 讀寫操作

其中關鍵位移值管理主要包含Log Start Offset 和 LEO等。

高水位HighWatermark

高水位HighWatermark初始化

高水位是通過LogOffsetMetadata類來定義的:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

這裏傳入的初始值是logStartOffset,表明當首次構建高水位時,它會被賦值成 Log Start Offset 值。

我們再來看看LogOffsetMetadata類:

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // check if this offset is already on an older segment compared with the given offset
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }
  ...
}

LogOffsetMetadata有三個初始值:

messageOffset表示消息位移值;

segmentBaseOffset保存消息位移值所在日誌段的起始位移,用來判斷兩條消息是否處於同一個日誌段的;

relativePositionSegment保存消息位移值所在日誌段的物理磁盤位置;

上面的onOlderSegment表明,要比較哪個日誌段更老,只需要比較segmentBaseOffset的大小就可以了。

高水位HighWatermark設值與更新

  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    //高水位的值不可能小於零
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized {// 保護Log對象修改的Monitor鎖
      highWatermarkMetadata = newHighWatermark// 賦值新的高水位值
      //事務相關,暫時忽略
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      //事務相關,暫時忽略
      maybeIncrementFirstUnstableOffset()
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

設置高水位的值是很簡單的,首先校驗高水位的值是否大於零,然後通過直接加鎖之後更新高水位的值。

更新更新高水位值的方法有兩個:updateHighWatermark 和 maybeIncrementHighWatermark,我們分別分析。

updateHighWatermark

  def updateHighWatermark(hw: Long): Long = {
    //傳入的高水位的值如果小於logStartOffset,設置為logStartOffset
    val newHighWatermark = if (hw < logStartOffset)
      logStartOffset
    //  傳入的高水位的值如果大於LEO,那麼設置為LEO
    else if (hw > logEndOffset)
      logEndOffset
    else
      hw
    //將newHighWatermark封裝成一個LogOffsetMetadata然後更新高水位的值
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    //返回新的高水位的值
    newHighWatermark
  }

這個方法邏輯也很簡潔,因為高水位的值是不可能大於LEO,也不可能小於logStartOffset,所以需要對傳入的hw校驗然後設置成正確的值,然後調用上面的設置高水位的方法設值。

maybeIncrementHighWatermark

/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
//  當新的高水位的值大於舊的高水位的值時才做更新,如果新的高水位的值大於LEO,會報錯
//  這個方法是leader在確認Follower已經拉取了日誌之後才做更新
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  //如果新的高水位的值大於LEO,會報錯
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")

  lock.synchronized {
    // 獲取老的高水位值
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    //只有當新的高水位值大於老的值,因為要維護高水位的單調遞增性
    //或者當新的高水位值和老的高水位值相等,但是新的高水位在一個新的日誌段上面時才做更新
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark)// 返回老的高水位值
    } else {
      None
    }
  }
}

這個方法我將這個方法的英文註釋貼出來了,這個註釋的說明我也寫到方法上了,邏輯很清楚,大家看看註釋應該能理解。

這兩個方法主要的區別是,updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到消息后更新高水位值。而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。

上面的方法中通過調用fetchHighWatermarkMetadata來獲取高水位的值,我們下面看看這個方法:

fetchHighWatermarkMetadata

  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    // 讀取時確保日誌不能被關閉
    checkIfMemoryMappedBufferClosed()

    val offsetMetadata = highWatermarkMetadata
    if (offsetMetadata.messageOffsetOnly) {//沒有獲得到完整的高水位元數據
      lock.synchronized {
        // 通過讀日誌文件的方式把完整的高水位元數據信息拉出來
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
        updateHighWatermarkMetadata(fullOffset)
        fullOffset
      }
    } else {
      offsetMetadata
    }
  }

  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    //通過給的offset,去日誌文件中找到相應的日誌信息
    val fetchDataInfo = read(offset,
      maxLength = 1,
      isolation = FetchLogEnd,
      minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
  }

然後我們提前看一下日誌的read方法,是如何根據索引讀取數據的:

日誌段操作

日誌讀取操作

read

  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以這裡是false
      val includeAbortedTxns = isolation == FetchTxnCommitted
 
      // 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      // 到日字段中根據索引尋找最近的日誌段
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      // 這裏給出了幾種異常場景:
      // 1. 給的日誌索引大於最大值;
      // 2. 通過索引找的日誌段為空;
      // 3. 給的日誌索引小於logStartOffset
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以最大值是endOffsetMetadata
      // 查看一下讀取隔離級別設置。
      // 普通消費者能夠看到[Log Start Offset, LEO)之間的消息
      // 事務型消費者只能看到[Log Start Offset, Log Stable Offset]之間的消息。Log Stable Offset(LSO)是比LEO值小的位移值,為Kafka事務使用
      // Follower副本消費者能夠看到[Log Start Offset,高水位值]之間的消息
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => endOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }
      //如果尋找的索引等於maxOffsetMetadata,那麼直接返回
      if (startOffset == maxOffsetMetadata.messageOffset) {
        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      //如果尋找的索引大於maxOffsetMetadata,返回空的消息集合,因為沒法讀取任何消息
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }
 
      // 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        // 找到日誌段中最大的日誌位移
        val maxPosition = { 
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }
        // 根據位移信息從日誌段中讀取日誌信息
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        // 如果找不到日誌信息,那麼去日誌段集合中找更大的日誌位移的日誌段
        if (fetchInfo == null) {
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      //找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

read方法,有四個參數,分別是:

  • startOffset:讀取的日誌索引位置。
  • maxLength:讀取數據量長度。
  • isolation:隔離級別,多用於 Kafka 事務。
  • minOneMessage:是否至少返回一條消息。設想如果消息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何消息。但如果設置了該參數為 true,read 方法就保證至少能夠返回一條消息。

代碼中使用了segments,來根據位移查找日誌段:

  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

我們下面看看read方法具體做了哪些事:

  1. 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態,作為最大索引LEO;
  2. 去日誌段集合里尋找小於或等於指定索引的日誌段;
  3. 校驗異常情況:
    1. startOffset是不是超過了LEO;
    2. 是不是日誌段集合里沒有索引小於startOffset;
    3. startOffset小於Log Start Offset;
  4. 接下來獲取一下隔離級別;
  5. 如果尋找的索引等於LEO,那麼返回空;
  6. 如果尋找的索引大於LEO,返回空的消息集合,因為沒法讀取任何消息;
  7. 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾;
    1. 首先找到日誌段中最大的位置;
    2. 根據位移信息從日誌段中讀取日誌信息(這個read方法我們上一篇已經講解過了);
    3. 如果找不到日誌信息,那麼讀取日誌段集合中下一個日誌段;
  8. 找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空;

我們在上面的read操作中可以看到,使用了segments來查找日誌。我們主要看看刪除操作

刪除日誌

刪除日誌的入口是:deleteOldSegments

  //  如果topic deletion開關是打開的,那麼會刪去過期的日誌段以及超過設置保留日誌大小的日誌
  // 無論是否開啟刪除規則,都會刪除在log start offset之前的日誌段
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

deleteOldSegments方法會判斷是否開啟刪除規則,如果開啟,那麼會分別調用:

deleteRetentionMsBreachedSegments刪除segment的時間戳超過了設置時間的日誌段;

deleteRetentionSizeBreachedSegments刪除日誌段空間超過設置空間大小的日誌段;

deleteLogStartOffsetBreachedSegments刪除日誌段的baseOffset小於logStartOffset的日誌段;

我這裏列舉一下這三個方法主要是怎麼實現的:

  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    //調用deleteOldSegments方法,並傳入匿名函數,判斷當前的segment的時間戳是否超過了設置時間
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")
  }
  
  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    //判斷日誌段空間是否超過設置空間大小
    //shouldDelete函數會將傳入的日誌段去減diff,直到小於等於零
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
  }
  
  private def deleteLogStartOffsetBreachedSegments(): Int = {
    //shouldDelete函數主要判斷日誌段的baseOffset是否小於logStartOffset
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

這種寫代碼的方式非常的靈活,通過不同方法設置不同的函數來實現代碼復用的目的,最後都是通過調用deleteOldSegments來實現刪除日誌段的目的。

下面我們來看一下deleteOldSegments的操作:

deleteOldSegments

這個deleteOldSegments方法和上面的入口方法傳入的參數是不一致的,這個方法傳入了一個predicate函數,用於判斷哪些日誌段是可以被刪除的,reason用來說明被刪除的原因。

  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    //刪除任何匹配到predicate規則的日誌段
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      deleteSegments(deletable)
    }
  }

這個方法調用了兩個主要的方法,一個是deletableSegments,用於獲取可以被刪除的日誌段的集合;deleteSegments用於刪除日誌段。

deletableSegments

  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    //如果日誌段是空的,那麼直接返回
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      //如果日誌段集合不為空,找到第一個日誌段
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        //獲取下一個日誌段
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
        //如果下一個日誌段的位移沒有大於或等於HW,並且日誌段是匹配predicate函數的,下一個日誌段也不是空的
        //那麼將這個日誌段放入可刪除集合中,然後遍歷下一個日誌段
        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

這個方法邏輯十分清晰,主要做了如下幾件事:

  1. 判斷日誌段集合是否為空,為空那麼直接返回空集合;

  2. 如果日誌段集合不為空,那麼從日誌段集合的第一個日誌段開始遍歷;

  3. 判斷當前被遍曆日志段是否能夠被刪除

    1. 日誌段的下一個日誌段的位移有沒有大於或等於HW;
    2. 日誌段是否能夠通過predicate函數校驗;
    3. 日誌段是否是最後一個日誌段;
  4. 將符合條件的日誌段都加入到deletable集合中,並返回。

接下來調用deleteSegments函數:

  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        // 我們至少保證要存在一個日誌段,如果要刪除所有的日誌;
        //所以調用roll方法創建一個全新的日誌段對象,並且關閉當前寫入的日誌段對象;
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          // 確保Log對象沒有被關閉
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups
          // 刪除給定的日誌段對象以及底層的物理文件
          removeAndDeleteSegments(deletable, asyncDelete = true)
          // 嘗試更新日誌的Log Start Offset值
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
        }
      }
      numToDelete
    }
  }

寫日誌

寫日誌的方法主要有兩個:

appendAsLeader

  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
  }

appendAsFollower

  def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
  }

appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都調用了 append 方法

append

  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      // 第1步:分析和驗證待寫入消息集合,並返回校驗結果
      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // 如果壓根就不需要寫入任何消息,直接返回即可
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // 第2步:消息格式規整,即刪除無效格式消息或無效字節
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        // 確保Log對象未關閉
        checkIfMemoryMappedBufferClosed()
        //需要分配位移值
        if (assignOffsets) {
          // assign offsets to the message set
          // 第3步:使用當前LEO值作為待寫入消息集合中第一條消息的位移值,nextOffsetMetadata為LEO值
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              isFromClient,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          // 更新校驗結果對象類LogAppendInfo
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          // 第4步:驗證消息,確保消息大小不超限
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
          // 直接使用給定的位移值,無需自己分配位移值
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)// 確保消息位移值的單調遞增性
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        // 第5步:更新Leader Epoch緩存
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // check messages set size may be exceed config.segmentSize
        // 第6步:確保消息大小不超限
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // maybe roll the log if this segment is full
        // 第7步:執行日誌切分。當前日誌段剩餘容量可能無法容納新消息集合,因此有必要創建一個新的日誌段來保存待寫入的所有消息
        //下面情況將會執行日誌切分:
        //logSegment 已經滿了
        //日誌段中的第一個消息的maxTime已經過期
        //index索引滿了
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        // 第8步:驗證事務狀態
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, isFromClient)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }
        // 第9步:執行真正的消息寫入操作,主要調用日誌段對象的append方法實現
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
        // 第10步:更新LEO對象,其中,LEO值是消息集合中最後一條消息位移值+1
        // 前面說過,LEO值永遠指向下一條不存在的消息
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        // 第11步:更新事務狀態
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")
        // 是否需要手動落盤。一般情況下我們不需要設置Broker端參數log.flush.interval.messages
        // 落盤操作交由操作系統來完成。但某些情況下,可以設置該參數來確保高可靠性
        if (unflushedMessages >= config.flushInterval)
          flush()
        // 第12步:返回寫入結果
        appendInfo
      }
    }
  }

上面代碼的主要步驟如下:

我們下面看看analyzeAndValidateRecords是如何進行消息校驗的:

analyzeAndValidateRecords

  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
    var shallowMessageCount = 0
    var validBytesCount = 0
    var firstOffset: Option[Long] = None
    var lastOffset = -1L
    var sourceCodec: CompressionCodec = NoCompressionCodec
    var monotonic = true
    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    var offsetOfMaxTimestamp = -1L
    var readFirstMessage = false
    var lastOffsetOfFirstBatch = -1L

    for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
      // 消息格式Version 2的消息批次,起始位移值必須從0開始
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
          s"be 0, but it is ${batch.baseOffset}")

      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
      // For magic version 2, we can get the first offset directly from the batch header.
      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
      // case, validation will be more lenient.
      // Also indicate whether we have the accurate first offset or not
      if (!readFirstMessage) {
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
        lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
        readFirstMessage = true
      }

      // check that offsets are monotonically increasing
      // 一旦出現當前lastOffset不小於下一個batch的lastOffset,說明上一個batch中有消息的位移值大於後面batch的消息
      // 這違反了位移值單調遞增性
      if (lastOffset >= batch.lastOffset)
        monotonic = false

      // update the last offset seen
      // 使用當前batch最後一條消息的位移值去更新lastOffset
      lastOffset = batch.lastOffset

      // Check if the message sizes are valid.
      val batchSize = batch.sizeInBytes
      // 檢查消息批次總字節數大小是否超限,即是否大於Broker端參數max.message.bytes值
      if (batchSize > config.maxMessageSize) {
        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
      }

      // check the validity of the message by checking CRC
      // 執行消息批次校驗,包括格式是否正確以及CRC校驗
      if (!batch.isValid) {
        brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
        throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
      }
      // 更新maxTimestamp字段和offsetOfMaxTimestamp
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }
      // 累加消息批次計數器以及有效字節數,更新shallowMessageCount字段
      shallowMessageCount += 1
      validBytesCount += batchSize
      // 從消息批次中獲取壓縮器類型
      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
      if (messageCodec != NoCompressionCodec)
        sourceCodec = messageCodec
    }

    // Apply broker-side compression if any
    // 獲取Broker端設置的壓縮器類型,即Broker端參數compression.type值。
    // 該參數默認值是producer,表示sourceCodec用的什麼壓縮器,targetCodec就用什麼
    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
    // 最後生成LogAppendInfo對象並返回
    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
  }

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

Docker Dockerfile 指令詳解與實戰案例

 

Dockerfile介紹及常用指令,包括FROM,RUN,還提及了 COPY,ADD,EXPOSE,WORKDIR等,其實 Dockerfile 功能很強大,它提供了十多個指令。

 

Dockerfile介紹

Dockerfile 是一個用來構建鏡像的文本文件,文本內容包含了一條條構建鏡像所需的指令和說明。

在Docker中創建鏡像最常用的方式,就是使用Dockerfile。Dockerfile是一個Docker鏡像的描述文件,我們可以理解成火箭發射的A、B、C、D…的步驟。Dockerfile其內部包含了一條條的指令,每一條指令構建一層,因此每一條指令的內容,就是描述該層應當如何構建。

FROM 指令-指定基礎鏡像

所謂定製鏡像,那一定是以一個鏡像為基礎,在其上進行定製。而 FROM 就是指定基礎鏡像,因此一個 Dockerfile 中 FROM 是必備的指令,並且必須是第一條指令。如下:

FROM centos

 

MAINTAINER 維護者信息

該鏡像是由誰維護的

MAINTAINER lightzhang lightzhang@xxx.com

 

ENV 設置環境變量

格式有兩種:

ENV <key> <value>
ENV <key1>=<value1> <key2>=<value2>...

 

這個指令很簡單,就是設置環境變量而已,無論是後面的其它指令,如 RUN,還是運行時的應用,都可以直接使用這裏定義的環境變量。

ENV VERSION=1.0 DEBUG=on \
    NAME="Happy Feet"

這個例子中演示了如何換行,以及對含有空格的值用雙引號括起來的辦法,這和 Shell 下的行為是一致的。

下列指令可以支持環境變量展開:

ADD、COPY、ENV、EXPOSE、FROM、LABEL、USER、WORKDIR、VOLUME、STOPSIGNAL、ONBUILD、RUN。

可以從這個指令列表裡感覺到,環境變量可以使用的地方很多,很強大。通過環境變量,我們可以讓一份 Dockerfile 製作更多的鏡像,只需使用不同的環境變量即可。

 

ARG 構建參數

格式:ARG <參數名>[=<默認值>]

構建參數和 ENV 的效果一樣,都是設置環境變量。所不同的是,ARG 所設置的構建環境的環境變量,在將來容器運行時是不會存在這些環境變量的。但是不要因此就使用 ARG 保存密碼之類的信息,因為 docker history 還是可以看到所有值的。

Dockerfile 中的 ARG 指令是定義參數名稱,以及定義其默認值。該默認值可以在構建命令 docker build 中用 –build-arg <參數名>=<值> 來覆蓋

在 1.13 之前的版本,要求 –build-arg 中的參數名,必須在 Dockerfile 中用 ARG 定義過了,換句話說,就是 –build-arg 指定的參數,必須在 Dockerfile 中使用了。如果對應參數沒有被使用,則會報錯退出構建。

從 1.13 開始,這種嚴格的限制被放開,不再報錯退出,而是显示警告信息,並繼續構建。這對於使用 CI 系統,用同樣的構建流程構建不同的 Dockerfile 的時候比較有幫助,避免構建命令必須根據每個 Dockerfile 的內容修改。

 

RUN 執行命令

RUN 指令是用來執行命令行命令的。由於命令行的強大能力,RUN 指令在定製鏡像時是最常用的指令之一。其格式有兩種:

shell 格式:RUN <命令>,就像直接在命令行中輸入的命令一樣。如下:

RUN echo '<h1>Hello, Docker!</h1>' > /usr/share/nginx/html/index.html

 

exec 格式:RUN [“可執行文件”, “參數1”, “參數2”],這更像是函數調用中的格式。

Dockerfile 中每一個指令都會建立一層,RUN 也不例外。每一個 RUN 的行為,都會新建立一層,在其上執行這些命令,執行結束后,commit 這一層的修改,構成新的鏡像。

Dockerfile 不推薦寫法:

 1 FROM debian:stretch
 2 
 3 RUN apt-get update
 4 RUN apt-get install -y gcc libc6-dev make wget
 5 RUN wget -O redis.tar.gz "http://download.redis.io/releases/redis-5.0.3.tar.gz"
 6 RUN mkdir -p /usr/src/redis
 7 RUN tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1
 8 RUN make -C /usr/src/redis
 9 RUN make -C /usr/src/redis install
10 RUN rm redis.tar.gz

上面的這種寫法,創建了 8 層鏡像。這是完全沒有意義的,而且很多運行時不需要的東西,都被裝進了鏡像里,比如編譯環境、更新的軟件包等等。最後一行即使刪除了軟件包,那也只是當前層的刪除;雖然我們看不見這個包了,但軟件包卻早已存在於鏡像中並一直跟隨着鏡像,沒有真正的刪除。

結果就是產生非常臃腫、非常多層的鏡像,不僅僅增加了構建部署的時間,也很容易出錯。 這是很多初學 Docker 的人常犯的一個錯誤。

另外:Union FS 是有最大層數限制的,比如 AUFS,曾經是最大不得超過 42 層,現在是不得超過 127 層。

 

Dockerfile 正確寫法:

 1 FROM debian:stretch
 2 
 3 RUN buildDeps='gcc libc6-dev make wget' \
 4     && apt-get update \
 5     && apt-get install -y $buildDeps \
 6     && wget -O redis.tar.gz "http://download.redis.io/releases/redis-5.0.3.tar.gz" \
 7     && mkdir -p /usr/src/redis \
 8     && tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1 \
 9     && make -C /usr/src/redis \
10     && make -C /usr/src/redis install \
11     && rm -rf /var/lib/apt/lists/* \
12     && rm redis.tar.gz \
13     && rm -r /usr/src/redis \
14     && apt-get purge -y --auto-remove $buildDeps

這裏沒有使用很多個 RUN 對應不同的命令,而是僅僅使用一個 RUN 指令,並使用 && 將各個所需命令串聯起來。將之前的 8 層,簡化為了 1 層,且後面刪除了不需要的包和目錄。在撰寫 Dockerfile 的時候,要經常提醒自己,這並不是在寫 Shell 腳本,而是在定義每一層該如何構建。因此鏡像構建時,一定要確保每一層只添加真正需要添加的東西,任何無關的東西都應該清理掉。

很多人初學 Docker 製作出了很臃腫的鏡像的原因之一,就是忘記了每一層構建的最後一定要清理掉無關文件。

 

COPY 複製文件

格式:

1 COPY [--chown=<user>:<group>] <源路徑>... <目標路徑>
2 COPY [--chown=<user>:<group>] ["<源路徑1>",... "<目標路徑>"]

 

COPY 指令將從構建上下文目錄中 <源路徑> 的文件/目錄複製到新的一層的鏡像內的 <目標路徑> 位置。如:

COPY package.json /usr/src/app/

 

<源路徑> 可以是多個,甚至可以是通配符,其通配符規則要滿足 Go 的 filepath.Match 規則,如:

1 COPY hom* /mydir/
2 COPY hom?.txt /mydir/

<目標路徑> 可以是容器內的絕對路徑,也可以是相對於工作目錄的相對路徑(工作目錄可以用 WORKDIR 指令來指定)。目標路徑不需要事先創建,如果目錄不存在會在複製文件前先行創建缺失目錄。

此外,還需要注意一點,使用 COPY 指令,源文件的各種元數據都會保留。比如讀、寫、執行權限、文件變更時間等。這個特性對於鏡像定製很有用。特別是構建相關文件都在使用 Git 進行管理的時候。

在使用該指令的時候還可以加上 –chown=<user>:<group> 選項來改變文件的所屬用戶及所屬組。

1 COPY --chown=55:mygroup files* /mydir/
2 COPY --chown=bin files* /mydir/
3 COPY --chown=1 files* /mydir/
4 COPY --chown=10:11 files* /mydir/

 

ADD 更高級的複製文件

ADD 指令和 COPY 的格式和性質基本一致。但是在 COPY 基礎上增加了一些功能。

如果 <源路徑> 為一個 tar 壓縮文件的話,壓縮格式為 gzip, bzip2 以及 xz 的情況下,ADD 指令將會自動解壓縮這個壓縮文件到 <目標路徑> 去。

在某些情況下,如果我們真的是希望複製個壓縮文件進去,而不解壓縮,這時就不可以使用 ADD 命令了。

在 Docker 官方的 Dockerfile 最佳實踐文檔 中要求,盡可能的使用 COPY,因為 COPY 的語義很明確,就是複製文件而已,而 ADD 則包含了更複雜的功能,其行為也不一定很清晰。最適合使用 ADD 的場合,就是所提及的需要自動解壓縮的場合。

特別說明:在 COPY 和 ADD 指令中選擇的時候,可以遵循這樣的原則,所有的文件複製均使用 COPY 指令,僅在需要自動解壓縮的場合使用 ADD。

在使用該指令的時候還可以加上 –chown=<user>:<group> 選項來改變文件的所屬用戶及所屬組。

1 ADD --chown=55:mygroup files* /mydir/
2 ADD --chown=bin files* /mydir/
3 ADD --chown=1 files* /mydir/
4 ADD --chown=10:11 files* /mydir/

 

WORKDIR 指定工作目錄

格式為 WORKDIR <工作目錄路徑>

使用 WORKDIR 指令可以來指定工作目錄(或者稱為當前目錄),以後各層的當前目錄就被改為指定的目錄,如該目錄不存在,WORKDIR 會幫你建立目錄。

之前提到一些初學者常犯的錯誤是把 Dockerfile 等同於 Shell 腳本來書寫,這種錯誤的理解還可能會導致出現下面這樣的錯誤:

1 RUN cd /app
2 RUN echo "hello" > world.txt

如果將這個 Dockerfile 進行構建鏡像運行后,會發現找不到 /app/world.txt 文件,或者其內容不是 hello。原因其實很簡單,在 Shell 中,連續兩行是同一個進程執行環境,因此前一個命令修改的內存狀態,會直接影響后一個命令;而在 Dockerfile 中,這兩行 RUN 命令的執行環境根本不同,是兩個完全不同的容器。這就是對 Dockerfile 構建分層存儲的概念不了解所導致的錯誤。

之前說過每一個 RUN 都是啟動一個容器、執行命令、然後提交存儲層文件變更。第一層 RUN cd /app 的執行僅僅是當前進程的工作目錄變更,一個內存上的變化而已,其結果不會造成任何文件變更。而到第二層的時候,啟動的是一個全新的容器,跟第一層的容器更完全沒關係,自然不可能繼承前一層構建過程中的內存變化。

因此如果需要改變以後各層的工作目錄的位置,那麼應該使用 WORKDIR 指令。

 

USER 指定當前用戶

格式:USER <用戶名>[:<用戶組>]

USER 指令和 WORKDIR 相似,都是改變環境狀態並影響以後的層。WORKDIR 是改變工作目錄,USER 則是改變之後層的執行 RUN, CMD 以及 ENTRYPOINT 這類命令的身份。

當然,和 WORKDIR 一樣,USER 只是幫助你切換到指定用戶而已,這個用戶必須是事先建立好的,否則無法切換。

1 RUN groupadd -r redis && useradd -r -g redis redis
2 USER redis
3 RUN [ "redis-server" ]

 

如果以 root 執行的腳本,在執行期間希望改變身份,比如希望以某個已經建立好的用戶來運行某個服務進程,不要使用 su 或者 sudo,這些都需要比較麻煩的配置,而且在 TTY 缺失的環境下經常出錯。建議使用 gosu。

1 # 建立 redis 用戶,並使用 gosu 換另一個用戶執行命令
2 RUN groupadd -r redis && useradd -r -g redis redis
3 # 下載 gosu
4 RUN wget -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/1.7/gosu-amd64" \
5     && chmod +x /usr/local/bin/gosu \
6     && gosu nobody true
7 # 設置 CMD,並以另外的用戶執行
8 CMD [ "exec", "gosu", "redis", "redis-server" ]

 

VOLUME 定義匿名卷

格式為:

1 VOLUME ["<路徑1>", "<路徑2>"...]
2 VOLUME <路徑>

之前我們說過,容器運行時應該盡量保持容器存儲層不發生寫操作,對於數據庫類需要保存動態數據的應用,其數據庫文件應該保存於卷(volume)中。為了防止運行時用戶忘記將動態文件所保存目錄掛載為卷,在 Dockerfile 中,我們可以事先指定某些目錄掛載為匿名卷,這樣在運行時如果用戶不指定掛載,其應用也可以正常運行,不會向容器存儲層寫入大量數據。

 

VOLUME /data

這裏的 /data 目錄就會在運行時自動掛載為匿名卷,任何向 /data 中寫入的信息都不會記錄進容器存儲層,從而保證了容器存儲層的無狀態化。當然,運行時可以覆蓋這個掛載設置。比如:

docker run -d -v mydata:/data xxxx

在這行命令中,就使用了 mydata 這個命名卷掛載到了 /data 這個位置,替代了 Dockerfile 中定義的匿名卷的掛載配置。

 

EXPOSE 聲明端口

格式為 EXPOSE <端口1> [<端口2>...]

EXPOSE 指令是聲明運行時容器提供服務端口,這隻是一個聲明,在運行時並不會因為這個聲明應用就會開啟這個端口的服務。在 Dockerfile 中寫入這樣的聲明有兩個好處,一個是幫助鏡像使用者理解這個鏡像服務的守護端口,以方便配置映射;另一個用處則是在運行時使用隨機端口映射時,也就是 docker run -P 時,會自動隨機映射 EXPOSE 的端口。

要將 EXPOSE 和在運行時使用 -p <宿主端口>:<容器端口> 區分開來。-p,是映射宿主端口和容器端口,換句話說,就是將容器的對應端口服務公開給外界訪問,而 EXPOSE 僅僅是聲明容器打算使用什麼端口而已,並不會自動在宿主進行端口映射。

 

ENTRYPOINT 入口點

ENTRYPOINT 的格式和 RUN 指令格式一樣,分為 exec 格式和 shell 格式

ENTRYPOINT 的目的和 CMD 一樣,都是在指定容器啟動程序及參數。ENTRYPOINT 在運行時也可以替代,不過比 CMD 要略顯繁瑣,需要通過 docker run 的參數 –entrypoint 來指定。

當指定了 ENTRYPOINT 后,CMD 的含義就發生了改變,不再是直接的運行其命令,而是將 CMD 的內容作為參數【】傳給 ENTRYPOINT 指令,換句話說實際執行時,將變為:

<ENTRYPOINT> "<CMD>"

 

那麼有了 CMD 后,為什麼還要有 ENTRYPOINT 呢?這種 <ENTRYPOINT> “<CMD>“ 有什麼好處么?讓我們來看幾個場景。

場景一:讓鏡像變成像命令一樣使用

假設我們需要一個得知自己當前公網 IP 的鏡像,那麼可以先用 CMD 來實現:

1 FROM centos:7.7.1908
2 CMD [ "curl", "-s", "ifconfig.io" ]

 

假如我們使用 docker build -t myip . 來構建鏡像的話,如果我們需要查詢當前公網 IP,只需要執行:

1 $ docker run myip
2 183.226.75.148

 

嗯,這麼看起來好像可以直接把鏡像當做命令使用了,不過命令總有參數,如果我們希望加參數呢?比如從上面的 CMD 中可以看到實質的命令是 curl,那麼如果我們希望显示 HTTP 頭信息,就需要加上 -i 參數。那麼我們可以直接加 -i 參數給 docker run myip 么?

1 $ docker run myip -i
2 docker: Error response from daemon: OCI runtime create failed: container_linux.go:348: starting container process caused "exec: \"-i\": executable file not found in $PATH": unknown.
3 ERRO[0000] error waiting for container: context canceled

 

我們可以看到可執行文件找不到的報錯,executable file not found。之前我們說過,跟在鏡像名後面的是 command【Usage: docker run [OPTIONS] IMAGE [COMMAND] [ARG…]】,運行時會替換 CMD 的默認值。因此這裏的 -i 替換了原來的 CMD,而不是添加在原來的 curl -s ifconfig.io 後面。而 -i 根本不是命令,所以自然找不到。

那麼如果我們希望加入 -i 這參數,我們就必須重新完整的輸入這個命令:

$ docker run myip curl -s ifconfig.io -i

 

這顯然不是很好的解決方案,而使用 ENTRYPOINT 就可以解決這個問題。現在我們重新用 ENTRYPOINT 來實現這個鏡像:

1 FROM centos:7.7.1908
2 ENTRYPOINT [ "curl", "-s", "ifconfig.io" ]

 

使用 docker build -t myip2 . 構建完成后,這次我們再來嘗試直接使用 docker run myip2 -i:

 1 $ docker run myip2 
 2 183.226.75.148
 3 
 4 $ docker run myip2 -i
 5 HTTP/1.1 200 OK
 6 Date: Sun, 19 Apr 2020 02:20:48 GMT
 7 Content-Type: text/plain; charset=utf-8
 8 Content-Length: 15
 9 Connection: keep-alive
10 Set-Cookie: __cfduid=d76a2e007bbe7ec2d230b0a6636d115151587262848; expires=Tue, 19-May-20 02:20:48 GMT; path=/; domain=.ifconfig.io; HttpOnly; SameSite=Lax
11 CF-Cache-Status: DYNAMIC
12 Server: cloudflare
13 CF-RAY: 586326015c3199a1-LAX
14 alt-svc: h3-27=":443"; ma=86400, h3-25=":443"; ma=86400, h3-24=":443"; ma=86400, h3-23=":443"; ma=86400
15 cf-request-id: 0231d614d9000099a1e10d7200000001
16 
17 183.226.75.148

 

可以看到,這次成功了。這是因為當存在 ENTRYPOINT 后,CMD 的內容將會作為參數傳給 ENTRYPOINT,而這裏 -i 就是新的 CMD,因此會作為參數傳給 curl,從而達到了我們預期的效果。

 

場景二:應用運行前的準備工作

啟動容器就是啟動主進程,但有些時候,啟動主進程前,需要一些準備工作。

比如 mysql 類的數據庫,可能需要一些數據庫配置、初始化的工作,這些工作要在最終的 mysql 服務器運行之前解決。

此外,可能希望避免使用 root 用戶去啟動服務,從而提高安全性,而在啟動服務前還需要以 root 身份執行一些必要的準備工作,最後切換到服務用戶身份啟動服務。或者除了服務外,其它命令依舊可以使用 root 身份執行,方便調試等。

這些準備工作是和容器 CMD 無關的,無論 CMD 是什麼,都需要事先進行一個預處理的工作。這種情況下,可以寫一個腳本,然後放入 ENTRYPOINT 中去執行,而這個腳本會將接到的參數(也就是 <CMD>)作為命令,在腳本最後執行。比如官方鏡像 redis 中就是這麼做的:

1 FROM alpine:3.4
2 ...
3 RUN addgroup -S redis && adduser -S -G redis redis
4 ...
5 ENTRYPOINT ["docker-entrypoint.sh"]
6 
7 EXPOSE 6379
8 CMD [ "redis-server" ]

 

可以看到其中為了 redis 服務創建了 redis 用戶,並在最後指定了 ENTRYPOINT 為 docker-entrypoint.sh 腳本。

1 #!/bin/sh
2 ...
3 # allow the container to be started with `--user`
4 if [ "$1" = 'redis-server' -a "$(id -u)" = '0' ]; then
5     chown -R redis .
6     exec su-exec redis "$0" "$@"
7 fi
8 
9 exec "$@"

 

該腳本的內容就是根據 CMD 的內容來判斷,如果是 redis-server 的話,則切換到 redis 用戶身份啟動服務器,否則依舊使用 root 身份執行。比如:

1 $ docker run -it redis id
2 uid=0(root) gid=0(root) groups=0(root)

 

CMD 容器啟動命令

CMD 指令的格式和 RUN 相似,也是兩種格式和一種特殊格式:

1 shell 格式:CMD <命令>
2 exec 格式:CMD ["可執行文件", "參數1", "參數2"...]
3 參數列表格式:CMD ["參數1", "參數2"...]。在指定了 ENTRYPOINT 指令后,用 CMD 指定具體的參數。

 

之前介紹容器的時候曾經說過,Docker 不是虛擬機,容器就是進程。既然是進程,那麼在啟動容器的時候,需要指定所運行的程序及參數。CMD 指令就是用於指定默認的容器主進程的啟動命令的。

在指令格式上,一般推薦使用 exec 格式,這類格式在解析時會被解析為 JSON 數組,因此一定要使用雙引號 “,而不要使用單引號。

如果使用 shell 格式的話,實際的命令會被包裝為 sh -c 的參數的形式進行執行。比如:

CMD echo $HOME

 

在實際執行中,會將其變更為:

CMD [ "sh", "-c", "echo $HOME" ]

 

這就是為什麼我們可以使用環境變量的原因,因為這些環境變量會被 shell 進行解析處理。

提到 CMD 就不得不提容器中應用在前台執行和後台執行的問題。這是初學者常出現的一個混淆。

Docker 不是虛擬機,容器中的應用都應該以前台執行,而不是像虛擬機、物理機裏面那樣,用 systemd 去啟動後台服務,容器內沒有後台服務的概念

對於容器而言,其啟動程序就是容器應用進程,容器就是為了主進程而存在的,主進程退出,容器就失去了存在的意義,從而退出,其它輔助進程不是它需要關心的東西。

一些初學者將 CMD 寫為:

CMD service nginx start

 

使用 service nginx start 命令,則是希望 upstart 來以後台守護進程形式啟動 nginx 服務。而剛才說了 CMD service nginx start 會被理解為 CMD [ “sh”, “-c”, “service nginx start”],因此主進程實際上是 sh。那麼當 service nginx start 命令結束后,sh 也就結束了,sh 作為主進程退出了,自然就會令容器退出。

正確的做法是直接執行 nginx 可執行文件,並且要求以前台形式運行。比如:

CMD ["nginx", "-g", "daemon off;"]

 

構建鏡像案例-Nginx

構建文件

 1 [root@docker01 make03]# pwd
 2 /root/docker_test/make03
 3 [root@docker01 make03]# ll
 4 total 12
 5 -rw-r--r-- 1 root root 720 Apr 19 16:46 Dockerfile
 6 -rw-r--r-- 1 root root  95 Apr 19 16:19 entrypoint.sh
 7 -rw-r--r-- 1 root root  22 Apr 19 16:18 index.html
 8 [root@docker01 make03]# cat Dockerfile   # Dockerfile文件
 9 # 基礎鏡像
10 FROM centos:7.7.1908
11 
12 # 維護者
13 MAINTAINER lightzhang lightzhang@xxx.com
14 
15 # 命令:做了些什麼操作
16 RUN echo 'nameserver 223.5.5.5' > /etc/resolv.conf && echo 'nameserver 223.6.6.6' >> /etc/resolv.conf
17 RUN curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
18 RUN yum install -y nginx-1.16.1 && yum clean all
19 RUN echo "daemon off;" >> /etc/nginx/nginx.conf
20 
21 # 添加文件
22 COPY index.html /usr/share/nginx/html/index.html
23 COPY entrypoint.sh /usr/local/bin/entrypoint.sh
24 
25 # 對外暴露端口聲明
26 EXPOSE 80
27 
28 # 執行
29 ENTRYPOINT ["sh", "entrypoint.sh"]
30 
31 # 執行命令
32 CMD ["nginx"]
33 
34 [root@docker01 make03]# cat index.html   # 訪問文件
35 nginx in docker, html
36 [root@docker01 make03]# cat entrypoint.sh   # entrypoint 文件
37 #!/bin/bash
38 if [ "$1" = 'nginx' ]; then
39     exec nginx -c /etc/nginx/nginx.conf
40 fi
41 exec "$@"

 

構建鏡像

 1 [root@docker01 make03]# docker build -t base/nginx:1.16.1 .
 2 Sending build context to Docker daemon  4.608kB
 3 Step 1/11 : FROM centos:7.7.1908
 4  ---> 08d05d1d5859
 5 Step 2/11 : MAINTAINER lightzhang lightzhang@xxx.com
 6  ---> Using cache
 7  ---> 1dc29e78d94f
 8 Step 3/11 : RUN echo 'nameserver 223.5.5.5' > /etc/resolv.conf && echo 'nameserver 223.6.6.6' >> /etc/resolv.conf
 9  ---> Using cache
10  ---> 19398ad9b023
11 Step 4/11 : RUN curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
12  ---> Using cache
13  ---> b2451c5856c5
14 Step 5/11 : RUN yum install -y nginx-1.16.1 && yum clean all
15  ---> Using cache
16  ---> 291f27cae4df
17 Step 6/11 : RUN echo "daemon off;" >> /etc/nginx/nginx.conf
18  ---> Using cache
19  ---> 115e07b6313e
20 Step 7/11 : COPY index.html /usr/share/nginx/html/index.html
21  ---> Using cache
22  ---> 9d714d2e2a84
23 Step 8/11 : COPY entrypoint.sh /usr/local/bin/entrypoint.sh
24  ---> Using cache
25  ---> b16983911b56
26 Step 9/11 : EXPOSE 80
27  ---> Using cache
28  ---> d8675d6c2d43
29 Step 10/11 : ENTRYPOINT ["sh", "entrypoint.sh"]
30  ---> Using cache
31  ---> 802a1a67db37
32 Step 11/11 : CMD ["nginx"]
33  ---> Using cache
34  ---> f2517b4d5510
35 Successfully built f2517b4d5510
36 Successfully tagged base/nginx:1.16.1

 

發布容器與端口查看

 1 [root@docker01 ~]# docker run -d -p 80:80 --name mynginx_v2 base/nginx:1.16.1   # 啟動容器
 2 50a45a0894d8669308de7c70d47c96db8cd8990d3e34d1d125e5289ed062f126
 3 [root@docker01 ~]# 
 4 [root@docker01 ~]# docker ps 
 5 CONTAINER ID   IMAGE               COMMAND                  CREATED         STATUS         PORTS                NAMES
 6 50a45a0894d8   base/nginx:1.16.1   "sh entrypoint.sh ng…"   3 minutes ago   Up 3 minutes   0.0.0.0:80->80/tcp   mynginx_v2
 7 [root@docker01 ~]# netstat -lntup  # 端口查看
 8 Active Internet connections (only servers)
 9 Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
10 tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1634/master         
11 tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd           
12 tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      1349/sshd           
13 tcp6       0      0 ::1:25                  :::*                    LISTEN      1634/master         
14 tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd           
15 tcp6       0      0 :::80                   :::*                    LISTEN      13625/docker-proxy  
16 tcp6       0      0 :::8080                 :::*                    LISTEN      2289/docker-proxy   
17 tcp6       0      0 :::22                   :::*                    LISTEN      1349/sshd           
18 udp        0      0 0.0.0.0:1021            0.0.0.0:*                           847/rpcbind         
19 udp        0      0 0.0.0.0:111             0.0.0.0:*                           1/systemd           
20 udp6       0      0 :::1021                 :::*                                847/rpcbind         
21 udp6       0      0 :::111                  :::*                                1/systemd

 

瀏覽器訪問

http://172.16.1.31/

 

 

  

 

 

———END———
如果覺得不錯就關注下唄 (-^O^-) !

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

Python 網絡爬蟲實戰:爬取 B站《全職高手》20萬條評論數據

本周我們的目標是:B站(嗶哩嗶哩彈幕網 https://www.bilibili.com )視頻評論數據。

我們都知道,B站有很多號稱“鎮站之寶”的視頻,擁有着數量極其恐怖的評論和彈幕。所以這次我們的目標就是,爬取B站視頻的評論數據,分析其為何會深受大家喜愛。

首先去調研一下,B站評論數量最多的視頻是哪一個。。。好在已經有大佬已經統計過了,我們來看一哈!

​【B站大數據可視化】B站評論數最多的視頻究竟是?來自 <https://www.bilibili.com/video/av34900167/>

 

嗯?《全職高手》,有點意思,第一集和最後一集分別佔據了評論數量排行榜的第二名和第一名,遠超了其他很多很火的番。那好,就拿它下手吧,看看它到底強在哪兒。

廢話不多說,先去B站看看這部神劇到底有多好看 https://www.bilibili.com/bangumi/play/ep107656

額,需要開通大會員才能觀看。。。

好吧,不看就不看,不過好在雖然視頻看不了,評論卻是可以看的。

感受到它的恐怖了嗎?63w6條的評論!9千多頁!果然是不同凡響啊。

接下來,我們就開始編寫爬蟲,爬取這些數據吧。

 

使用爬蟲爬取網頁一般分為四個階段:分析目標網頁,獲取網頁內容,提取關鍵信息,輸出保存。

1. 分析目標網頁

  • 首先觀察評論區結構,發現評論區為鼠標點擊翻頁形式,共 9399 頁,每一頁有 20 條評論,每條評論中包含 用戶名、評論內容、評論樓層、時間日期、點贊數等信息展示。

  • 接着我們按 F12 召喚出開發者工具,切換到Network。然後用鼠標點擊評論翻頁,觀察這個過程有什麼變化,並以此來制定我們的爬取策略。

  • 我們不難發現,整個過程中 URL 不變,說明評論區翻頁不是通過 URL 控制。而在每翻一頁的時候,網頁會向服務器發出這樣的請求(請看 Request URL)。

  • 點擊 Preview 欄,可以切換到預覽頁面,也就是說,可以看到這個請求返回的結果是什麼。下面是該請求返回的 json 文件,包含了在 replies 里包含了本頁的評論數據。在這個 json 文件里,我們可以發現,這裏面包含了太多的信息,除了網頁上展示的信息,還有很多沒展示出來的信息也有,簡直是挖到寶了。不過,我們這裏用不到,通通忽略掉,只挑我們關注的部分就好了。

2. 獲取網頁內容

網頁內容分析完畢,可以正式寫代碼爬了。

 1 import requests
 2 
 3 def fetchURL(url):
 4     '''
 5     功能:訪問 url 的網頁,獲取網頁內容並返回
 6     參數:
 7         url :目標網頁的 url
 8     返回:目標網頁的 html 內容
 9     '''
10     headers = {
11         'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
12         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
13     }
14     
15     try:
16         r = requests.get(url,headers=headers)
17         r.raise_for_status()
18         print(r.url)
19         return r.text
20     except requests.HTTPError as e:
21         print(e)
22         print("HTTPError")
23     except requests.RequestException as e:
24         print(e)
25     except:
26         print("Unknown Error !")
27         
28 
29 if __name__ == '__main__':
30     url = 'https://api.bilibili.com/x/v2/reply?callback=jQuery172020326544171595695_1541502273311&jsonp=jsonp&pn=2&type=1&oid=11357166&sort=0&_=1541502312050'
31     html = fetchURL(url)
32     print(html)

不過,在運行過後,你會發現,403 錯誤,服務器拒絕了我們的訪問。

運行結果:

403 Client Error: Forbidden for url: https://api.bilibili.com/x/v2/reply?callback=jQuery172020326544171595695_1541502273311&jsonp=jsonp&pn=2&type=1&oid=11357166&sort=0&_=1541502312050
HTTPError
None

同樣的,這個請求放瀏覽器地址欄裏面直接打開,會變403,什麼也訪問不到。

這是我們本次爬蟲遇到的第一個坑。在瀏覽器中能正常返迴響應,但是直接打開請求鏈接時,卻會被服務器拒絕。(我第一反應是 cookie ,將瀏覽器中的 cookie 放入爬蟲的請求頭中,重新訪問,發現沒用),或許這也算是一個小的反爬蟲機制吧。

網上查閱資料之後,我找到了解決的方法(雖然不了解原理),原請求的 URL 參數如下:

callback = jQuery1720913511919053787_1541340948898
jsonp = jsonp
pn = 2
type = 1
oid = 11357166&sort=0
_ = 1541341035236

其中,真正有用的參數只有三個:pn(頁數),type(=1)和oid(視頻id)。刪除其餘不必要的參數之後,用新整理出的url去訪問,成功獲取到評論數據。

https://api.bilibili.com/x/v2/reply?type=1&oid=11357166&pn=2

然後,在主函數中,通過寫一個 for 循環,通過改變 pn 的值,獲取每一頁的評論數據。

1 if __name__ == '__main__':
2     for page in range(0,9400):
3         url = 'https://api.bilibili.com/x/v2/reply?type=1&oid=11357166&pn=' + str(page)
4         html = fetchURL(url)

 

3. 提取關鍵信息

通過 json 庫對獲取到的響應內容進行解析,然後提取我們需要的內容:樓層,用戶名,性別,時間,評價,點贊數,回複數。

 1 import json
 2 import time
 3 
 4 def parserHtml(html):
 5     '''
 6     功能:根據參數 html 給定的內存型 HTML 文件,嘗試解析其結構,獲取所需內容
 7     參數:
 8             html:類似文件的內存 HTML 文本對象
 9     '''
10     s = json.loads(html)
11 
12     for i in range(20):
13         comment = s['data']['replies'][i]
14 
15         # 樓層,用戶名,性別,時間,評價,點贊數,回複數
16         floor = comment['floor']
17         username = comment['member']['uname']
18         sex = comment['member']['sex']
19         ctime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(comment['ctime']))
20         content = comment['content']['message']
21         likes = comment['like']
22         rcounts = comment['rcount']
23 
24         print('--'+str(floor) + ':' + username + '('+sex+')' + ':'+ctime)
25         print(content)
26         print('like : '+ str(likes) + '      ' + 'replies : ' + str(rcounts))
27         print('  ')
部分運行結果如下:
--204187:day可可鈴(保密):2018-11-05 18:16:22
太太又出本了,這次真的木錢了(´;ω;`)
like : 1      replies : 0
  
--204186:長夜未央233(女):2018-11-05 16:24:52
12區打卡
like : 2      replies : 0
  
--204185:果然還是人渣一枚(男):2018-11-05 13:48:09
貌似忘來了好幾天
like : 1      replies : 1
  
--204183:day可可鈴(保密):2018-11-05 13:12:38
要準備去學校了,萬惡的期中考試( ´_ゝ`)
like : 2      replies : 0
  
--204182:拾秋以恭弘=叶 恭弘(保密):2018-11-05 12:04:19
11月5日打卡( ̄▽ ̄)
like : 1      replies : 0
  
--204181:芝米士噠(女):2018-11-05 07:53:43
這次是真的錯過了一個億[蛆音娘_扶額]
like : 2      replies : 1

4. 保存輸出

我們把這些數據以 csv 的格式保存於本地,即完成了本次爬蟲的全部任務。下面附上爬蟲的全部代碼。

  1 import requests
  2 import json
  3 import time
  4 
  5 def fetchURL(url):
  6     '''
  7     功能:訪問 url 的網頁,獲取網頁內容並返回
  8     參數:
  9         url :目標網頁的 url
 10     返回:目標網頁的 html 內容
 11     '''
 12     headers = {
 13         'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
 14         'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
 15     }
 16     
 17     try:
 18         r = requests.get(url,headers=headers)
 19         r.raise_for_status()
 20         print(r.url)
 21         return r.text
 22     except requests.HTTPError as e:
 23         print(e)
 24         print("HTTPError")
 25     except requests.RequestException as e:
 26         print(e)
 27     except:
 28         print("Unknown Error !")
 29         
 30 
 31 def parserHtml(html):
 32     '''
 33     功能:根據參數 html 給定的內存型 HTML 文件,嘗試解析其結構,獲取所需內容
 34     參數:
 35             html:類似文件的內存 HTML 文本對象
 36     '''
 37     try:
 38         s = json.loads(html)
 39     except:
 40         print('error')
 41         
 42     commentlist = []
 43     hlist = []
 44 
 45     hlist.append("序號")
 46     hlist.append("名字")
 47     hlist.append("性別")
 48     hlist.append("時間")
 49     hlist.append("評論")
 50     hlist.append("點贊數")
 51     hlist.append("回複數")
 52 
 53     #commentlist.append(hlist)
 54 
 55     # 樓層,用戶名,性別,時間,評價,點贊數,回複數
 56     for i in range(20):
 57         comment = s['data']['replies'][i]
 58         blist = []
 59 
 60         floor = comment['floor']
 61         username = comment['member']['uname']
 62         sex = comment['member']['sex']
 63         ctime = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(comment['ctime']))
 64         content = comment['content']['message']
 65         likes = comment['like']
 66         rcounts = comment['rcount']
 67 
 68         blist.append(floor)
 69         blist.append(username)
 70         blist.append(sex)
 71         blist.append(ctime)
 72         blist.append(content)
 73         blist.append(likes)
 74         blist.append(rcounts)
 75 
 76         commentlist.append(blist)
 77 
 78     writePage(commentlist)
 79     print('---'*20)
 80 
 81 def writePage(urating):
 82     '''
 83         Function : To write the content of html into a local file
 84         html : The response content
 85         filename : the local filename to be used stored the response
 86     '''
 87     
 88     import pandas as pd
 89     dataframe = pd.DataFrame(urating)
 90     dataframe.to_csv('Bilibili_comment5-1000條.csv', mode='a', index=False, sep=',', header=False)
 91 
 92 
 93 if __name__ == '__main__':
 94     for page in range(0,9400):
 95         url = 'https://api.bilibili.com/x/v2/reply?type=1&oid=11357166&pn=' + str(page)
 96         html = fetchURL(url)
 97         parserHtml(html)
 98 
 99         # 為了降低被封ip的風險,每爬20頁便歇5秒。
100         if page%20 == 0:
101             time.sleep(5)

 

寫在最後

在爬取過程中,還是遇到了很多的小坑的。

1. 請求的 url 不能直接用,需要對參數進行篩選整理后才能訪問。

2. 爬取過程其實並不順利,因為如果爬取期間如果有用戶發表評論,則請求返回的響應會為空導致程序出錯。所以在實際爬取過程中,記錄爬取的位置,以便出錯之後從該位置繼續爬。(並且,挑選深夜一兩點這種發帖人數少的時間段,可以極大程度的減少程序出錯的機率)

3. 爬取到的數據有多處不一致,其實這個不算是坑,不過這裏還是講一下,免得產生困惑。

        a. 就是評論區樓層只到了20多萬,但是評論數量卻有63萬多條,這個不一致主要是由於B站的評論是可以回復的,回復的評論也會計算到總評論數里。我們這裏只爬樓層的評論,而評論的回復則忽略,只統計回複數即可。

        b. 評論區樓層在20萬條左右,但是我們最後爬取下來的數據只有18萬條左右,反覆檢查爬蟲程序及原網站后發現,這個屬於正常現象,因為有刪評論的情況,評論刪除之後,後面的樓層並不會重新排序,而是就這樣把刪掉的那層空下了。導致樓層數和評論數不一致。

 

 

 如果文章中有哪裡沒有講明白,或者講解有誤的地方,歡迎在評論區批評指正,或者掃描下面的二維碼,加我微信,大家一起學習交流,共同進步。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

tarjan算法求scc & 縮點

前置知識

圖的遍歷(dfs)

強連通&強連通分量

對於有向圖G中的任意兩個頂點u和v存在u->v的一條路徑,同時也存在v->u的路徑,我們則稱這兩個頂點強連通。以此類推,強連通分量就是某一個分量內各個頂點之間互相連通。

簡單來說,就是有向圖內的一個分量,其中的任意兩個點之家可以互相到達。

求有向圖內部強連通分量的方法大概有2種:tarjan算法,korasaju算法。這裏我們只對tarjan算法進行討論。

tarjan算法

tarjan算法是tarjan神仙提出的基於dfs時間戳和堆棧的算法,這裏我們可以先來看一下什麼是dfs時間戳

dfs時間戳

dfs時間戳就是dfs的先後順序,詳細來講,比如我們dfs最先訪問到的節點是A,於是A的時間戳就是1,第二個訪問到的節點是E,那麼E的時間戳就是2,我們用\(dfn[u]\)來表示u節點的時間戳,應該算是比較簡單的

算法步驟

首先,除了dfn以外我們還需要一個low數組,這個數組記錄了某個點通過圖上的邊能回溯到的dfn值最小的節點。這句話相信在大多數博客裏面都有提到,這裏我們來看一個簡單的例子:

首先,我們有一個圖G:

假設我們從a點出發開始dfs,我們可以畫出一個dfs樹:

為什麼我們畫出來的dfs樹和原來的圖不一樣呢?因為我們在dfs的過程中實際上是會忽略某一些連接到已訪問節點的邊的,這些邊我們暫且稱之為回邊。對於點u來說,\(low[u]\)保存的就是點u通過某一條(或者是幾條)回邊能到達的dfn值最小的節點(也就是被最先訪問的節點)。假設這個dfn值最小的節點是u’,我們可以知道,因為u和u’都是在一棵dfs樹上的,並且u’可以到達u,同時u可以通過一條或多條回邊到達u’,也就是說u’->u路徑上的任意節點都可以通過這一條回邊來互相到達,也就是說他們會形成一個強連通分量。

更加詳細的例子

我們有一個新圖G:

假設我們從A點出發開始dfs,一路跑到D點,那麼我們為這個圖上的每一個點加上dfn數組和low數組的值(dfn,low),整個圖就會長成這個樣子:

此時我們會遇到一條D->A的回邊,也就是說點D能訪問到的dfn值最小的節點從點D本身變化到了A點,所以點D的low值就會發生相應的變化,\(low[D]=min(low[D],dfn[A])\)

緊接着,dfs發生回溯,我們沿着之前的路徑逐步更新路徑上節點的low值,於是就有\(low[C]=min(low[C],low[D])\),直到更新到某一個dfn值和low值相同的節點。因為這個節點能訪問到的最小dfn的節點就是其本身,也就是說這個節點是整個scc最先被訪問到的節點。

全部搞完大概會變成這個樣子:

我們用一個輔助棧來保存dfs的路徑,這樣就可以在找到一個強連通分量裏面最早被訪問到的節點的時候可以輸出路徑。同時因為dfs訪問是一條路走到黑的,所以可以保證棧內在節點u(low[u]==dfn[u])之前的的節點都是屬於同一個scc的。

還是上面這幅圖,我們順便把E點給更新了:

跑完E點之後就會發現,E點本身的low就是和dfn相等的,所以此時棧內也只有E這一個節點。

於是上面這個圖的scc有以下幾個:

[E]

[A,B,C,D]

代碼實現

首先我們要發現,在dfs的初期我們每一個節點的low和dfn都是相同的,也就是說有dfn[u]=low[u]=++cnt(cnt為計數變量),並且在回溯的過程中要用后訪問節點的low值來更新先訪問節點的low值,也就是說有\(low[u]=min(low[u],low[v])\),當訪問到某一個在棧中的節點的時候,我們要用這個節點的dfn值來更新其他節點,所以有\(low[u]=min(low[u],dfn[v])\)

那麼我們一個簡單的代碼就可以寫出來了:

void tarjan(int u){
	dfn[u]=low[u]=++cnt;
	s.push(u);
	ins[u]=1;
	for(int i=0;i<gpe[u].size();i++){
		int v=gpe[u][i].to;
		if(!dfn[v]){//如果節點未訪問,則訪問之
			tarjan(v);
			low[u]=min(low[u],low[v]);
		}else if(ins[v]){//ins是為棧中節點做的一個標記
			low[u]=min(low[u],dfn[v]);
		}
	}
}

當更新完畢之後,我們需要找出一個完整的scc,因為我們提前已經用輔助棧來記錄節點了,剩下的工作就只剩下從棧中不停地pop就完事了

if(low[u]==dfn[u]){
		ins[u]=0;
		scc[u]=++sccn;//sccn是強連通分量的編號
		size[sccn]=1;//size記錄了強連通分量的大小
    //找到某一個low[u]==dfn[u]的節點的時候就要立即處理,因為這個節點也屬於一個新的scc
		while(s.top()!=u){
			scc[s.top()]=sccn;//scc[u]記錄了u點屬於哪一個scc
			ins[s.top()]=0;
			size[sccn]+=1;
			s.pop();
		}
		s.pop();
    //這裏pop掉的就是一開始的那個low[u]==dfn[u]的節點。因為相關信息已經維護完畢,所以這裏直接pop也沒問題
	}

把這兩部分結合在一起,就是tarjan求scc的完整代碼了:

void tarjan(int u){
	dfn[u]=low[u]=++cnt;
	s.push(u);
	ins[u]=1;
	for(int i=0;i<gpe[u].size();i++){
		int v=gpe[u][i].to;
		if(!dfn[v]){
			tarjan(v);
			low[u]=min(low[u],low[v]);
		}else if(ins[v]){
			low[u]=min(low[u],dfn[v]);
		}
	}
	if(low[u]==dfn[u]){
		ins[u]=0;
		scc[u]=++sccn;
		size[sccn]=1;
		printf("%d ",u);
		while(s.top()!=u){
			scc[s.top()]=sccn;
			printf("%d ",s.top());
			ins[s.top()]=0;
			size[sccn]+=1;
			s.pop();
		}
		s.pop();
		printf("\n");
	}
	return;
}

tarjan與縮點

tarjan算法最有用的地方就是縮點了。縮點,顧名思義,就是把圖上的某一塊的信息整合成一個點,從而使得後續處理的速度加快(個人的簡單總結,可能會有遺漏之類的)。

先來一個模板題吧:

P2341 受歡迎的牛 G

emmm……題目大意就是對於一條邊u->v代表了u喜歡v ,然後給出了一個奶牛和奶牛之間的關係網(不要問我為什麼是奶牛,這不是usaco題目的傳統藝能嗎),要你求出這群奶牛之中的明星奶牛。明星奶牛就是那些被所有奶牛所喜歡的奶牛。這裏要注意,喜歡是可以傳遞的,也就是說a->b,b->c,那麼a->c。(更多題目細節可以去連接裏面看看)

首先最樸素的dfs方法就是對於每一個點來檢查喜歡它的節點的數量,但是這樣的效率肯定是太低了,所以我們考慮縮點。如果在這個關係網內部存在某一個強連通分量,也就是說這個分量裏面的每一個奶牛都是互相喜歡着的,並且任何喜歡這個分量的奶牛都會喜歡到這個分量內部的每一個奶牛,於是我們可以把這個分量當成一個點來看待。

縮點結束之後的新圖肯定是一個DAG(有向無環圖),又因為縮點本身對題目是沒有影響的,所以我們可以基於這個DAG來分析題目,比之前算是簡單許多了。

很明顯,一個DAG裏面只能有一個明星牛(或者是由明星牛組成的SCC),因為當存在兩個的時候他們是無法互相喜歡的(如果互相喜歡的話就會被縮成一個點)

答案就很明顯了,我們只需要維護每一個SCC的出度(出度為0則證明這就是一個明星),如果存在兩個或兩個以上的明星則證明這個圖裡面沒有明星。如果只有一個的話我們就在tarjan裏面順手維護每一個scc的大小,最後統計一下輸出就完事了

AC代碼:

#include <bits/stdc++.h>
using namespace std;
const int maxn=10010;
struct edge{
	int to;
	edge(int to_){
		to=to_;
	}
};
vector<edge> gpe[maxn];
int dfn[maxn],low[maxn],ins[maxn],scc[maxn],size[maxn],cnt=0,sccn=0;
stack<int> s;
void tarjan(int u){
	dfn[u]=low[u]=++cnt;
	s.push(u);
	ins[u]=1;
	for(int i=0;i<gpe[u].size();i++){
		int v=gpe[u][i].to;
		if(!dfn[v]){
			tarjan(v);
			low[u]=min(low[u],low[v]);
		}else if(ins[v]){
			low[u]=min(low[u],dfn[v]);
		}
	}
	if(low[u]==dfn[u]){
		ins[u]=0;
		scc[u]=++sccn;
		size[sccn]=1;
		while(s.top()!=u){
			scc[s.top()]=sccn;
			ins[s.top()]=0;
			size[sccn]+=1;
			s.pop();
		}
		s.pop();
	}
	return;
}
int n,m,oud[maxn];
int main(void){
	scanf("%d %d",&n,&m);
	memset(low,0x3f,sizeof(low));
	memset(ins,0,sizeof(ins));
	for(int i=1;i<=m;i++){
		int u,v;
		scanf("%d %d",&u,&v);
		gpe[u].push_back(edge(v));
	}
	for(int i=1;i<=n;i++){
		if(!dfn[i]){
			cnt=0;
			tarjan(i);
		}
	}
	for(int u=1;u<=n;u++){
		for(int i=0;i<gpe[u].size();i++){
			int v=gpe[u][i].to;
			if(scc[u]!=scc[v]) oud[scc[u]]++;
		}
	}
	int cont=0,ans=0;
	for(int i=1;i<=sccn;i++){
		if(oud[i]==0){
			cont++;
			ans+=size[i];
		}
	}
	if(cont==1){
		printf("%d",ans);
	}else{
		printf("0");
	}
	return 0;
}

代碼以前寫的,略冗長,見諒

題目推薦:

真·模板題: P2863 [USACO06JAN]The Cow Prom S

P1262 間諜網絡

P2746 [USACO5.3]校園網Network of Schools

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

專家針對武漢肺炎警告 都市人長期處於空污下 恐增加感染死亡風險

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

不敵客訴壓力 澳洲超市延後對塑膠袋收費

摘錄自2018年8月3日蘋果日報澳洲報導

澳洲一間大型連鎖超市原本要實施塑膠袋需收費政策,希望可推動「減塑」為環保出一分力,不料顧客對此大感不滿,政策推出後已經第2次延長收費期限。

連鎖超市科爾斯(Coles)自上月1日起停用一次性塑膠袋,改向顧客提供可重複使用、質料更耐用的塑膠袋作過渡,直至本周三(1日)為止。超市原定顧客此後若索取塑膠袋,每個須收取15澳分(約3.4元台幣)。

然而科爾斯周三發聲明指,自禁用一次性塑膠袋後,有顧客反映需要更多過渡時間,以適應使用可重複使用塑膠袋,集團因此決定在昆士蘭、新南威爾斯、維多利亞及西澳洲等地繼續提供免費塑膠袋至8月29日,而南澳洲、塔斯馬尼亞等地區則仍繼續收費。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

湄公河跨國水資源爭奪戰 寮國沙耶武里大壩爭議中即將啟用

環境資訊中心外電;姜唯 翻譯;林大利 審校;稿源:ENS

位於寮國北部湄公河的沙耶武里水壩將在數日內正式啟用。沙耶武里水壩是湄公河主流下游的第一座水壩,它的啟用象徵著湄公河命運的重要轉折點。

湄公河長4,350公里,是世界第12長河,排水量世界第八。發源於青藏高原,流經中國、緬甸、寮國和泰國,接著湧入柬埔寨和越南的沖積平原和三角洲。

沙耶武里水壩的主要目的是水力發電,其95%的發電量將由泰國電力局購買。

沙耶武里水壩打從一開始就是一個爭議性的工程,許多人擔心它對河流系統的作用,包括使湄公河的洄游魚類和沈積物難以往下游移動,可能連鄰國都會受影響。

大壩對環境的影響進而威脅流域內居民的糧食來源、生計和社會文化體系。

沙耶武里水壩即將完工。照片來源:

許多專家認為,湄公河上游中國境內已經建了六座水壩,寮國甚至柬埔寨下游還要再蓋,已經讓湄公河深陷危機。

沙耶武里水壩諮詢過程中,有許多利害關係者表示關切,質疑資料和研究是否充分。

越南政府呼籲暫停所有主流上水壩的建設10年,以進一步研究、更深入地了解河流系統和水壩的可能影響。

泰國湄公河沿岸的社區代表於2012年向泰國行政法院提起訴訟,質疑泰國向沙耶武里水壩購買電力的計畫。此訴訟案別具指標性,但經過數次上訴,七年後的今日仍懸而未決。

儘管如此,沙耶武里水壩的開發並沒有停止,開發商重新設計以減輕疑慮。

後續的大壩工程計畫也持續在進行。本月,湄公河委員會宣布開始對湄公河下游的第五座主流水壩瑯勃拉邦進行事前諮商。

在沙耶武里水壩啟用前,美國非營利組織國際河網(International Rivers)發布了關於水壩的新報告。該組織邀請兩位獨立專家針對湄公河委員會今年稍早發布的沙耶武里水壩設計變更審查報告發表評論。

兩位專家分別是澳洲雪梨大學人文地理學教授賀屈(Philip Hirsch)博士和英格蘭諾桑比亞大學社會科學副教授亨森格斯(Oliver Hensengerth)博士。他們檢視沙耶武里水壩如何成為主流水壩決策模式的基準,強調「迫切需要一個真正的區域性標準程序來保護湄公河的未來。」

國際河網的聲明指出,雖然國際河網自身的立場是認為大壩的開發正在「扼殺」湄公河,但該專家評論的目的並非批評或評估湄公河委員會的管理審查報告,而是在試圖「找出關鍵點,討論它們對沙耶武里水壩和其他規劃中或興建中水壩對湄公河下游主流以及該地區內的影響。」

沙耶武里水壩開發前的地景樣貌。照片來源: (CC BY 2.0)

23日,一場針對國際河網報告的座談會在泰國曼谷外國記者俱樂部舉行,與會學者、社區和民間社團熱烈討論沙耶武里水壩工程的歷史、決策過程缺陷、進行中的活動以及對生態系統與居民的影響。

國際河網認為,沙耶武里水壩興建過程「工程先行,研究後補」的做法很不負責任。(詳見)

湄公河沖積平原和三角洲是全世界農業產量極高、生物多樣性極豐富的水域之一,但是海平面上升、土地沉降、上游超過126個規劃中水壩以及各式各樣三角洲水利基礎設施讓人們不得不對水力發電的潛在問題感到憂心。

國際河網不是唯一一個對湄公河流域感到擔憂的環境組織。

丹麥DHI顧問集團針對湄公河三角洲進行的研究得出的結論是,「即使是現有最佳的魚道技術,也可能無法因應大量的魚類遷徙。在高峰時期,遷徙魚群最多可達每小時300萬條。此外,可能也難以滿足該流域數百種魚類有百百種遷移方式。」

根據22日在寮國首都永珍發布的最新報告《》,負責管理流域水壩開發的湄公河委員會也對此表示關注。

湄公河委員會成立於1995年,是一個政府間組織,直接與柬埔寨、寮國、泰國和越南政府合作,共同管理共享水資源和湄公河的永續發展。

該組織是水務外交的區域平台,也是維持該區域永續發展的水資源管理知識中心。

湄公河委員會報告警告:「主流水流體系明顯永久性改變,沉積物被阻攔造成泥沙流量大量減少,濕地持續喪失,河流生態環境惡化,捕撈漁業的壓力不斷增加以及目前水開發設施和用水資訊共享有限」,是湄公河流域國家面臨的主要挑戰。

「我們現在必須解決這些問題,盡可能減少對環境的損害,並在僅剩的濕地和河邊生態環境消失之前加以保護,同時利用更穩定且有所增加的旱季流量,實現湄公河地區最佳永續發展。」湄公河委員會執行長An Pich Hatda博士在啟用儀式上對來自四個湄公河國家、近100位官員說。

最新的流域狀況報告建議:「必須緊急採取更積極的區域性流域規劃和管理方法,並加強系統性地共享資訊,並嚴格監控河流流量,以因應這些流域挑戰。」

Dam Development Is ‘Silencing’ the Mekong River BANGKOK, Thailand, October 24, 2019 (ENS)

 In five days, the Xayaburi Hydropower Project on the Mekong River in northern Laos will formally begin operations. As the first dam on the lower Mekong mainstream, this marks a turning point for the Mekong River.

The Mekong River is 4,350 kilometers (2,703 miles) long, ranked 12th in length and eighth in water discharge in the world. The river originates in the Tibetan Plateau and flows through China, Myanmar, Laos, and Thailand before pouring into the alluvial floodplains and delta in Cambodia and Vietnam.

The main purpose of the Xayaburi dam is to produce hydroelectric power, 95 percent of which is to be purchased by the Electricity Generating Authority of Thailand.

From the outset, the Xayaburi dam was a controversial project due to widespread concerns over its expected impacts on the river system, including transboundary impacts in neighboring countries.

Major predicted impacts include the destruction of Mekong migratory fisheries and trapping of sediment, preventing it from traveling downstream.

The dam’s environmental impacts, in turn, threaten the food, livelihoods and socio-cultural systems of populations residing within the river basin.

Many experts believe that the Mekong, already suffering from the impacts of six dams installed in China on the Upper Mekong, and with more dams planned downstream in Laos and possibly Cambodia, is in crisis.

During the Xayaburi dam consultation process, many stakeholders raised concerns over the project and questioned the adequacy of the data and studies.

The Vietnamese government called for a project suspension and a 10-year moratorium on all mainstream dams pending further study to better understand the river system and the impacts of planned dams.

In Thailand, community representatives along the Mekong River filed a landmark lawsuit in the Thai Administrative Court challenging Thailand’s power purchase from the project. Originally filed in 2012, following several appeals, the lawsuit remains pending more than seven years later.

Despite this, the Xayaburi dam moved forward, with the developers undertaking a redesign in an effort to mitigate concerns.

Subsequent dam projects have followed. This month, the Mekong River Commission announced the commencement of Prior Consultation for Luang Prabang, the fifth lower Mekong mainstream dam to undergo the process.

In the lead-up to the commissioning of the Xayaburi dam, the U.S.-based nonprofit group International Rivers issued a new report on the dam. The group asked two independent experts to provide comments on the Mekong River Commission’s review of the Xayaburi redesign, released earlier this year.

The report of the experts, Dr. Philip Hirsch, professor of Human Geography at the University of Sydney, Australia; and Dr. Oliver Hensengerth, associate professor of social sciences at Northumbria University, England, examines the pattern of Xayaburi in setting a benchmark for decisions on mainstream dams and highlights “the urgent need for a truly regional approach to safeguard the Mekong’s future.”

Although International Rivers says dam development is “silencing” the Mekong River, this expert commentary is not intended as a critique or assessment of the MRC Review, said the group in a statement. “Rather, it seeks to draw out key points and discuss their implications for Xayaburi and other dams under construction or consideration on the lower Mekong mainstream and within the region.”

On Wednesday, a panel discussion of the International Rivers report with academic, community and civil society speakers at the Foreign Correspondents’ Club of Thailand in Bangkok provoked comments on the project’s history, its flawed decision-making process, the ongoing campaigns, and Xayaburi’s implications for the ecosystems and people of the Mekong Basin.

International Rivers has described the “build first, study later” approach propagated by the Xayaburi Dam process as “a dangerously irresponsible model for dam-building in the Mekong.”

To read the report, “Review of Design Changes Made for the Xayaburi Hydropower Project,” click .

The Mekong floodplains and delta are among the most agriculturally productive and biologically diverse waterscapes of the world, but sea level rise, land subsidence, and the proposed upstream development of over 126 hydropower dams and extensive delta-based water infrastructure have raised concerns about the potential impacts on the hydrology of the region.

International Rivers is not the only environmental group with concerns about the Mekong River Basin.

A Mekong Delta Study conducted by Denmark’s DHI Consulting Group concluded it was likely “that even the best available fish passage technologies’ may not be able to handle either the massive volume of fish migrations, which during peak periods can reach up to three million fish per hour, or the diversity of migration strategies that characterise the hundreds of fish species in the basin.”

The Mekong River Commission, which governs the dam development of the basin, is also concerned, according to the latest report, State of the Basin Report 2018, released on Tuesday in Vientiane, the Laotian capital city.

Established in 1995, the Mekong River Commission, MRC, is an inter-governmental organization that works directly with the governments of Cambodia, Laos, Thailand, and Vietnam to jointly manage the shared water resources and the sustainable development of the Mekong River.

The organization serves as a regional platform for water diplomacy as well as a knowledge hub of water resources management for the sustainable development of the region.

The MRC report warns, “The apparent permanent modification of mainstream flow regime, the substantial reduction in sediment flows due to sediment trapping, the continuing loss of wetlands, the deterioration of riverine habitats, the growing pressures on capture fisheries, and the limited information sharing on current water development facilities and water use,” are some of the major challenges facing countries in the Mekong Basin.

“We need to address these issues now in order to minimize further environmental harm and protect remaining wetlands and riverine habitats before they are gone, while leveraging the benefits of more secure and increased dry season flows and achieving a more optimal and sustainable development of the Mekong basin,” Dr. An Pich Hatda, chief executive officer of the MRC Secretariat, told nearly 100 officials from the four MRC countries at the launch ceremony.

This latest State of the Basin Report advises that “a more proactive regional approach to basin planning and management, with an enhanced and systematic information sharing mechanism and robust monitoring of river flow must be put in place urgently to address these basin-wide challenges.”

※ 全文及圖片詳見:

作者

如果有一件事是重要的,如果能為孩子實現一個願望,那就是人類與大自然和諧共存。

於特有生物研究保育中心服務,小鳥和棲地是主要的研究對象。是龜毛的讀者,認為龜毛是探索世界的美德。

延伸閱讀

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

遊客遽增破壞環境 紐西蘭國會報告示警

摘錄自2019年12月18日中央社威靈頓報導

紐西蘭國會今天(18日)提出報告警示,遽增的遊客數量正危害紐西蘭環境,也讓當地聞名遐邇、極具魅力的寧靜印象受到逐步破壞。

法新社報導,紐西蘭一向標榜「100%純淨」和「乾淨又環保」的形象,但近年來遊客數量遽增,許多人朝聖「魔戒」(The Lord of the Rings)電影拍攝地點大玩自拍,還有登山客、健行者及野生動物愛好人士。

紐西蘭國會環境事務專員厄普頓(Simon Upton)提出報告之際,紐國旅遊業正受到嚴格審查,因為白島(White Island)火山在9日爆發,造成16名外籍旅客和2名導遊喪命。

擁有490萬人口的紐西蘭每年吸引近400萬名外國遊客,厄普頓說,這項數據到了2050年可能增加2倍,厄普頓表示,基礎設施變得吃緊、環境承受壓力,紐西蘭原有的諸多品質正在消失,他說:「大批群眾正逐漸破壞許多外國遊客赴紐西蘭旅遊所尋覓的獨處感、寧靜和親近大自然的感覺,我們必須要問:我們是否正在殺雞取卵?」

厄普頓提到,紐西蘭人也是問題的一部分,在紐西蘭國定假期,著名景點湧現的國內遊客人潮比外國遊客還要多。

他還說,紐西蘭人已經習慣東加里羅步道(Tongariro Crossing)等著名景點「被遊客團團包圍」,而且問題只會持續惡化。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化