寶馬計畫從2013年開始投產i子品牌電動車

寶馬集團計畫從2013年開始將i子品牌車型投入生產,該品牌為專屬品牌。

寶馬集團董事長諾伯特·雷瑟夫(Norbert Reithofer)日前表示,公司將集中精力準備i系列新款電動車的生產工作,按照計畫i子品牌將在2013年下半年投放到市場,而投產時間則早於該節點。此前有消息稱該子品牌旗下的i3車型明年將在德國萊比錫(Leipzig)工廠投產,初步產能3萬輛/年。

2011年2月21日,寶馬正式宣佈發佈i子品牌。根據寶馬注冊商標透露的資訊,車型覆蓋從i1到i9。寶馬官方正式公佈了i3純電動車和i8插電式混動車兩款車型,分別以Megacity Vehicle概念車和Vision EfficientDynamics概念技術為藍本。另外,據悉i1為小型城市電動車,i4為兩座酷派,i5為帶有酷派風格的四門Sedan轎車。i4將在今年11月底洛杉磯車展上亮相。

此前有傳聞指出,寶馬i專案工程浩大,耗費成本甚巨,導致集團考慮放棄該子品牌。不過寶馬方面否認了該說法。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

中國汽車技術研究中心正在制定新能源車碰撞安全標準

中國汽車技術研究中心副主任高和生日前指出,最大的隱患在於電池,在受到撞擊之後電池會變形、起火甚至爆炸,這對汽車的安全帶來了巨大的隱患,因此中心目前也正在考慮制定新能源汽車的相關安全標準。但並沒有透露具體的出臺時間。

隨著國家的大力宣導,目前新能源汽車的發展正在逐步提速,雖然私人購車數量不多,但越來越多的新能源汽車已經逐步進入到如計程車等服務領域,而近一段時間頻頻出現的安全問題,也讓人們對新能源汽車產生了擔憂。

同時高和生透露,C-NCAP未來將把進口車碰撞作為工作重點之一,儘管廠家會很反對,但是本著對消費者負責的態度,我們還是會做這個事情,我們將從當前熱銷的進口車中選擇車型進行碰撞試驗,而銷量很小的車型則不在考慮範圍。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

本田新電動概念車量產版預計2013年日本發佈

近日,一款精緻可愛的小車出現在人們的視線,外觀看上去跟smart fortwo以及雷諾Twizy很相像,它就是於去年東京車展亮相的本田純電動小車概念車Micro Commuter的量產版,預計這款車的原型車將於2013年日本發佈,這款純電動小車正是符合了日本政府提倡環保的要求。

這款精緻的小車車身長、寬、高分別是2.5/1.25/1.45米,車內前排座位只設計了駕駛者座位,後排座位只能坐兩個小孩,後排座位對於成年人可是很牽強。車內傳統的儀錶板以及操控都是由嵌入式平板電腦來實現。

動力方面,其搭載了一款後置以及一個容量15KW的鋰電池,最大續航里程僅59km/h,看來這款小車的實用價值還有待實踐,極速也只有80km/h,看來這款小車的推出只適合在城市間短距離行駛。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

微軟將為豐田提供新能源汽車車載終端系統

2011年豐田與微軟簽署合作協定,微軟為豐田提供車載遠距離通信方面的技術支援,及的車載控制與娛樂終端。該系統將基於Windows Azure作業系統開發,通過利用微軟的資訊服務平臺,將通過將車輛變成資訊終端以提升他們的價值,並提高車輛和交通的安全性。並在170個國家提供該服務。

豐田汽車北美技術總監紮克。希克斯(Zack Hicks)表示,下一樣重要的互聯網設備是汽車,豐田與微軟合作開發智慧車載系統。但蘋果近期不會大規模投入該領域,蘋果一直在與汽車廠商合作,將其技術應用到汽車上。6月份,蘋果公佈了一款名為Siri Eyes Free的新產品,它實際上是針對汽車的Siri.它能讓司機用語音命令執行一系列操作,例如打電話、播放音樂、發短信、導航等。蘋果稱,搭載這一系統的車型將於2013年夏季面世。

微軟多年來也為汽車廠商提供了類似技術。福特汽車的Sync智慧娛樂系統和豐田汽車的Entune導航系統都採用了微軟的技術。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

Kafka源碼解析(二)—Log分析

上一篇文章講了LogSegment和Log的初始化,這篇來講講Log的主要操作有哪些。

一般來說Log 的常見操作分為 4 大部分。

  1. 高水位管理操作
  2. 日誌段管理
  3. 關鍵位移值管理
  4. 讀寫操作

其中關鍵位移值管理主要包含Log Start Offset 和 LEO等。

高水位HighWatermark

高水位HighWatermark初始化

高水位是通過LogOffsetMetadata類來定義的:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

這裏傳入的初始值是logStartOffset,表明當首次構建高水位時,它會被賦值成 Log Start Offset 值。

我們再來看看LogOffsetMetadata類:

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // check if this offset is already on an older segment compared with the given offset
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }
  ...
}

LogOffsetMetadata有三個初始值:

messageOffset表示消息位移值;

segmentBaseOffset保存消息位移值所在日誌段的起始位移,用來判斷兩條消息是否處於同一個日誌段的;

relativePositionSegment保存消息位移值所在日誌段的物理磁盤位置;

上面的onOlderSegment表明,要比較哪個日誌段更老,只需要比較segmentBaseOffset的大小就可以了。

高水位HighWatermark設值與更新

  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    //高水位的值不可能小於零
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized {// 保護Log對象修改的Monitor鎖
      highWatermarkMetadata = newHighWatermark// 賦值新的高水位值
      //事務相關,暫時忽略
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      //事務相關,暫時忽略
      maybeIncrementFirstUnstableOffset()
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

設置高水位的值是很簡單的,首先校驗高水位的值是否大於零,然後通過直接加鎖之後更新高水位的值。

更新更新高水位值的方法有兩個:updateHighWatermark 和 maybeIncrementHighWatermark,我們分別分析。

updateHighWatermark

  def updateHighWatermark(hw: Long): Long = {
    //傳入的高水位的值如果小於logStartOffset,設置為logStartOffset
    val newHighWatermark = if (hw < logStartOffset)
      logStartOffset
    //  傳入的高水位的值如果大於LEO,那麼設置為LEO
    else if (hw > logEndOffset)
      logEndOffset
    else
      hw
    //將newHighWatermark封裝成一個LogOffsetMetadata然後更新高水位的值
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    //返回新的高水位的值
    newHighWatermark
  }

這個方法邏輯也很簡潔,因為高水位的值是不可能大於LEO,也不可能小於logStartOffset,所以需要對傳入的hw校驗然後設置成正確的值,然後調用上面的設置高水位的方法設值。

maybeIncrementHighWatermark

/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
//  當新的高水位的值大於舊的高水位的值時才做更新,如果新的高水位的值大於LEO,會報錯
//  這個方法是leader在確認Follower已經拉取了日誌之後才做更新
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  //如果新的高水位的值大於LEO,會報錯
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")

  lock.synchronized {
    // 獲取老的高水位值
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    //只有當新的高水位值大於老的值,因為要維護高水位的單調遞增性
    //或者當新的高水位值和老的高水位值相等,但是新的高水位在一個新的日誌段上面時才做更新
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark)// 返回老的高水位值
    } else {
      None
    }
  }
}

這個方法我將這個方法的英文註釋貼出來了,這個註釋的說明我也寫到方法上了,邏輯很清楚,大家看看註釋應該能理解。

這兩個方法主要的區別是,updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到消息后更新高水位值。而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。

上面的方法中通過調用fetchHighWatermarkMetadata來獲取高水位的值,我們下面看看這個方法:

fetchHighWatermarkMetadata

  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    // 讀取時確保日誌不能被關閉
    checkIfMemoryMappedBufferClosed()

    val offsetMetadata = highWatermarkMetadata
    if (offsetMetadata.messageOffsetOnly) {//沒有獲得到完整的高水位元數據
      lock.synchronized {
        // 通過讀日誌文件的方式把完整的高水位元數據信息拉出來
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
        updateHighWatermarkMetadata(fullOffset)
        fullOffset
      }
    } else {
      offsetMetadata
    }
  }

  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    //通過給的offset,去日誌文件中找到相應的日誌信息
    val fetchDataInfo = read(offset,
      maxLength = 1,
      isolation = FetchLogEnd,
      minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
  }

然後我們提前看一下日誌的read方法,是如何根據索引讀取數據的:

日誌段操作

日誌讀取操作

read

  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以這裡是false
      val includeAbortedTxns = isolation == FetchTxnCommitted
 
      // 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      // 到日字段中根據索引尋找最近的日誌段
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      // 這裏給出了幾種異常場景:
      // 1. 給的日誌索引大於最大值;
      // 2. 通過索引找的日誌段為空;
      // 3. 給的日誌索引小於logStartOffset
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以最大值是endOffsetMetadata
      // 查看一下讀取隔離級別設置。
      // 普通消費者能夠看到[Log Start Offset, LEO)之間的消息
      // 事務型消費者只能看到[Log Start Offset, Log Stable Offset]之間的消息。Log Stable Offset(LSO)是比LEO值小的位移值,為Kafka事務使用
      // Follower副本消費者能夠看到[Log Start Offset,高水位值]之間的消息
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => endOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }
      //如果尋找的索引等於maxOffsetMetadata,那麼直接返回
      if (startOffset == maxOffsetMetadata.messageOffset) {
        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      //如果尋找的索引大於maxOffsetMetadata,返回空的消息集合,因為沒法讀取任何消息
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }
 
      // 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        // 找到日誌段中最大的日誌位移
        val maxPosition = { 
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }
        // 根據位移信息從日誌段中讀取日誌信息
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        // 如果找不到日誌信息,那麼去日誌段集合中找更大的日誌位移的日誌段
        if (fetchInfo == null) {
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      //找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

read方法,有四個參數,分別是:

  • startOffset:讀取的日誌索引位置。
  • maxLength:讀取數據量長度。
  • isolation:隔離級別,多用於 Kafka 事務。
  • minOneMessage:是否至少返回一條消息。設想如果消息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何消息。但如果設置了該參數為 true,read 方法就保證至少能夠返回一條消息。

代碼中使用了segments,來根據位移查找日誌段:

  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

我們下面看看read方法具體做了哪些事:

  1. 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態,作為最大索引LEO;
  2. 去日誌段集合里尋找小於或等於指定索引的日誌段;
  3. 校驗異常情況:
    1. startOffset是不是超過了LEO;
    2. 是不是日誌段集合里沒有索引小於startOffset;
    3. startOffset小於Log Start Offset;
  4. 接下來獲取一下隔離級別;
  5. 如果尋找的索引等於LEO,那麼返回空;
  6. 如果尋找的索引大於LEO,返回空的消息集合,因為沒法讀取任何消息;
  7. 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾;
    1. 首先找到日誌段中最大的位置;
    2. 根據位移信息從日誌段中讀取日誌信息(這個read方法我們上一篇已經講解過了);
    3. 如果找不到日誌信息,那麼讀取日誌段集合中下一個日誌段;
  8. 找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空;

我們在上面的read操作中可以看到,使用了segments來查找日誌。我們主要看看刪除操作

刪除日誌

刪除日誌的入口是:deleteOldSegments

  //  如果topic deletion開關是打開的,那麼會刪去過期的日誌段以及超過設置保留日誌大小的日誌
  // 無論是否開啟刪除規則,都會刪除在log start offset之前的日誌段
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

deleteOldSegments方法會判斷是否開啟刪除規則,如果開啟,那麼會分別調用:

deleteRetentionMsBreachedSegments刪除segment的時間戳超過了設置時間的日誌段;

deleteRetentionSizeBreachedSegments刪除日誌段空間超過設置空間大小的日誌段;

deleteLogStartOffsetBreachedSegments刪除日誌段的baseOffset小於logStartOffset的日誌段;

我這裏列舉一下這三個方法主要是怎麼實現的:

  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    //調用deleteOldSegments方法,並傳入匿名函數,判斷當前的segment的時間戳是否超過了設置時間
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")
  }
  
  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    //判斷日誌段空間是否超過設置空間大小
    //shouldDelete函數會將傳入的日誌段去減diff,直到小於等於零
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
  }
  
  private def deleteLogStartOffsetBreachedSegments(): Int = {
    //shouldDelete函數主要判斷日誌段的baseOffset是否小於logStartOffset
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

這種寫代碼的方式非常的靈活,通過不同方法設置不同的函數來實現代碼復用的目的,最後都是通過調用deleteOldSegments來實現刪除日誌段的目的。

下面我們來看一下deleteOldSegments的操作:

deleteOldSegments

這個deleteOldSegments方法和上面的入口方法傳入的參數是不一致的,這個方法傳入了一個predicate函數,用於判斷哪些日誌段是可以被刪除的,reason用來說明被刪除的原因。

  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    //刪除任何匹配到predicate規則的日誌段
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      deleteSegments(deletable)
    }
  }

這個方法調用了兩個主要的方法,一個是deletableSegments,用於獲取可以被刪除的日誌段的集合;deleteSegments用於刪除日誌段。

deletableSegments

  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    //如果日誌段是空的,那麼直接返回
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      //如果日誌段集合不為空,找到第一個日誌段
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        //獲取下一個日誌段
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
        //如果下一個日誌段的位移沒有大於或等於HW,並且日誌段是匹配predicate函數的,下一個日誌段也不是空的
        //那麼將這個日誌段放入可刪除集合中,然後遍歷下一個日誌段
        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

這個方法邏輯十分清晰,主要做了如下幾件事:

  1. 判斷日誌段集合是否為空,為空那麼直接返回空集合;

  2. 如果日誌段集合不為空,那麼從日誌段集合的第一個日誌段開始遍歷;

  3. 判斷當前被遍曆日志段是否能夠被刪除

    1. 日誌段的下一個日誌段的位移有沒有大於或等於HW;
    2. 日誌段是否能夠通過predicate函數校驗;
    3. 日誌段是否是最後一個日誌段;
  4. 將符合條件的日誌段都加入到deletable集合中,並返回。

接下來調用deleteSegments函數:

  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        // 我們至少保證要存在一個日誌段,如果要刪除所有的日誌;
        //所以調用roll方法創建一個全新的日誌段對象,並且關閉當前寫入的日誌段對象;
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          // 確保Log對象沒有被關閉
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups
          // 刪除給定的日誌段對象以及底層的物理文件
          removeAndDeleteSegments(deletable, asyncDelete = true)
          // 嘗試更新日誌的Log Start Offset值
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
        }
      }
      numToDelete
    }
  }

寫日誌

寫日誌的方法主要有兩個:

appendAsLeader

  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
  }

appendAsFollower

  def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
  }

appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都調用了 append 方法

append

  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      // 第1步:分析和驗證待寫入消息集合,並返回校驗結果
      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // 如果壓根就不需要寫入任何消息,直接返回即可
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // 第2步:消息格式規整,即刪除無效格式消息或無效字節
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        // 確保Log對象未關閉
        checkIfMemoryMappedBufferClosed()
        //需要分配位移值
        if (assignOffsets) {
          // assign offsets to the message set
          // 第3步:使用當前LEO值作為待寫入消息集合中第一條消息的位移值,nextOffsetMetadata為LEO值
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              isFromClient,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          // 更新校驗結果對象類LogAppendInfo
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          // 第4步:驗證消息,確保消息大小不超限
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
          // 直接使用給定的位移值,無需自己分配位移值
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)// 確保消息位移值的單調遞增性
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        // 第5步:更新Leader Epoch緩存
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // check messages set size may be exceed config.segmentSize
        // 第6步:確保消息大小不超限
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // maybe roll the log if this segment is full
        // 第7步:執行日誌切分。當前日誌段剩餘容量可能無法容納新消息集合,因此有必要創建一個新的日誌段來保存待寫入的所有消息
        //下面情況將會執行日誌切分:
        //logSegment 已經滿了
        //日誌段中的第一個消息的maxTime已經過期
        //index索引滿了
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        // 第8步:驗證事務狀態
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, isFromClient)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }
        // 第9步:執行真正的消息寫入操作,主要調用日誌段對象的append方法實現
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
        // 第10步:更新LEO對象,其中,LEO值是消息集合中最後一條消息位移值+1
        // 前面說過,LEO值永遠指向下一條不存在的消息
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        // 第11步:更新事務狀態
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")
        // 是否需要手動落盤。一般情況下我們不需要設置Broker端參數log.flush.interval.messages
        // 落盤操作交由操作系統來完成。但某些情況下,可以設置該參數來確保高可靠性
        if (unflushedMessages >= config.flushInterval)
          flush()
        // 第12步:返回寫入結果
        appendInfo
      }
    }
  }

上面代碼的主要步驟如下:

我們下面看看analyzeAndValidateRecords是如何進行消息校驗的:

analyzeAndValidateRecords

  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
    var shallowMessageCount = 0
    var validBytesCount = 0
    var firstOffset: Option[Long] = None
    var lastOffset = -1L
    var sourceCodec: CompressionCodec = NoCompressionCodec
    var monotonic = true
    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    var offsetOfMaxTimestamp = -1L
    var readFirstMessage = false
    var lastOffsetOfFirstBatch = -1L

    for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
      // 消息格式Version 2的消息批次,起始位移值必須從0開始
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
          s"be 0, but it is ${batch.baseOffset}")

      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
      // For magic version 2, we can get the first offset directly from the batch header.
      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
      // case, validation will be more lenient.
      // Also indicate whether we have the accurate first offset or not
      if (!readFirstMessage) {
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
        lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
        readFirstMessage = true
      }

      // check that offsets are monotonically increasing
      // 一旦出現當前lastOffset不小於下一個batch的lastOffset,說明上一個batch中有消息的位移值大於後面batch的消息
      // 這違反了位移值單調遞增性
      if (lastOffset >= batch.lastOffset)
        monotonic = false

      // update the last offset seen
      // 使用當前batch最後一條消息的位移值去更新lastOffset
      lastOffset = batch.lastOffset

      // Check if the message sizes are valid.
      val batchSize = batch.sizeInBytes
      // 檢查消息批次總字節數大小是否超限,即是否大於Broker端參數max.message.bytes值
      if (batchSize > config.maxMessageSize) {
        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
      }

      // check the validity of the message by checking CRC
      // 執行消息批次校驗,包括格式是否正確以及CRC校驗
      if (!batch.isValid) {
        brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
        throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
      }
      // 更新maxTimestamp字段和offsetOfMaxTimestamp
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }
      // 累加消息批次計數器以及有效字節數,更新shallowMessageCount字段
      shallowMessageCount += 1
      validBytesCount += batchSize
      // 從消息批次中獲取壓縮器類型
      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
      if (messageCodec != NoCompressionCodec)
        sourceCodec = messageCodec
    }

    // Apply broker-side compression if any
    // 獲取Broker端設置的壓縮器類型,即Broker端參數compression.type值。
    // 該參數默認值是producer,表示sourceCodec用的什麼壓縮器,targetCodec就用什麼
    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
    // 最後生成LogAppendInfo對象並返回
    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
  }

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

tarjan算法求scc & 縮點

前置知識

圖的遍歷(dfs)

強連通&強連通分量

對於有向圖G中的任意兩個頂點u和v存在u->v的一條路徑,同時也存在v->u的路徑,我們則稱這兩個頂點強連通。以此類推,強連通分量就是某一個分量內各個頂點之間互相連通。

簡單來說,就是有向圖內的一個分量,其中的任意兩個點之家可以互相到達。

求有向圖內部強連通分量的方法大概有2種:tarjan算法,korasaju算法。這裏我們只對tarjan算法進行討論。

tarjan算法

tarjan算法是tarjan神仙提出的基於dfs時間戳和堆棧的算法,這裏我們可以先來看一下什麼是dfs時間戳

dfs時間戳

dfs時間戳就是dfs的先後順序,詳細來講,比如我們dfs最先訪問到的節點是A,於是A的時間戳就是1,第二個訪問到的節點是E,那麼E的時間戳就是2,我們用\(dfn[u]\)來表示u節點的時間戳,應該算是比較簡單的

算法步驟

首先,除了dfn以外我們還需要一個low數組,這個數組記錄了某個點通過圖上的邊能回溯到的dfn值最小的節點。這句話相信在大多數博客裏面都有提到,這裏我們來看一個簡單的例子:

首先,我們有一個圖G:

假設我們從a點出發開始dfs,我們可以畫出一個dfs樹:

為什麼我們畫出來的dfs樹和原來的圖不一樣呢?因為我們在dfs的過程中實際上是會忽略某一些連接到已訪問節點的邊的,這些邊我們暫且稱之為回邊。對於點u來說,\(low[u]\)保存的就是點u通過某一條(或者是幾條)回邊能到達的dfn值最小的節點(也就是被最先訪問的節點)。假設這個dfn值最小的節點是u’,我們可以知道,因為u和u’都是在一棵dfs樹上的,並且u’可以到達u,同時u可以通過一條或多條回邊到達u’,也就是說u’->u路徑上的任意節點都可以通過這一條回邊來互相到達,也就是說他們會形成一個強連通分量。

更加詳細的例子

我們有一個新圖G:

假設我們從A點出發開始dfs,一路跑到D點,那麼我們為這個圖上的每一個點加上dfn數組和low數組的值(dfn,low),整個圖就會長成這個樣子:

此時我們會遇到一條D->A的回邊,也就是說點D能訪問到的dfn值最小的節點從點D本身變化到了A點,所以點D的low值就會發生相應的變化,\(low[D]=min(low[D],dfn[A])\)

緊接着,dfs發生回溯,我們沿着之前的路徑逐步更新路徑上節點的low值,於是就有\(low[C]=min(low[C],low[D])\),直到更新到某一個dfn值和low值相同的節點。因為這個節點能訪問到的最小dfn的節點就是其本身,也就是說這個節點是整個scc最先被訪問到的節點。

全部搞完大概會變成這個樣子:

我們用一個輔助棧來保存dfs的路徑,這樣就可以在找到一個強連通分量裏面最早被訪問到的節點的時候可以輸出路徑。同時因為dfs訪問是一條路走到黑的,所以可以保證棧內在節點u(low[u]==dfn[u])之前的的節點都是屬於同一個scc的。

還是上面這幅圖,我們順便把E點給更新了:

跑完E點之後就會發現,E點本身的low就是和dfn相等的,所以此時棧內也只有E這一個節點。

於是上面這個圖的scc有以下幾個:

[E]

[A,B,C,D]

代碼實現

首先我們要發現,在dfs的初期我們每一個節點的low和dfn都是相同的,也就是說有dfn[u]=low[u]=++cnt(cnt為計數變量),並且在回溯的過程中要用后訪問節點的low值來更新先訪問節點的low值,也就是說有\(low[u]=min(low[u],low[v])\),當訪問到某一個在棧中的節點的時候,我們要用這個節點的dfn值來更新其他節點,所以有\(low[u]=min(low[u],dfn[v])\)

那麼我們一個簡單的代碼就可以寫出來了:

void tarjan(int u){
	dfn[u]=low[u]=++cnt;
	s.push(u);
	ins[u]=1;
	for(int i=0;i<gpe[u].size();i++){
		int v=gpe[u][i].to;
		if(!dfn[v]){//如果節點未訪問,則訪問之
			tarjan(v);
			low[u]=min(low[u],low[v]);
		}else if(ins[v]){//ins是為棧中節點做的一個標記
			low[u]=min(low[u],dfn[v]);
		}
	}
}

當更新完畢之後,我們需要找出一個完整的scc,因為我們提前已經用輔助棧來記錄節點了,剩下的工作就只剩下從棧中不停地pop就完事了

if(low[u]==dfn[u]){
		ins[u]=0;
		scc[u]=++sccn;//sccn是強連通分量的編號
		size[sccn]=1;//size記錄了強連通分量的大小
    //找到某一個low[u]==dfn[u]的節點的時候就要立即處理,因為這個節點也屬於一個新的scc
		while(s.top()!=u){
			scc[s.top()]=sccn;//scc[u]記錄了u點屬於哪一個scc
			ins[s.top()]=0;
			size[sccn]+=1;
			s.pop();
		}
		s.pop();
    //這裏pop掉的就是一開始的那個low[u]==dfn[u]的節點。因為相關信息已經維護完畢,所以這裏直接pop也沒問題
	}

把這兩部分結合在一起,就是tarjan求scc的完整代碼了:

void tarjan(int u){
	dfn[u]=low[u]=++cnt;
	s.push(u);
	ins[u]=1;
	for(int i=0;i<gpe[u].size();i++){
		int v=gpe[u][i].to;
		if(!dfn[v]){
			tarjan(v);
			low[u]=min(low[u],low[v]);
		}else if(ins[v]){
			low[u]=min(low[u],dfn[v]);
		}
	}
	if(low[u]==dfn[u]){
		ins[u]=0;
		scc[u]=++sccn;
		size[sccn]=1;
		printf("%d ",u);
		while(s.top()!=u){
			scc[s.top()]=sccn;
			printf("%d ",s.top());
			ins[s.top()]=0;
			size[sccn]+=1;
			s.pop();
		}
		s.pop();
		printf("\n");
	}
	return;
}

tarjan與縮點

tarjan算法最有用的地方就是縮點了。縮點,顧名思義,就是把圖上的某一塊的信息整合成一個點,從而使得後續處理的速度加快(個人的簡單總結,可能會有遺漏之類的)。

先來一個模板題吧:

P2341 受歡迎的牛 G

emmm……題目大意就是對於一條邊u->v代表了u喜歡v ,然後給出了一個奶牛和奶牛之間的關係網(不要問我為什麼是奶牛,這不是usaco題目的傳統藝能嗎),要你求出這群奶牛之中的明星奶牛。明星奶牛就是那些被所有奶牛所喜歡的奶牛。這裏要注意,喜歡是可以傳遞的,也就是說a->b,b->c,那麼a->c。(更多題目細節可以去連接裏面看看)

首先最樸素的dfs方法就是對於每一個點來檢查喜歡它的節點的數量,但是這樣的效率肯定是太低了,所以我們考慮縮點。如果在這個關係網內部存在某一個強連通分量,也就是說這個分量裏面的每一個奶牛都是互相喜歡着的,並且任何喜歡這個分量的奶牛都會喜歡到這個分量內部的每一個奶牛,於是我們可以把這個分量當成一個點來看待。

縮點結束之後的新圖肯定是一個DAG(有向無環圖),又因為縮點本身對題目是沒有影響的,所以我們可以基於這個DAG來分析題目,比之前算是簡單許多了。

很明顯,一個DAG裏面只能有一個明星牛(或者是由明星牛組成的SCC),因為當存在兩個的時候他們是無法互相喜歡的(如果互相喜歡的話就會被縮成一個點)

答案就很明顯了,我們只需要維護每一個SCC的出度(出度為0則證明這就是一個明星),如果存在兩個或兩個以上的明星則證明這個圖裡面沒有明星。如果只有一個的話我們就在tarjan裏面順手維護每一個scc的大小,最後統計一下輸出就完事了

AC代碼:

#include <bits/stdc++.h>
using namespace std;
const int maxn=10010;
struct edge{
	int to;
	edge(int to_){
		to=to_;
	}
};
vector<edge> gpe[maxn];
int dfn[maxn],low[maxn],ins[maxn],scc[maxn],size[maxn],cnt=0,sccn=0;
stack<int> s;
void tarjan(int u){
	dfn[u]=low[u]=++cnt;
	s.push(u);
	ins[u]=1;
	for(int i=0;i<gpe[u].size();i++){
		int v=gpe[u][i].to;
		if(!dfn[v]){
			tarjan(v);
			low[u]=min(low[u],low[v]);
		}else if(ins[v]){
			low[u]=min(low[u],dfn[v]);
		}
	}
	if(low[u]==dfn[u]){
		ins[u]=0;
		scc[u]=++sccn;
		size[sccn]=1;
		while(s.top()!=u){
			scc[s.top()]=sccn;
			ins[s.top()]=0;
			size[sccn]+=1;
			s.pop();
		}
		s.pop();
	}
	return;
}
int n,m,oud[maxn];
int main(void){
	scanf("%d %d",&n,&m);
	memset(low,0x3f,sizeof(low));
	memset(ins,0,sizeof(ins));
	for(int i=1;i<=m;i++){
		int u,v;
		scanf("%d %d",&u,&v);
		gpe[u].push_back(edge(v));
	}
	for(int i=1;i<=n;i++){
		if(!dfn[i]){
			cnt=0;
			tarjan(i);
		}
	}
	for(int u=1;u<=n;u++){
		for(int i=0;i<gpe[u].size();i++){
			int v=gpe[u][i].to;
			if(scc[u]!=scc[v]) oud[scc[u]]++;
		}
	}
	int cont=0,ans=0;
	for(int i=1;i<=sccn;i++){
		if(oud[i]==0){
			cont++;
			ans+=size[i];
		}
	}
	if(cont==1){
		printf("%d",ans);
	}else{
		printf("0");
	}
	return 0;
}

代碼以前寫的,略冗長,見諒

題目推薦:

真·模板題: P2863 [USACO06JAN]The Cow Prom S

P1262 間諜網絡

P2746 [USACO5.3]校園網Network of Schools

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

湄公河跨國水資源爭奪戰 寮國沙耶武里大壩爭議中即將啟用

環境資訊中心外電;姜唯 翻譯;林大利 審校;稿源:ENS

位於寮國北部湄公河的沙耶武里水壩將在數日內正式啟用。沙耶武里水壩是湄公河主流下游的第一座水壩,它的啟用象徵著湄公河命運的重要轉折點。

湄公河長4,350公里,是世界第12長河,排水量世界第八。發源於青藏高原,流經中國、緬甸、寮國和泰國,接著湧入柬埔寨和越南的沖積平原和三角洲。

沙耶武里水壩的主要目的是水力發電,其95%的發電量將由泰國電力局購買。

沙耶武里水壩打從一開始就是一個爭議性的工程,許多人擔心它對河流系統的作用,包括使湄公河的洄游魚類和沈積物難以往下游移動,可能連鄰國都會受影響。

大壩對環境的影響進而威脅流域內居民的糧食來源、生計和社會文化體系。

沙耶武里水壩即將完工。照片來源:

許多專家認為,湄公河上游中國境內已經建了六座水壩,寮國甚至柬埔寨下游還要再蓋,已經讓湄公河深陷危機。

沙耶武里水壩諮詢過程中,有許多利害關係者表示關切,質疑資料和研究是否充分。

越南政府呼籲暫停所有主流上水壩的建設10年,以進一步研究、更深入地了解河流系統和水壩的可能影響。

泰國湄公河沿岸的社區代表於2012年向泰國行政法院提起訴訟,質疑泰國向沙耶武里水壩購買電力的計畫。此訴訟案別具指標性,但經過數次上訴,七年後的今日仍懸而未決。

儘管如此,沙耶武里水壩的開發並沒有停止,開發商重新設計以減輕疑慮。

後續的大壩工程計畫也持續在進行。本月,湄公河委員會宣布開始對湄公河下游的第五座主流水壩瑯勃拉邦進行事前諮商。

在沙耶武里水壩啟用前,美國非營利組織國際河網(International Rivers)發布了關於水壩的新報告。該組織邀請兩位獨立專家針對湄公河委員會今年稍早發布的沙耶武里水壩設計變更審查報告發表評論。

兩位專家分別是澳洲雪梨大學人文地理學教授賀屈(Philip Hirsch)博士和英格蘭諾桑比亞大學社會科學副教授亨森格斯(Oliver Hensengerth)博士。他們檢視沙耶武里水壩如何成為主流水壩決策模式的基準,強調「迫切需要一個真正的區域性標準程序來保護湄公河的未來。」

國際河網的聲明指出,雖然國際河網自身的立場是認為大壩的開發正在「扼殺」湄公河,但該專家評論的目的並非批評或評估湄公河委員會的管理審查報告,而是在試圖「找出關鍵點,討論它們對沙耶武里水壩和其他規劃中或興建中水壩對湄公河下游主流以及該地區內的影響。」

沙耶武里水壩開發前的地景樣貌。照片來源: (CC BY 2.0)

23日,一場針對國際河網報告的座談會在泰國曼谷外國記者俱樂部舉行,與會學者、社區和民間社團熱烈討論沙耶武里水壩工程的歷史、決策過程缺陷、進行中的活動以及對生態系統與居民的影響。

國際河網認為,沙耶武里水壩興建過程「工程先行,研究後補」的做法很不負責任。(詳見)

湄公河沖積平原和三角洲是全世界農業產量極高、生物多樣性極豐富的水域之一,但是海平面上升、土地沉降、上游超過126個規劃中水壩以及各式各樣三角洲水利基礎設施讓人們不得不對水力發電的潛在問題感到憂心。

國際河網不是唯一一個對湄公河流域感到擔憂的環境組織。

丹麥DHI顧問集團針對湄公河三角洲進行的研究得出的結論是,「即使是現有最佳的魚道技術,也可能無法因應大量的魚類遷徙。在高峰時期,遷徙魚群最多可達每小時300萬條。此外,可能也難以滿足該流域數百種魚類有百百種遷移方式。」

根據22日在寮國首都永珍發布的最新報告《》,負責管理流域水壩開發的湄公河委員會也對此表示關注。

湄公河委員會成立於1995年,是一個政府間組織,直接與柬埔寨、寮國、泰國和越南政府合作,共同管理共享水資源和湄公河的永續發展。

該組織是水務外交的區域平台,也是維持該區域永續發展的水資源管理知識中心。

湄公河委員會報告警告:「主流水流體系明顯永久性改變,沉積物被阻攔造成泥沙流量大量減少,濕地持續喪失,河流生態環境惡化,捕撈漁業的壓力不斷增加以及目前水開發設施和用水資訊共享有限」,是湄公河流域國家面臨的主要挑戰。

「我們現在必須解決這些問題,盡可能減少對環境的損害,並在僅剩的濕地和河邊生態環境消失之前加以保護,同時利用更穩定且有所增加的旱季流量,實現湄公河地區最佳永續發展。」湄公河委員會執行長An Pich Hatda博士在啟用儀式上對來自四個湄公河國家、近100位官員說。

最新的流域狀況報告建議:「必須緊急採取更積極的區域性流域規劃和管理方法,並加強系統性地共享資訊,並嚴格監控河流流量,以因應這些流域挑戰。」

Dam Development Is ‘Silencing’ the Mekong River BANGKOK, Thailand, October 24, 2019 (ENS)

 In five days, the Xayaburi Hydropower Project on the Mekong River in northern Laos will formally begin operations. As the first dam on the lower Mekong mainstream, this marks a turning point for the Mekong River.

The Mekong River is 4,350 kilometers (2,703 miles) long, ranked 12th in length and eighth in water discharge in the world. The river originates in the Tibetan Plateau and flows through China, Myanmar, Laos, and Thailand before pouring into the alluvial floodplains and delta in Cambodia and Vietnam.

The main purpose of the Xayaburi dam is to produce hydroelectric power, 95 percent of which is to be purchased by the Electricity Generating Authority of Thailand.

From the outset, the Xayaburi dam was a controversial project due to widespread concerns over its expected impacts on the river system, including transboundary impacts in neighboring countries.

Major predicted impacts include the destruction of Mekong migratory fisheries and trapping of sediment, preventing it from traveling downstream.

The dam’s environmental impacts, in turn, threaten the food, livelihoods and socio-cultural systems of populations residing within the river basin.

Many experts believe that the Mekong, already suffering from the impacts of six dams installed in China on the Upper Mekong, and with more dams planned downstream in Laos and possibly Cambodia, is in crisis.

During the Xayaburi dam consultation process, many stakeholders raised concerns over the project and questioned the adequacy of the data and studies.

The Vietnamese government called for a project suspension and a 10-year moratorium on all mainstream dams pending further study to better understand the river system and the impacts of planned dams.

In Thailand, community representatives along the Mekong River filed a landmark lawsuit in the Thai Administrative Court challenging Thailand’s power purchase from the project. Originally filed in 2012, following several appeals, the lawsuit remains pending more than seven years later.

Despite this, the Xayaburi dam moved forward, with the developers undertaking a redesign in an effort to mitigate concerns.

Subsequent dam projects have followed. This month, the Mekong River Commission announced the commencement of Prior Consultation for Luang Prabang, the fifth lower Mekong mainstream dam to undergo the process.

In the lead-up to the commissioning of the Xayaburi dam, the U.S.-based nonprofit group International Rivers issued a new report on the dam. The group asked two independent experts to provide comments on the Mekong River Commission’s review of the Xayaburi redesign, released earlier this year.

The report of the experts, Dr. Philip Hirsch, professor of Human Geography at the University of Sydney, Australia; and Dr. Oliver Hensengerth, associate professor of social sciences at Northumbria University, England, examines the pattern of Xayaburi in setting a benchmark for decisions on mainstream dams and highlights “the urgent need for a truly regional approach to safeguard the Mekong’s future.”

Although International Rivers says dam development is “silencing” the Mekong River, this expert commentary is not intended as a critique or assessment of the MRC Review, said the group in a statement. “Rather, it seeks to draw out key points and discuss their implications for Xayaburi and other dams under construction or consideration on the lower Mekong mainstream and within the region.”

On Wednesday, a panel discussion of the International Rivers report with academic, community and civil society speakers at the Foreign Correspondents’ Club of Thailand in Bangkok provoked comments on the project’s history, its flawed decision-making process, the ongoing campaigns, and Xayaburi’s implications for the ecosystems and people of the Mekong Basin.

International Rivers has described the “build first, study later” approach propagated by the Xayaburi Dam process as “a dangerously irresponsible model for dam-building in the Mekong.”

To read the report, “Review of Design Changes Made for the Xayaburi Hydropower Project,” click .

The Mekong floodplains and delta are among the most agriculturally productive and biologically diverse waterscapes of the world, but sea level rise, land subsidence, and the proposed upstream development of over 126 hydropower dams and extensive delta-based water infrastructure have raised concerns about the potential impacts on the hydrology of the region.

International Rivers is not the only environmental group with concerns about the Mekong River Basin.

A Mekong Delta Study conducted by Denmark’s DHI Consulting Group concluded it was likely “that even the best available fish passage technologies’ may not be able to handle either the massive volume of fish migrations, which during peak periods can reach up to three million fish per hour, or the diversity of migration strategies that characterise the hundreds of fish species in the basin.”

The Mekong River Commission, which governs the dam development of the basin, is also concerned, according to the latest report, State of the Basin Report 2018, released on Tuesday in Vientiane, the Laotian capital city.

Established in 1995, the Mekong River Commission, MRC, is an inter-governmental organization that works directly with the governments of Cambodia, Laos, Thailand, and Vietnam to jointly manage the shared water resources and the sustainable development of the Mekong River.

The organization serves as a regional platform for water diplomacy as well as a knowledge hub of water resources management for the sustainable development of the region.

The MRC report warns, “The apparent permanent modification of mainstream flow regime, the substantial reduction in sediment flows due to sediment trapping, the continuing loss of wetlands, the deterioration of riverine habitats, the growing pressures on capture fisheries, and the limited information sharing on current water development facilities and water use,” are some of the major challenges facing countries in the Mekong Basin.

“We need to address these issues now in order to minimize further environmental harm and protect remaining wetlands and riverine habitats before they are gone, while leveraging the benefits of more secure and increased dry season flows and achieving a more optimal and sustainable development of the Mekong basin,” Dr. An Pich Hatda, chief executive officer of the MRC Secretariat, told nearly 100 officials from the four MRC countries at the launch ceremony.

This latest State of the Basin Report advises that “a more proactive regional approach to basin planning and management, with an enhanced and systematic information sharing mechanism and robust monitoring of river flow must be put in place urgently to address these basin-wide challenges.”

※ 全文及圖片詳見:

作者

如果有一件事是重要的,如果能為孩子實現一個願望,那就是人類與大自然和諧共存。

於特有生物研究保育中心服務,小鳥和棲地是主要的研究對象。是龜毛的讀者,認為龜毛是探索世界的美德。

延伸閱讀

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

對照圖鑑也會誤判 日本野菇中毒事件頻傳

文:宋瑞文

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

JAVA設計模式 2【創建型】原型模式的理解與使用、理解淺克隆和深克隆

在本節中,我們將學習和使用原型模式;這一節學習的原型模式也是創建型 模式的其中之一。再次複習一下:創建型 模式就是描述如何去更好的創建一個對象。

我們都知道,在JAVA 語言中。使用new 關鍵字創建一個新對象。將新的對象放到堆內存 裏面。當然,這個內存肯定是有大小限制的,況且,JAVA 不同於C語言等。 有內存管理機制,就是我們常說的垃圾回收器GC,才可以保證內存不被溢出。

說這些其實就是為了表示:為啥要用單例模式,能節省內存的時候,能用一個對象解決重複的事情,絕對不會創建多個。

概述

原型模式描述的如何快速創建重複的對象,並且減少new 關鍵字的使用。

  • 抽象原型類
  • 具體原型類
  • 訪問類

容我來一個一個解釋:

抽象原型類 也就是我們具體要實現的某個類,這個類在JAVA 裏面是有具體的接口的,其實是一個空接口,Cloneable

 * @author  unascribed
 * @see     java.lang.CloneNotSupportedException
 * @see     java.lang.Object#clone()
 * @since   JDK1.0
 */
public interface Cloneable {
}

我們會發現,這個類沒有任何的方法,怎麼來實現它,不要慌。先接着走。

具體原型類 也就是我們具體要克隆 的對象。比如我們重複的要創建100個學生Student 對象,那麼具體的學生對象就是具體原型類

public class Student implements Cloneable {

    private int id;

    private String name;

    private int sex;
}

訪問類 我就不必多說了

淺克隆和深克隆

原型模式其實也分淺克隆和深克隆。如何理解這兩個概念呢?

淺克隆

protected native Object clone() throws CloneNotSupportedException;

淺克隆,只需要具體原型類 實現Cloneable 接口,並且重寫父類Object類的clone() 方法,即可實現對象的淺克隆。

Student student1 = new Student(1, "李四");
Student student2 = student1.clone();

System.out.println(student1);
System.out.println(student2);

System.out.println(student1 == student2);
---------------------
學號:1,姓名:李四
學號:1,姓名:李四
false
  • 通過執行clone() 方法即可創建一個相同的,具有同樣屬性的對象。
  • 並且是新的對象,內存地址有所不同。

我們來看看,對於引用類型的變量,淺克隆是否可以進行克隆;

Teacher teacher = new Teacher(1, "張老師");

Student student1 = new Student(1, "李四", teacher);
Student student2 = student1.clone();

System.out.println(student1);
System.out.println(student2);

System.out.println(student1 == student2);
------------
學號:1,姓名:李四,老師=Teacher@1b6d3586
學號:1,姓名:李四,老師=Teacher@1b6d3586
false

我們發現,引用類型並沒有被克隆,也就是說:

特點

  • 淺克隆對於基本類型,可以進行完全的克隆,並且克隆的對象是一個新的對象
  • 但是對象裏面的引用,是無法被克隆的。

深克隆(序列化)

何謂序列化?

我們創建的都是保存在內存裏面的,只要被虛擬機GC進行回收,那麼這個對象的任何屬性都是消失,我們能不能找一個方法,將內存中這種對象的屬性以及對象的狀態通過某種東西保存下來,比如保存到數據庫,下次從數據庫將這個對象還原到內存裏面。 這就是序列化。

  • 序列化 內存對象->序列字符
  • 反序列化 序列字符->內存對象

請參考: https://baike.baidu.com/item/序列化/2890184

JAVA 序列化

 * @see java.io.Externalizable
 * @since   JDK1.1
 */
public interface Serializable {
}

JAVA 提供了一個空接口,其實這個接口和上面的Cloneable 一樣,都是一個空接口,其實這個空接口就是作為一種標識 你的對象實現了這個接口,JAVA 認為你的這個就可以被序列化 ,就是這麼簡單。

Teacher teacher = new Teacher(1, "張老師");

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(outputStream);

stream.writeObject(teacher);
System.out.println(Arrays.toString(outputStream.toByteArray()));
----------
[-84, -19, 0, 5, 115, 114, 0, 7, 84, 101, 97,。。。。。。

通過將對象序列化、其實也就是將內存中的對象轉化為二進制 字節數組

反序列化

Teacher teacher = new Teacher(1, "張老師");
System.out.println(teacher);

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream stream = new ObjectOutputStream(outputStream);

stream.writeObject(teacher);
System.out.println(Arrays.toString(outputStream.toByteArray()));

ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(outputStream.toByteArray());
ObjectInputStream inputStream = new ObjectInputStream(byteArrayInputStream);

Teacher teacher1 = (Teacher) inputStream.readObject();
System.out.println(teacher1);
---------------
id=1,name=張老師
[-84, -19, 0, 5, 115, xxxxx,-127, -27, -72, -120]
id=1,name=張老師

通過序列化和反序列化,即可對象的深克隆

小結

這一節,在講述 原型模式的同時,將原有實現原型模式的clone() 淺克隆,延伸到深克隆這一概念。其實JAVA 的原型模式,實現起來較為簡單。但還是要按需要實現,Object 類提供的 clone 淺克隆 是沒辦法克隆對象的引用類型的。需要克隆引用類型,還是需要序列化 深克隆

參考

http://c.biancheng.net/view/1343.html
https://www.liaoxuefeng.com/wiki/1252599548343744/1298366845681698

代碼示例

https://gitee.com/mrc1999/Dev-Examples

歡迎關注

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

【Spring註解驅動開發】使用InitializingBean和DisposableBean來管理bean的生命周期,你真的了解嗎?

寫在前面

在《【Spring註解驅動開發】如何使用@Bean註解指定初始化和銷毀的方法?看這一篇就夠了!!》一文中,我們講述了如何使用@Bean註解來指定bean初始化和銷毀的方法。具體的用法就是在@Bean註解中使用init-method屬性和destroy-method屬性來指定初始化方法和銷毀方法。除此之外,Spring中是否還提供了其他的方式來對bean實例進行初始化和銷毀呢?

項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

InitializingBean接口

1.InitializingBean接口概述

Spring中提供了一個InitializingBean接口,InitializingBean接口為bean提供了屬性初始化后的處理方法,它只包括afterPropertiesSet方法,凡是繼承該接口的類,在bean的屬性初始化后都會執行該方法。InitializingBean接口的源碼如下所示。

package org.springframework.beans.factory;
public interface InitializingBean {
	void afterPropertiesSet() throws Exception;
}

根據InitializingBean接口中提供的afterPropertiesSet()方法的名字可以推斷出:afterPropertiesSet()方法是在屬性賦好值之後調用的。那到底是不是這樣呢?我們來分析下afterPropertiesSet()方法的調用時機。

2.何時調用InitializingBean接口?

我們定位到Spring中的org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory類下的invokeInitMethods()方法中,來查看Spring加載bean的方法。

題外話:不要問我為什麼會是這個invokeInitMethods()方法,如果你和我一樣對Spring的源碼非常熟悉的話,你也會知道是這個invokeInitMethods()方法,哈哈哈哈!所以,小夥伴們不要只顧着使用Spring,還是要多看看Spring的源碼啊!Spring框架中使用了大量優秀的設計模型,其代碼的編寫規範和嚴謹程度也是業界開源框架中數一數二的,非常值得閱讀。

我們來到AbstractAutowireCapableBeanFactory類下的invokeInitMethods()方法,如下所示。

protected void invokeInitMethods(String beanName, final Object bean, @Nullable RootBeanDefinition mbd)
    throws Throwable {
	//判斷該bean是否實現了實現了InitializingBean接口,如果實現了InitializingBean接口,則調用bean的afterPropertiesSet方法
    boolean isInitializingBean = (bean instanceof InitializingBean);
    if (isInitializingBean && (mbd == null || !mbd.isExternallyManagedInitMethod("afterPropertiesSet"))) {
        if (logger.isTraceEnabled()) {
            logger.trace("Invoking afterPropertiesSet() on bean with name '" + beanName + "'");
        }
        if (System.getSecurityManager() != null) {
            try {
                AccessController.doPrivileged((PrivilegedExceptionAction<Object>) () -> {
                    //調用afterPropertiesSet()方法
                    ((InitializingBean) bean).afterPropertiesSet();
                    return null;
                }, getAccessControlContext());
            }
            catch (PrivilegedActionException pae) {
                throw pae.getException();
            }
        }
        else {
            //調用afterPropertiesSet()方法
            ((InitializingBean) bean).afterPropertiesSet();
        }
    }

    if (mbd != null && bean.getClass() != NullBean.class) {
        String initMethodName = mbd.getInitMethodName();
        if (StringUtils.hasLength(initMethodName) &&
            !(isInitializingBean && "afterPropertiesSet".equals(initMethodName)) &&
            !mbd.isExternallyManagedInitMethod(initMethodName)) {
            //通過反射的方式調用init-method
            invokeCustomInitMethod(beanName, bean, mbd);
        }
    }
}

分析上述代碼后,我們可以初步得出如下信息:

  • Spring為bean提供了兩種初始化bean的方式,實現InitializingBean接口,實現afterPropertiesSet方法,或者在配置文件和@Bean註解中通過init-method指定,兩種方式可以同時使用。
  • 實現InitializingBean接口是直接調用afterPropertiesSet()方法,比通過反射調用init-method指定的方法效率相對來說要高點。但是init-method方式消除了對Spring的依賴。
  • 如果調用afterPropertiesSet方法時出錯,則不調用init-method指定的方法。

也就是說Spring為bean提供了兩種初始化的方式,第一種實現InitializingBean接口,實現afterPropertiesSet方法,第二種配置文件或@Bean註解中通過init-method指定,兩種方式可以同時使用,同時使用先調用afterPropertiesSet方法,后執行init-method指定的方法。

DisposableBean接口

1.DisposableBean接口概述

實現org.springframework.beans.factory.DisposableBean接口的bean在銷毀前,Spring將會調用DisposableBean接口的destroy()方法。我們先來看下DisposableBean接口的源碼,如下所示。

package org.springframework.beans.factory;
public interface DisposableBean {
	void destroy() throws Exception;
}

可以看到,在DisposableBean接口中只定義了一個destroy()方法。

在Bean生命周期結束前調用destory()方法做一些收尾工作,亦可以使用destory-method。前者與Spring耦合高,使用類型強轉.方法名(),效率高。後者耦合低,使用反射,效率相對低

2.DisposableBean接口注意事項

多例bean的生命周期不歸Spring容器來管理,這裏的DisposableBean中的方法是由Spring容器來調用的,所以如果一個多例實現了DisposableBean是沒有啥意義的,因為相應的方法根本不會被調用,當然在XML配置文件中指定了destroy方法,也是沒有意義的。所以,在多實例bean情況下,Spring不會自動調用bean的銷毀方法。

單實例bean案例

創建一個Animal的類實現InitializingBean和DisposableBean接口,代碼如下:

package io.mykit.spring.plugins.register.bean;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
/**
 * @author binghe
 * @version 1.0.0
 * @description 測試InitializingBean接口和DisposableBean接口
 */
public class Animal implements InitializingBean, DisposableBean {
    public Animal(){
        System.out.println("執行了Animal類的無參數構造方法");
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("執行了Animal類的初始化方法。。。。。");

    }
    @Override
    public void destroy() throws Exception {
        System.out.println("執行了Animal類的銷毀方法。。。。。");

    }
}

接下來,我們新建一個AnimalConfig類,並將Animal通過@Bean註解的方式註冊到Spring容器中,如下所示。

package io.mykit.spring.plugins.register.config;

import io.mykit.spring.plugins.register.bean.Animal;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
 * @author binghe
 * @version 1.0.0
 * @description AnimalConfig
 */
@Configuration
@ComponentScan("io.mykit.spring.plugins.register.bean")
public class AnimalConfig {
    @Bean
    public Animal animal(){
        return new Animal();
    }
}

接下來,我們在BeanLifeCircleTest類中新增testBeanLifeCircle02()方法來進行測試,如下所示。

@Test
public void testBeanLifeCircle02(){
    //創建IOC容器
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AnimalConfig.class);
    System.out.println("IOC容器創建完成...");
    //關閉IOC容器
    context.close();
}

運行BeanLifeCircleTest類中的testBeanLifeCircle02()方法,輸出的結果信息如下所示。

執行了Animal類的無參數構造方法
執行了Animal類的初始化方法。。。。。
IOC容器創建完成...
執行了Animal類的銷毀方法。。。。。

從輸出的結果信息可以看出:單實例bean下,IOC容器創建完成后,會自動調用bean的初始化方法;而在容器銷毀前,會自動調用bean的銷毀方法。

多實例bean案例

多實例bean的案例代碼基本與單實例bean的案例代碼相同,只不過在AnimalConfig類中,我們在animal()方法上添加了@Scope(“prototype”)註解,如下所示。

package io.mykit.spring.plugins.register.config;
import io.mykit.spring.plugins.register.bean.Animal;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * @author binghe
 * @version 1.0.0
 * @description AnimalConfig
 */
@Configuration
@ComponentScan("io.mykit.spring.plugins.register.bean")
public class AnimalConfig {
    @Bean
    @Scope("prototype")
    public Animal animal(){
        return new Animal();
    }
}

接下來,我們在BeanLifeCircleTest類中新增testBeanLifeCircle03()方法來進行測試,如下所示。

@Test
public void testBeanLifeCircle03(){
    //創建IOC容器
    AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(AnimalConfig.class);
    System.out.println("IOC容器創建完成...");
    System.out.println("-------");
    //調用時創建對象
    Object bean = ctx.getBean("animal");
    System.out.println("-------");
    //調用時創建對象
    Object bean1 = ctx.getBean("animal");
    System.out.println("-------");
    //關閉IOC容器
    ctx.close();
}

運行BeanLifeCircleTest類中的testBeanLifeCircle03()方法,輸出的結果信息如下所示。

IOC容器創建完成...
-------
執行了Animal類的無參數構造方法
執行了Animal類的初始化方法。。。。。
-------
執行了Animal類的無參數構造方法
執行了Animal類的初始化方法。。。。。
-------

從輸出的結果信息中可以看出:在多實例bean情況下,Spring不會自動調用bean的銷毀方法。

好了,咱們今天就聊到這兒吧!別忘了給個在看和轉發,讓更多的人看到,一起學習一起進步!!

項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

寫在最後

如果覺得文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習Spring註解驅動開發。公眾號回復“spring註解”關鍵字,領取Spring註解驅動開發核心知識圖,讓Spring註解驅動開發不再迷茫。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧