本田新電動概念車量產版預計2013年日本發佈

近日,一款精緻可愛的小車出現在人們的視線,外觀看上去跟smart fortwo以及雷諾Twizy很相像,它就是於去年東京車展亮相的本田純電動小車概念車Micro Commuter的量產版,預計這款車的原型車將於2013年日本發佈,這款純電動小車正是符合了日本政府提倡環保的要求。

這款精緻的小車車身長、寬、高分別是2.5/1.25/1.45米,車內前排座位只設計了駕駛者座位,後排座位只能坐兩個小孩,後排座位對於成年人可是很牽強。車內傳統的儀錶板以及操控都是由嵌入式平板電腦來實現。

動力方面,其搭載了一款後置以及一個容量15KW的鋰電池,最大續航里程僅59km/h,看來這款小車的實用價值還有待實踐,極速也只有80km/h,看來這款小車的推出只適合在城市間短距離行駛。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

深圳已成全球新能源汽車應用規模最大的城市

目前深圳公交行業已累計投入運營的2350輛,開通新能源公交線路136條,各類新能源汽車累計行駛里程達1.9億公里,初步形成了新能源公交服務網路。成為全球新能源汽車應用規模最大的城市。

深圳市交委日前介紹,2009年6月,深圳巴士集團率先使用混合動力公車輛,開創了使用新能源車輛進行公交營運的先河。至今,巴士集團已有超千輛新能源車輛在營運,深圳市成為世界最大的新能源車輛的營運城市。

今年9月22日世界無車日當天,深圳首條純電動大巴線路226路全線投入運營,成為深圳純電動大巴從中等規模分散運營到較大規模集中運營的轉捩點。由於純電動大巴具有無噪音、無污染的特點,因而受到了市民的高度稱讚和熱捧。

據瞭解,目前,深圳公交行業已累計投入運營的新能源汽車2350輛,其中插電式混合動力大巴1751輛,混合動力雙層大巴20輛,純電動大巴253輛,純電動中巴26輛,純電動計程車300輛。現已開通新能源公交線路136條,各類新能源汽車累計行駛里程達1.9億公里,初步形成了新能源公交服務網路。

據悉,深圳交通運輸部門將逐步加大新能源汽車在公交行業的推廣應用。2012年,計畫推廣純電動公交大巴1000輛,純電動計程車500輛。力爭至2015年,推廣新能源公交大巴7000輛,純電動計程車3000輛,新能源公交大巴約占全市公車輛總數的50%,使深圳成為全球新能源公車投放最多、運行效果最好、管理最規範的示範城市。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

C#數據結構與算法系列(十):逆波蘭計算器——逆波蘭表達式(後綴表達式)

1.介紹

後綴表達式又稱逆波蘭表達式,與前綴表達式相似,只是運算符位於操作數之後

2.舉例說明

(3+4)*5-6對應的後綴表達式就是3 4 +5 * 6 –

3.示例

輸入一個逆波蘭表達式(後綴表達式),使用棧(Stack),計算其結果

思路分析:

從左至右掃描表達式,遇到数字時,將数字壓入堆棧,遇到運算符時,彈出棧頂的兩個數,用運算符對它們做相應的計算(次頂元素 和 棧頂元素),並將結果入棧;

重複上述過程直到表達式最右端,最後運算得出的值即為表達式的結果例如: (3+4)×5-6 對應的後綴表達式就是 3 4 + 5 × 6 – , 

針對後綴表達式求值步驟如下:

從左至右掃描,將3和4壓入堆棧;
遇到+運算符,因此彈出4和3(4為棧頂元素,3為次頂元素),計算出3+4的值,得7,再將7入棧;
將5入棧;
接下來是×運算符,因此彈出5和7,計算出7×5=35,將35入棧;
將6入棧;
最後是-運算符,計算出35-6的值,即29,由此得出最終結果

代碼實現:

using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

namespace DataStructure
{
    public class PolandNotation
    {
        public static void Test()
        {
            try
            {
                //定義逆波蘭表達式
                string suffixExpression = "3 4 + 5 * 6 -";

                //將suffixExpression轉換成鏈表的方式
                var list = GetListString(suffixExpression);

                //輸出結果
                var result = Calculate(list);

                Console.WriteLine($"{suffixExpression}的結果是{result}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
           
        }
        /// <summary>
        /// 獲取集合
        /// </summary>
        /// <param name="suffixExpression"></param>
        /// <returns></returns>
        public static List<string> GetListString(string suffixExpression)
        {
            //首先實例化List
            List<string> list = new List<string>();

            //將字符串通過空格切換成數組
            string[] split=suffixExpression.Split(" ");

            //循環添加
            foreach (var item in split)
            {
                list.Add(item);
            }

            return list;
        }

        /// <summary>
        /// 計算
        /// </summary>
        /// <param name="list"></param>
        /// <returns></returns>
        public static int Calculate(List<string> list)
        {
            //創建棧
            Stack<string> stack = new Stack<string>();

            //循環遍歷
            list.ForEach(item =>
            {
                //正則表達式判斷是否是数字,匹配的是多位數
                if (Regex.IsMatch(item,"\\d+"))
                {
                    //如果是数字直接入棧
                    stack.Push(item);
                }
                //如果是操作符
                else
                {
                    //出棧兩個数字,並運算,再入棧
                    int num1 =int.Parse(stack.Pop());

                    int num2 = int.Parse(stack.Pop());

                    int result = 0;

                    if(item.Equals("+"))
                    {
                        result = num2 + num1;
                    }
                    else if(item.Equals("*"))
                    {
                        result = num2 * num1;
                    }
                    else if(item.Equals("/"))
                    {
                        result = num2 / num1;
                    }
                    else if (item.Equals("-"))
                    {
                        result = num2 - num1;
                    }
                    else
                    {
                        throw new Exception("無法識別符號");
                    }

                    stack.Push(""+result);
                }
            });

            //最後把stack中數據返回
            return int.Parse(stack.Pop());
        }
    }
}

結果圖:

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

Kubernetes學習筆記(九):StatefulSet–部署有狀態的多副本應用

StatefulSet如何提供穩定的網絡標識和狀態

ReplicaSet中的Pod都是無狀態,可隨意替代的。又因為ReplicaSet中的Pod是根據模板生成的多副本,無法對每個副本都指定單獨的PVC。

來看一下StatefulSet如何解決的。

提供穩定的網絡標識

StatefulSet創建Pod都有一個從零開始的順序索引,這會體現在Pod的名稱和主機名上,同樣也會體現在Pod對應的固定存儲上。所以這些名字是可預先知道的,不同於ReplicaSet的隨機生成名字。

因為他們的名字都是固定的,而且彼此狀態都不同,通常會操作他們其中的一個。如此情況,一般都會創建一個與之對應的headless Service,通過這個Service,每個Pod將擁有獨立的DNS記錄。

擴容一個StatefulSet會使用下一個順序索引創建一個新的Pod,縮容會刪除索引值最高的。並且縮容任何時候只會操作一個Pod。

如何提供穩定的存儲

StatefulSet可以擁有一個或多個PVC模板,這些PVC會在創建Pod前創建出來,綁定到一個Pod實例上。

擴容的時候會創建一個Pod以及若干個PVC,刪除的時候只會刪除Pod。StatefulSet縮容時不會刪除PVC,擴容時會重新掛上。

使用StatefulSet

定義三個PV

定義pv-(a|b|c)

# stateful-pv-list.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-a
spec:
  capacity:
    storage: 1Mi
  accessModes:
  - ReadWriteOnce
  persistentVolumeReclaimPolicy: Recycle
  hostPath:
    path: /tmp/pva
---
apiVersion: v1
kind: PersistentVolume
# 以下忽略

headless的Service

# stateful-service-headless.yaml
apiVersion: v1
kind: Service
metadata:
  name: rwfile
spec:
  clusterIP: None
  selector:
    app: rwfile
  ports:
  - port: 80

定義StatefulSet

先創建兩個Pod副本。使用volumeClaimTemplates定義了PVC模板。

# stateful.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rwfile
spec:
  replicas: 2
  serviceName: rwfile
  selector:
    matchLabels:
     app: rwfile
  template:
    metadata:
      labels:
        app: rwfile
    spec:
      containers:
      - image: registry.cn-hangzhou.aliyuncs.com/orzi/rwfile
        name: rwfile
        ports:
        - containerPort: 8000
        volumeMounts:
        - name: data
          mountPath: /tmp/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      resources:
        requests:
          storage: 1Mi
      accessModes:
      - ReadWriteOnce

創建三個PV,一個headless的Service,一個StatefulSet

-> [root@kube0.vm] [~] k create -f stateful-pv-list.yaml
persistentvolume/pv-a created
persistentvolume/pv-b created
persistentvolume/pv-c created

-> [root@kube0.vm] [~] k create -f stateful-service-headless.yaml
service/rwfile created

-> [root@kube0.vm] [~] k create -f stateful.yaml
statefulset.apps/rwfile created

查看

-> [root@kube0.vm] [~] k get all -o wide
NAME                    READY   STATUS      RESTARTS   AGE   IP            NODE       NOMINATED NODE   READINESS GATES
pod/rwfile-0            1/1     Running     0          12s   10.244.1.52   kube1.vm   <none>           <none>
pod/rwfile-1            1/1     Running     0          8s    10.244.2.56   kube2.vm   <none>           <none>

NAME                 TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE   SELECTOR
service/kubernetes   ClusterIP   10.96.0.1    <none>        443/TCP   81s   <none>
service/rwfile       ClusterIP   None         <none>        80/TCP    23s   app=rwfile

NAME                      READY   AGE   CONTAINERS   IMAGES
statefulset.apps/rwfile   2/2     12s   rwfile       registry.cn-hangzhou.aliyuncs.com/orzi/rwfile

查看PV和PVC,可以看到已經有兩個PVC綁定了PV

-> [root@kube0.vm] [~] k get pv,pvc -o wide
NAME                    CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM                   STORAGECLASS   REASON   AGE     VOLUMEMODE
persistentvolume/pv-a   1Mi        RWO            Recycle          Bound       default/data-rwfile-0                           7m20s   Filesystem
persistentvolume/pv-b   1Mi        RWO            Recycle          Bound       default/data-rwfile-1                           7m20s   Filesystem
persistentvolume/pv-c   1Mi        RWO            Recycle          Available                                                   7m20s   Filesystem

NAME                                  STATUS   VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE     VOLUMEMODE
persistentvolumeclaim/data-rwfile-0   Bound    pv-a     1Mi        RWO                           6m55s   Filesystem
persistentvolumeclaim/data-rwfile-1   Bound    pv-b     1Mi        RWO                           6m51s   Filesystem

請求Pod

啟動代理

-> [root@kube0.vm] [~] k proxy
Starting to serve on 127.0.0.1:8001

發送請求

-> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/ -d "a=123"
data stored in : rwfile-0

-> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/
a=123

刪除測試

刪除rwfile-0,然後查看,從時間上看確實是被刪除重建的。

-> [root@kube0.vm] [~] k delete po rwfile-0
pod "rwfile-0" deleted

-> [root@kube0.vm] [~] k get po
NAME                READY   STATUS      RESTARTS   AGE
rwfile-0            1/1     Running     0          7s
rwfile-1            1/1     Running     0          19m

看一下之前存儲的數據還在不在

-> [root@kube0.vm] [~] curl http://localhost:8001/api/v1/namespaces/default/pods/rwfile-0/proxy/
a=123

還是在的,此次測試實際上也證明了StatefulSet提供了穩定的網絡標識和存儲。

發現StatefulSet的夥伴節點

使用DNS解析headless的Service的FQDN。
例子以後再寫吧。。

如何處理節點失效

除非確定節點無法運行或者不會在訪問,否則不要強制刪除有狀態的Pod

k delete pod rwfile-0 --force --grace-period 0

小結

  • StatefulSet創建Pod都有一個從零開始的順序索引
  • 通常會創建一個與StatefulSet對應的headless Service。
  • 擴容一個StatefulSet會使用下一個順序索引創建一個新的Pod,縮容會刪除索引值最高的。
  • 新建StatefulSet需要指定headless ServiceName和volumeClaimTemplates。
  • 使用DNS發現StatefulSet的夥伴節點
  • 強制刪除:k delete pod rwfile-0 --force --grace-period 0

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

接口中的默認方法與靜態方法

在Java8之前的版本中,接口中只能聲明常量和抽象方法,接口的實現類中必須實現接口中所有的抽象方法。而在Java8中,接口中可以聲明默認方法靜態方法。

接口中的默認方法

Java 8中允許接口中包含具有具體實現的方法,該方法稱為“默認方法”,默認方法使用“ default ”關鍵字修飾 。

示例:

public interface MyInterface {
    default String getMsg(String srcMsg){
        return "======"+srcMsg;
    }
}

接口中的默認方法,有一個“類優先”原則:

若一個接口中定義了一個默認方法,而另外一個父類或接口中又定義了同一個同名的方法時:

  • 選擇父類中的方法。如果一個父類提供了具體的實現,那麼接口中具有相同名稱的參數的默認方法會被忽略。
  • 接口衝突。如果一個父接口提供一個默認方法,而另一個接口中也提供了一個具有相同名稱和參數列表的方法(不管方法是否是默認方法),那麼必須覆蓋該方法來解決衝突。

示例1:

public interface MyInterface1 {
    default String getMsg(String srcMsg){
        return "===我是MyInterface1111111==="+srcMsg;
    }
}
///////////////////////////////////////////////////////
public class MyClass1 {
    public String getMsg(String srcMsg){
        return "===我是MyClass11111==="+srcMsg;
    }
}
///////////////////////////////////////////////////////
public class MySubClass1 extends MyClass1 implements MyInterface1 {
}

///////////////////////////////////////////////////////
public class InterfaceTest {

    public static void main(String[] args) {
        MySubClass1 ms1 = new MySubClass1();

        String srcMsg = "Java 牛逼!!";
        //MySubClass1 類繼承了 MyClass1 類,實現了MyInterface1 接口,根據類優先原則,調用同名方法時,會忽略掉接口 MyInterface1 中的默認方法。
        System.out.println(ms1.getMsg(srcMsg));//輸出結果:===我是MyClass11111===Java 牛逼!!

    }
}

示例2:

public interface MyInterface2 {
    default String getMsg(String srcMsg){
        return "===我是MyInterface2222222==="+srcMsg;
    }
}
////////////////////////////////////////////////////////////////
public class MySubClass2 implements MyInterface1,MyInterface2 {
    @Override
    public String getMsg(String srcMsg) {
        //同時實現了 MyInterface1,MyInterface2  接口,根據 類優先 原則,兩個父接口中都提供了相同的方法,那麼子類中就必須重寫這個方法來解決衝突。
        return MyInterface1.super.getMsg(srcMsg);
        //return MyInterface2.super.getMsg(srcMsg);
        //return "------"+srcMsg;
    }
}
////////////////////////////////////////////////////////////////
public class InterfaceTest {

    public static void main(String[] args) {
        MySubClass2 ms2 = new MySubClass2();

        //MySubClass2 重新實現了兩個父接口中都存在的相同名稱的方法。
        System.out.println(ms2.getMsg(srcMsg));//輸出結果:===我是MyInterface1111111===Java 牛逼!!
    }
}

 

接口中的靜態方法

在Java8中,接口中允許添加 靜態方法,使用方式:“接口名.方法名”

示例:

public interface MyInterface3 {
    static String getMsg(String msg){
        return "我是接口中的靜態方法:"+msg;
    }

    static void main(String[] args) {
        System.out.println(MyInterface3.getMsg("Java牛逼!!"));
    }
}

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?

磨皮美顏算法 附完整C代碼

前言

2017年底時候寫了這篇《集 降噪 美顏 虛化 增強 為一體的極速圖像潤色算法 附Demo程序

這也算是學習過程中比較有成就感的一個算法。

自2015年做算法開始到今天,還有個把月,就滿五年了。

歲月匆匆,人生能有多少個五年。

這五年裡,從音頻圖像到視頻,從傳統算法到深度學習,從2D到3D各種算法幾乎都走了一個遍。

好在,不論在哪個領域都能有些許建樹,這是博主我自身很欣慰的事情。

雖然有所間斷但是仍然堅持寫博客,並且堅持完整開源分享。

目的就是為了幫助那些一開始跟我一樣,想要學習算法的萌新,

一起踏入算法領域去跟大家“排排坐,吃果果”。

引子

在這個特別的時間點,就想做點特別的事情。

那就是開源當時寫的這個“美顏算法”,開源代碼和當時的版本有些許出入,但是思路是一樣的。

早些年的時候大家發現採用保邊濾波的思路可以做到降噪,進而衍生出來針對皮膚的降噪,簡稱磨皮或者美顏。

從此百家爭鳴,而這個課題到今天也還在發展,當然日新月異了。

故此,想談談針對美顏磨皮的一些算法思路,為後續想學習並改進的萌新提供一些養分。

概述美顏磨皮方法

1.基於保邊降噪

這類算法有很多方法,但不外乎2種基礎思路,

基於空間和基於頻率,當然再展開的話,還可以細分為紋理和顏色。

例如通過膚色或紋理區域做針對性的處理。

這類算法的優點是計算簡單,通用型強,但缺點就是不夠細膩完美。

2.基於人臉檢測貼圖

這種嚴格意義上來說,是易容術,就是基於人臉檢測出的關鍵數據。

例如人臉關鍵點,將人臉皮膚區域提取出來,重新貼上一張事先準備的皮膚圖,進行皮膚貼合融合。

臉已經被置換了,效果很贊。有點繆修斯之船的味道。

這類算法優點是效果極其驚艷,但是算法複雜通用性差,一般只能針對少數角度表情的人臉。

3.結合1和2的深度學習方法

前兩者的思路早期大行其道,如今到了數據時代,

基於深度學習的工具方案,可以非常好地結合前兩者的思路,進行訓練,求一個數據解。

很多人將深度學習等同於AI,這個做法有點激進。

基於深度學習的做法,仍然存在前兩者一樣的問題,簡單的不夠細膩,細膩的不夠簡單,

而如果要設計一個優秀的模型,其實跟設計一個傳統算法一樣困難。

基於數據驅動的算法,驗證成本非常高,可控性比較差,當然在金錢的驅動下確實能產出還不錯的算法模型。

這類算法的優點,往往能求出很不錯的局部最優解,甚至以假亂真,缺點就是需要大量金錢和數據的驅動。

總結來說的話,不付出代價,就別想有好的結果,非常的現實。

 

據我所知目前使用最多的方案是第一種和第三種,第二種可操作性不強,只有少數公司掌握了這方面的核心技術。

但是不管是哪種方案,無非就是以下幾個步驟。

1.確定人臉的皮膚區域

2.定位人臉的雜質(痘痘,斑點,痣,膚色不均等)

3.根據定位到雜質進行填補修復或濾除

 

這就是圖像處理經典三部曲

1.定位 2.檢測 3.處理

每一個細分展開,都非常宏大且複雜的算法。

 

以上,僅以磨皮美顏為例子,闡述圖像方面的算法想要或正在解決什麼樣的問題。

我們在工作中碰到的圖像問題無非以上幾個核心問題,問題都是類似的,只是不同場景和需求下各有難處。

本次開源的算法思路

本次開源的算法是基於保邊降噪的思路,

當然這個思路可以通過改寫,參數化后可以集成到深度學習中,作為一個先驗層輔助訓練。

算法步驟如下:

1.  檢測皮膚顏色,確定皮膚占圖像的比率

2. 根據皮膚比率進行邊緣檢測,產出細節映射圖

3. 基於細節映射圖和磨皮強度進行保邊降噪

4. 對降噪好的圖進行再一次膚色檢測,保留膚色區域的降噪,其他區域還原為原圖

步驟比較簡單,但是要同時兼顧效果性能,是很不容易的。

當然這個算法膚色檢測那一部分可以採用深度學習“語義分割”方面的思路進而改進效果。

做得好,將本算法改良到准商用,驚艷的程度是沒有問題的。

深度學習相關技術就不展開細說了,有能力的朋友,感興趣的話,可以自行實操。

 

完整源代碼開源地址:

https://github.com/cpuimage/skin_smoothing

項目沒有第三方依賴,完整純c代碼。

有編譯問題的同學自行參考《Windows下C,C++開發環境搭建指南》搭建編譯環境。

附上算法效果的示例:

 

 

以上,權當拋磚引玉之用。

授人以魚不如授人以漁。

 

2020年,疫情之下,

願大家都能事業有成,身體健康。

世界和平,人們皆友愛。

 

若有其他相關問題或者需求也可以郵件聯繫俺探討。

郵箱地址是: gaozhihan@vip.qq.com

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

Kafka源碼解析(二)—Log分析

上一篇文章講了LogSegment和Log的初始化,這篇來講講Log的主要操作有哪些。

一般來說Log 的常見操作分為 4 大部分。

  1. 高水位管理操作
  2. 日誌段管理
  3. 關鍵位移值管理
  4. 讀寫操作

其中關鍵位移值管理主要包含Log Start Offset 和 LEO等。

高水位HighWatermark

高水位HighWatermark初始化

高水位是通過LogOffsetMetadata類來定義的:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

這裏傳入的初始值是logStartOffset,表明當首次構建高水位時,它會被賦值成 Log Start Offset 值。

我們再來看看LogOffsetMetadata類:

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

  // check if this offset is already on an older segment compared with the given offset
  def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
      throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset < that.segmentBaseOffset
  }
  ...
}

LogOffsetMetadata有三個初始值:

messageOffset表示消息位移值;

segmentBaseOffset保存消息位移值所在日誌段的起始位移,用來判斷兩條消息是否處於同一個日誌段的;

relativePositionSegment保存消息位移值所在日誌段的物理磁盤位置;

上面的onOlderSegment表明,要比較哪個日誌段更老,只需要比較segmentBaseOffset的大小就可以了。

高水位HighWatermark設值與更新

  private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    //高水位的值不可能小於零
    if (newHighWatermark.messageOffset < 0)
      throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized {// 保護Log對象修改的Monitor鎖
      highWatermarkMetadata = newHighWatermark// 賦值新的高水位值
      //事務相關,暫時忽略
      producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
      //事務相關,暫時忽略
      maybeIncrementFirstUnstableOffset()
    }
    trace(s"Setting high watermark $newHighWatermark")
  }

設置高水位的值是很簡單的,首先校驗高水位的值是否大於零,然後通過直接加鎖之後更新高水位的值。

更新更新高水位值的方法有兩個:updateHighWatermark 和 maybeIncrementHighWatermark,我們分別分析。

updateHighWatermark

  def updateHighWatermark(hw: Long): Long = {
    //傳入的高水位的值如果小於logStartOffset,設置為logStartOffset
    val newHighWatermark = if (hw < logStartOffset)
      logStartOffset
    //  傳入的高水位的值如果大於LEO,那麼設置為LEO
    else if (hw > logEndOffset)
      logEndOffset
    else
      hw
    //將newHighWatermark封裝成一個LogOffsetMetadata然後更新高水位的值
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    //返回新的高水位的值
    newHighWatermark
  }

這個方法邏輯也很簡潔,因為高水位的值是不可能大於LEO,也不可能小於logStartOffset,所以需要對傳入的hw校驗然後設置成正確的值,然後調用上面的設置高水位的方法設值。

maybeIncrementHighWatermark

/**
 * Update the high watermark to a new value if and only if it is larger than the old value. It is
 * an error to update to a value which is larger than the log end offset.
 *
 * This method is intended to be used by the leader to update the high watermark after follower
 * fetch offsets have been updated.
 *
 * @return the old high watermark, if updated by the new value
 */
//  當新的高水位的值大於舊的高水位的值時才做更新,如果新的高水位的值大於LEO,會報錯
//  這個方法是leader在確認Follower已經拉取了日誌之後才做更新
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
  //如果新的高水位的值大於LEO,會報錯
  if (newHighWatermark.messageOffset > logEndOffset)
    throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
      s"log end offset $logEndOffsetMetadata")

  lock.synchronized {
    // 獲取老的高水位值
    val oldHighWatermark = fetchHighWatermarkMetadata

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
    // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
    //只有當新的高水位值大於老的值,因為要維護高水位的單調遞增性
    //或者當新的高水位值和老的高水位值相等,但是新的高水位在一個新的日誌段上面時才做更新
    if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
      (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
      updateHighWatermarkMetadata(newHighWatermark)
      Some(oldHighWatermark)// 返回老的高水位值
    } else {
      None
    }
  }
}

這個方法我將這個方法的英文註釋貼出來了,這個註釋的說明我也寫到方法上了,邏輯很清楚,大家看看註釋應該能理解。

這兩個方法主要的區別是,updateHighWatermark 方法,主要用在 Follower 副本從 Leader 副本獲取到消息后更新高水位值。而 maybeIncrementHighWatermark 方法,主要是用來更新 Leader 副本的高水位值。

上面的方法中通過調用fetchHighWatermarkMetadata來獲取高水位的值,我們下面看看這個方法:

fetchHighWatermarkMetadata

  private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    // 讀取時確保日誌不能被關閉
    checkIfMemoryMappedBufferClosed()

    val offsetMetadata = highWatermarkMetadata
    if (offsetMetadata.messageOffsetOnly) {//沒有獲得到完整的高水位元數據
      lock.synchronized {
        // 通過讀日誌文件的方式把完整的高水位元數據信息拉出來
        val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
        updateHighWatermarkMetadata(fullOffset)
        fullOffset
      }
    } else {
      offsetMetadata
    }
  }

  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    //通過給的offset,去日誌文件中找到相應的日誌信息
    val fetchDataInfo = read(offset,
      maxLength = 1,
      isolation = FetchLogEnd,
      minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
  }

然後我們提前看一下日誌的read方法,是如何根據索引讀取數據的:

日誌段操作

日誌讀取操作

read

  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以這裡是false
      val includeAbortedTxns = isolation == FetchTxnCommitted
 
      // 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = endOffsetMetadata.messageOffset
      // 到日字段中根據索引尋找最近的日誌段
      var segmentEntry = segments.floorEntry(startOffset)

      // return error on attempt to read beyond the log end offset or read below log start offset
      // 這裏給出了幾種異常場景:
      // 1. 給的日誌索引大於最大值;
      // 2. 通過索引找的日誌段為空;
      // 3. 給的日誌索引小於logStartOffset
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      //convertToOffsetMetadataOrThrow傳進來是FetchLogEnd,所以最大值是endOffsetMetadata
      // 查看一下讀取隔離級別設置。
      // 普通消費者能夠看到[Log Start Offset, LEO)之間的消息
      // 事務型消費者只能看到[Log Start Offset, Log Stable Offset]之間的消息。Log Stable Offset(LSO)是比LEO值小的位移值,為Kafka事務使用
      // Follower副本消費者能夠看到[Log Start Offset,高水位值]之間的消息
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => endOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }
      //如果尋找的索引等於maxOffsetMetadata,那麼直接返回
      if (startOffset == maxOffsetMetadata.messageOffset) {
        return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
      //如果尋找的索引大於maxOffsetMetadata,返回空的消息集合,因為沒法讀取任何消息
      } else if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }
 
      // 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        // 找到日誌段中最大的日誌位移
        val maxPosition = { 
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }
        // 根據位移信息從日誌段中讀取日誌信息
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        // 如果找不到日誌信息,那麼去日誌段集合中找更大的日誌位移的日誌段
        if (fetchInfo == null) {
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else {
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      //找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

read方法,有四個參數,分別是:

  • startOffset:讀取的日誌索引位置。
  • maxLength:讀取數據量長度。
  • isolation:隔離級別,多用於 Kafka 事務。
  • minOneMessage:是否至少返回一條消息。設想如果消息很大,超過了 maxLength,正常情況下 read 方法永遠不會返回任何消息。但如果設置了該參數為 true,read 方法就保證至少能夠返回一條消息。

代碼中使用了segments,來根據位移查找日誌段:

  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

我們下面看看read方法具體做了哪些事:

  1. 由於沒有使用鎖,所以使用變量緩存當前的nextOffsetMetadata狀態,作為最大索引LEO;
  2. 去日誌段集合里尋找小於或等於指定索引的日誌段;
  3. 校驗異常情況:
    1. startOffset是不是超過了LEO;
    2. 是不是日誌段集合里沒有索引小於startOffset;
    3. startOffset小於Log Start Offset;
  4. 接下來獲取一下隔離級別;
  5. 如果尋找的索引等於LEO,那麼返回空;
  6. 如果尋找的索引大於LEO,返回空的消息集合,因為沒法讀取任何消息;
  7. 開始遍曆日志段對象,直到讀出東西來或者讀到日誌末尾;
    1. 首先找到日誌段中最大的位置;
    2. 根據位移信息從日誌段中讀取日誌信息(這個read方法我們上一篇已經講解過了);
    3. 如果找不到日誌信息,那麼讀取日誌段集合中下一個日誌段;
  8. 找了所有日誌段的位移依然找不到,這可能是因為大於指定的日誌位移的消息都被刪除了,這種情況返回空;

我們在上面的read操作中可以看到,使用了segments來查找日誌。我們主要看看刪除操作

刪除日誌

刪除日誌的入口是:deleteOldSegments

  //  如果topic deletion開關是打開的,那麼會刪去過期的日誌段以及超過設置保留日誌大小的日誌
  // 無論是否開啟刪除規則,都會刪除在log start offset之前的日誌段
  def deleteOldSegments(): Int = {
    if (config.delete) {
      deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
    } else {
      deleteLogStartOffsetBreachedSegments()
    }
  }

deleteOldSegments方法會判斷是否開啟刪除規則,如果開啟,那麼會分別調用:

deleteRetentionMsBreachedSegments刪除segment的時間戳超過了設置時間的日誌段;

deleteRetentionSizeBreachedSegments刪除日誌段空間超過設置空間大小的日誌段;

deleteLogStartOffsetBreachedSegments刪除日誌段的baseOffset小於logStartOffset的日誌段;

我這裏列舉一下這三個方法主要是怎麼實現的:

  private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds
    //調用deleteOldSegments方法,並傳入匿名函數,判斷當前的segment的時間戳是否超過了設置時間
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
      reason = s"retention time ${config.retentionMs}ms breach")
  }
  
  private def deleteRetentionSizeBreachedSegments(): Int = {
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    var diff = size - config.retentionSize
    //判斷日誌段空間是否超過設置空間大小
    //shouldDelete函數會將傳入的日誌段去減diff,直到小於等於零
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
      if (diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }

    deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
  }
  
  private def deleteLogStartOffsetBreachedSegments(): Int = {
    //shouldDelete函數主要判斷日誌段的baseOffset是否小於logStartOffset
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

    deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
  }

這種寫代碼的方式非常的靈活,通過不同方法設置不同的函數來實現代碼復用的目的,最後都是通過調用deleteOldSegments來實現刪除日誌段的目的。

下面我們來看一下deleteOldSegments的操作:

deleteOldSegments

這個deleteOldSegments方法和上面的入口方法傳入的參數是不一致的,這個方法傳入了一個predicate函數,用於判斷哪些日誌段是可以被刪除的,reason用來說明被刪除的原因。

  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
    //刪除任何匹配到predicate規則的日誌段
    lock synchronized {
      val deletable = deletableSegments(predicate)
      if (deletable.nonEmpty)
        info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
      deleteSegments(deletable)
    }
  }

這個方法調用了兩個主要的方法,一個是deletableSegments,用於獲取可以被刪除的日誌段的集合;deleteSegments用於刪除日誌段。

deletableSegments

  private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    //如果日誌段是空的,那麼直接返回
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      var segmentEntry = segments.firstEntry
      //如果日誌段集合不為空,找到第一個日誌段
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue
        //獲取下一個日誌段
        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
        val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
          (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
        else
          (null, logEndOffset, segment.size == 0)
        //如果下一個日誌段的位移沒有大於或等於HW,並且日誌段是匹配predicate函數的,下一個日誌段也不是空的
        //那麼將這個日誌段放入可刪除集合中,然後遍歷下一個日誌段
        if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentEntry = nextSegmentEntry
        } else {
          segmentEntry = null
        }
      }
      deletable
    }
  }

這個方法邏輯十分清晰,主要做了如下幾件事:

  1. 判斷日誌段集合是否為空,為空那麼直接返回空集合;

  2. 如果日誌段集合不為空,那麼從日誌段集合的第一個日誌段開始遍歷;

  3. 判斷當前被遍曆日志段是否能夠被刪除

    1. 日誌段的下一個日誌段的位移有沒有大於或等於HW;
    2. 日誌段是否能夠通過predicate函數校驗;
    3. 日誌段是否是最後一個日誌段;
  4. 將符合條件的日誌段都加入到deletable集合中,並返回。

接下來調用deleteSegments函數:

  private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        // 我們至少保證要存在一個日誌段,如果要刪除所有的日誌;
        //所以調用roll方法創建一個全新的日誌段對象,並且關閉當前寫入的日誌段對象;
        if (segments.size == numToDelete)
          roll()
        lock synchronized {
          // 確保Log對象沒有被關閉
          checkIfMemoryMappedBufferClosed()
          // remove the segments for lookups
          // 刪除給定的日誌段對象以及底層的物理文件
          removeAndDeleteSegments(deletable, asyncDelete = true)
          // 嘗試更新日誌的Log Start Offset值
          maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
        }
      }
      numToDelete
    }
  }

寫日誌

寫日誌的方法主要有兩個:

appendAsLeader

  def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
                     interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
  }

appendAsFollower

  def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
    append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
  }

appendAsLeader 是用於寫 Leader 副本的,appendAsFollower 是用於 Follower 副本同步的。它們的底層都調用了 append 方法

append

  private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
      // 第1步:分析和驗證待寫入消息集合,並返回校驗結果
      val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

      // return if we have no valid messages or if this is a duplicate of the last appended entry
      // 如果壓根就不需要寫入任何消息,直接返回即可
      if (appendInfo.shallowCount == 0)
        return appendInfo

      // trim any invalid bytes or partial messages before appending it to the on-disk log
      // 第2步:消息格式規整,即刪除無效格式消息或無效字節
      var validRecords = trimInvalidBytes(records, appendInfo)

      // they are valid, insert them in the log
      lock synchronized {
        // 確保Log對象未關閉
        checkIfMemoryMappedBufferClosed()
        //需要分配位移值
        if (assignOffsets) {
          // assign offsets to the message set
          // 第3步:使用當前LEO值作為待寫入消息集合中第一條消息的位移值,nextOffsetMetadata為LEO值
          val offset = new LongRef(nextOffsetMetadata.messageOffset)
          appendInfo.firstOffset = Some(offset.value)
          val now = time.milliseconds
          val validateAndOffsetAssignResult = try {
            LogValidator.validateMessagesAndAssignOffsets(validRecords,
              topicPartition,
              offset,
              time,
              now,
              appendInfo.sourceCodec,
              appendInfo.targetCodec,
              config.compact,
              config.messageFormatVersion.recordVersion.value,
              config.messageTimestampType,
              config.messageTimestampDifferenceMaxMs,
              leaderEpoch,
              isFromClient,
              interBrokerProtocolVersion,
              brokerTopicStats)
          } catch {
            case e: IOException =>
              throw new KafkaException(s"Error validating messages while appending to log $name", e)
          }
          // 更新校驗結果對象類LogAppendInfo
          validRecords = validateAndOffsetAssignResult.validatedRecords
          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
          appendInfo.lastOffset = offset.value - 1
          appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
          if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
            appendInfo.logAppendTime = now

          // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
          // format conversion)
          // 第4步:驗證消息,確保消息大小不超限
          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
            for (batch <- validRecords.batches.asScala) {
              if (batch.sizeInBytes > config.maxMessageSize) {
                // we record the original message set size instead of the trimmed size
                // to be consistent with pre-compression bytesRejectedRate recording
                brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                  s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
              }
            }
          }
          // 直接使用給定的位移值,無需自己分配位移值
        } else {
          // we are taking the offsets we are given
          if (!appendInfo.offsetsMonotonic)// 確保消息位移值的單調遞增性
            throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                 records.records.asScala.map(_.offset))

          if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
            // we may still be able to recover if the log is empty
            // one example: fetching from log start offset on the leader which is not batch aligned,
            // which may happen as a result of AdminClient#deleteRecords()
            val firstOffset = appendInfo.firstOffset match {
              case Some(offset) => offset
              case None => records.batches.asScala.head.baseOffset()
            }

            val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
            throw new UnexpectedAppendOffsetException(
              s"Unexpected offset in append to $topicPartition. $firstOrLast " +
              s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
              s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
              s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
              firstOffset, appendInfo.lastOffset)
          }
        }

        // update the epoch cache with the epoch stamped onto the message by the leader
        // 第5步:更新Leader Epoch緩存
        validRecords.batches.asScala.foreach { batch =>
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
            maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
          } else {
            // In partial upgrade scenarios, we may get a temporary regression to the message format. In
            // order to ensure the safety of leader election, we clear the epoch cache so that we revert
            // to truncation by high watermark after the next leader election.
            leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
              warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
              cache.clearAndFlush()
            }
          }
        }

        // check messages set size may be exceed config.segmentSize
        // 第6步:確保消息大小不超限
        if (validRecords.sizeInBytes > config.segmentSize) {
          throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
            s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
        }

        // maybe roll the log if this segment is full
        // 第7步:執行日誌切分。當前日誌段剩餘容量可能無法容納新消息集合,因此有必要創建一個新的日誌段來保存待寫入的所有消息
        //下面情況將會執行日誌切分:
        //logSegment 已經滿了
        //日誌段中的第一個消息的maxTime已經過期
        //index索引滿了
        val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

        val logOffsetMetadata = LogOffsetMetadata(
          messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
          segmentBaseOffset = segment.baseOffset,
          relativePositionInSegment = segment.size)

        // now that we have valid records, offsets assigned, and timestamps updated, we need to
        // validate the idempotent/transactional state of the producers and collect some metadata
        // 第8步:驗證事務狀態
        val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
          logOffsetMetadata, validRecords, isFromClient)

        maybeDuplicate.foreach { duplicate =>
          appendInfo.firstOffset = Some(duplicate.firstOffset)
          appendInfo.lastOffset = duplicate.lastOffset
          appendInfo.logAppendTime = duplicate.timestamp
          appendInfo.logStartOffset = logStartOffset
          return appendInfo
        }
        // 第9步:執行真正的消息寫入操作,主要調用日誌段對象的append方法實現
        segment.append(largestOffset = appendInfo.lastOffset,
          largestTimestamp = appendInfo.maxTimestamp,
          shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
          records = validRecords)

        // Increment the log end offset. We do this immediately after the append because a
        // write to the transaction index below may fail and we want to ensure that the offsets
        // of future appends still grow monotonically. The resulting transaction index inconsistency
        // will be cleaned up after the log directory is recovered. Note that the end offset of the
        // ProducerStateManager will not be updated and the last stable offset will not advance
        // if the append to the transaction index fails.
        // 第10步:更新LEO對象,其中,LEO值是消息集合中最後一條消息位移值+1
        // 前面說過,LEO值永遠指向下一條不存在的消息
        updateLogEndOffset(appendInfo.lastOffset + 1)

        // update the producer state
        // 第11步:更新事務狀態
        for (producerAppendInfo <- updatedProducers.values) {
          producerStateManager.update(producerAppendInfo)
        }

        // update the transaction index with the true last stable offset. The last offset visible
        // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
        for (completedTxn <- completedTxns) {
          val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
          segment.updateTxnIndex(completedTxn, lastStableOffset)
          producerStateManager.completeTxn(completedTxn)
        }

        // always update the last producer id map offset so that the snapshot reflects the current offset
        // even if there isn't any idempotent data being written
        producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

        // update the first unstable offset (which is used to compute LSO)
        maybeIncrementFirstUnstableOffset()

        trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
          s"first offset: ${appendInfo.firstOffset}, " +
          s"next offset: ${nextOffsetMetadata.messageOffset}, " +
          s"and messages: $validRecords")
        // 是否需要手動落盤。一般情況下我們不需要設置Broker端參數log.flush.interval.messages
        // 落盤操作交由操作系統來完成。但某些情況下,可以設置該參數來確保高可靠性
        if (unflushedMessages >= config.flushInterval)
          flush()
        // 第12步:返回寫入結果
        appendInfo
      }
    }
  }

上面代碼的主要步驟如下:

我們下面看看analyzeAndValidateRecords是如何進行消息校驗的:

analyzeAndValidateRecords

  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
    var shallowMessageCount = 0
    var validBytesCount = 0
    var firstOffset: Option[Long] = None
    var lastOffset = -1L
    var sourceCodec: CompressionCodec = NoCompressionCodec
    var monotonic = true
    var maxTimestamp = RecordBatch.NO_TIMESTAMP
    var offsetOfMaxTimestamp = -1L
    var readFirstMessage = false
    var lastOffsetOfFirstBatch = -1L

    for (batch <- records.batches.asScala) {
      // we only validate V2 and higher to avoid potential compatibility issues with older clients
      // 消息格式Version 2的消息批次,起始位移值必須從0開始
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
        throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
          s"be 0, but it is ${batch.baseOffset}")

      // update the first offset if on the first message. For magic versions older than 2, we use the last offset
      // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
      // For magic version 2, we can get the first offset directly from the batch header.
      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
      // case, validation will be more lenient.
      // Also indicate whether we have the accurate first offset or not
      if (!readFirstMessage) {
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          firstOffset = Some(batch.baseOffset) // 更新firstOffset字段
        lastOffsetOfFirstBatch = batch.lastOffset // 更新lastOffsetOfFirstBatch字段
        readFirstMessage = true
      }

      // check that offsets are monotonically increasing
      // 一旦出現當前lastOffset不小於下一個batch的lastOffset,說明上一個batch中有消息的位移值大於後面batch的消息
      // 這違反了位移值單調遞增性
      if (lastOffset >= batch.lastOffset)
        monotonic = false

      // update the last offset seen
      // 使用當前batch最後一條消息的位移值去更新lastOffset
      lastOffset = batch.lastOffset

      // Check if the message sizes are valid.
      val batchSize = batch.sizeInBytes
      // 檢查消息批次總字節數大小是否超限,即是否大於Broker端參數max.message.bytes值
      if (batchSize > config.maxMessageSize) {
        brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
        throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
          s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
      }

      // check the validity of the message by checking CRC
      // 執行消息批次校驗,包括格式是否正確以及CRC校驗
      if (!batch.isValid) {
        brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
        throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
      }
      // 更新maxTimestamp字段和offsetOfMaxTimestamp
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }
      // 累加消息批次計數器以及有效字節數,更新shallowMessageCount字段
      shallowMessageCount += 1
      validBytesCount += batchSize
      // 從消息批次中獲取壓縮器類型
      val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
      if (messageCodec != NoCompressionCodec)
        sourceCodec = messageCodec
    }

    // Apply broker-side compression if any
    // 獲取Broker端設置的壓縮器類型,即Broker端參數compression.type值。
    // 該參數默認值是producer,表示sourceCodec用的什麼壓縮器,targetCodec就用什麼
    val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
    // 最後生成LogAppendInfo對象並返回
    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
      RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
  }

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

Docker Dockerfile 指令詳解與實戰案例

 

Dockerfile介紹及常用指令,包括FROM,RUN,還提及了 COPY,ADD,EXPOSE,WORKDIR等,其實 Dockerfile 功能很強大,它提供了十多個指令。

 

Dockerfile介紹

Dockerfile 是一個用來構建鏡像的文本文件,文本內容包含了一條條構建鏡像所需的指令和說明。

在Docker中創建鏡像最常用的方式,就是使用Dockerfile。Dockerfile是一個Docker鏡像的描述文件,我們可以理解成火箭發射的A、B、C、D…的步驟。Dockerfile其內部包含了一條條的指令,每一條指令構建一層,因此每一條指令的內容,就是描述該層應當如何構建。

FROM 指令-指定基礎鏡像

所謂定製鏡像,那一定是以一個鏡像為基礎,在其上進行定製。而 FROM 就是指定基礎鏡像,因此一個 Dockerfile 中 FROM 是必備的指令,並且必須是第一條指令。如下:

FROM centos

 

MAINTAINER 維護者信息

該鏡像是由誰維護的

MAINTAINER lightzhang lightzhang@xxx.com

 

ENV 設置環境變量

格式有兩種:

ENV <key> <value>
ENV <key1>=<value1> <key2>=<value2>...

 

這個指令很簡單,就是設置環境變量而已,無論是後面的其它指令,如 RUN,還是運行時的應用,都可以直接使用這裏定義的環境變量。

ENV VERSION=1.0 DEBUG=on \
    NAME="Happy Feet"

這個例子中演示了如何換行,以及對含有空格的值用雙引號括起來的辦法,這和 Shell 下的行為是一致的。

下列指令可以支持環境變量展開:

ADD、COPY、ENV、EXPOSE、FROM、LABEL、USER、WORKDIR、VOLUME、STOPSIGNAL、ONBUILD、RUN。

可以從這個指令列表裡感覺到,環境變量可以使用的地方很多,很強大。通過環境變量,我們可以讓一份 Dockerfile 製作更多的鏡像,只需使用不同的環境變量即可。

 

ARG 構建參數

格式:ARG <參數名>[=<默認值>]

構建參數和 ENV 的效果一樣,都是設置環境變量。所不同的是,ARG 所設置的構建環境的環境變量,在將來容器運行時是不會存在這些環境變量的。但是不要因此就使用 ARG 保存密碼之類的信息,因為 docker history 還是可以看到所有值的。

Dockerfile 中的 ARG 指令是定義參數名稱,以及定義其默認值。該默認值可以在構建命令 docker build 中用 –build-arg <參數名>=<值> 來覆蓋

在 1.13 之前的版本,要求 –build-arg 中的參數名,必須在 Dockerfile 中用 ARG 定義過了,換句話說,就是 –build-arg 指定的參數,必須在 Dockerfile 中使用了。如果對應參數沒有被使用,則會報錯退出構建。

從 1.13 開始,這種嚴格的限制被放開,不再報錯退出,而是显示警告信息,並繼續構建。這對於使用 CI 系統,用同樣的構建流程構建不同的 Dockerfile 的時候比較有幫助,避免構建命令必須根據每個 Dockerfile 的內容修改。

 

RUN 執行命令

RUN 指令是用來執行命令行命令的。由於命令行的強大能力,RUN 指令在定製鏡像時是最常用的指令之一。其格式有兩種:

shell 格式:RUN <命令>,就像直接在命令行中輸入的命令一樣。如下:

RUN echo '<h1>Hello, Docker!</h1>' > /usr/share/nginx/html/index.html

 

exec 格式:RUN [“可執行文件”, “參數1”, “參數2”],這更像是函數調用中的格式。

Dockerfile 中每一個指令都會建立一層,RUN 也不例外。每一個 RUN 的行為,都會新建立一層,在其上執行這些命令,執行結束后,commit 這一層的修改,構成新的鏡像。

Dockerfile 不推薦寫法:

 1 FROM debian:stretch
 2 
 3 RUN apt-get update
 4 RUN apt-get install -y gcc libc6-dev make wget
 5 RUN wget -O redis.tar.gz "http://download.redis.io/releases/redis-5.0.3.tar.gz"
 6 RUN mkdir -p /usr/src/redis
 7 RUN tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1
 8 RUN make -C /usr/src/redis
 9 RUN make -C /usr/src/redis install
10 RUN rm redis.tar.gz

上面的這種寫法,創建了 8 層鏡像。這是完全沒有意義的,而且很多運行時不需要的東西,都被裝進了鏡像里,比如編譯環境、更新的軟件包等等。最後一行即使刪除了軟件包,那也只是當前層的刪除;雖然我們看不見這個包了,但軟件包卻早已存在於鏡像中並一直跟隨着鏡像,沒有真正的刪除。

結果就是產生非常臃腫、非常多層的鏡像,不僅僅增加了構建部署的時間,也很容易出錯。 這是很多初學 Docker 的人常犯的一個錯誤。

另外:Union FS 是有最大層數限制的,比如 AUFS,曾經是最大不得超過 42 層,現在是不得超過 127 層。

 

Dockerfile 正確寫法:

 1 FROM debian:stretch
 2 
 3 RUN buildDeps='gcc libc6-dev make wget' \
 4     && apt-get update \
 5     && apt-get install -y $buildDeps \
 6     && wget -O redis.tar.gz "http://download.redis.io/releases/redis-5.0.3.tar.gz" \
 7     && mkdir -p /usr/src/redis \
 8     && tar -xzf redis.tar.gz -C /usr/src/redis --strip-components=1 \
 9     && make -C /usr/src/redis \
10     && make -C /usr/src/redis install \
11     && rm -rf /var/lib/apt/lists/* \
12     && rm redis.tar.gz \
13     && rm -r /usr/src/redis \
14     && apt-get purge -y --auto-remove $buildDeps

這裏沒有使用很多個 RUN 對應不同的命令,而是僅僅使用一個 RUN 指令,並使用 && 將各個所需命令串聯起來。將之前的 8 層,簡化為了 1 層,且後面刪除了不需要的包和目錄。在撰寫 Dockerfile 的時候,要經常提醒自己,這並不是在寫 Shell 腳本,而是在定義每一層該如何構建。因此鏡像構建時,一定要確保每一層只添加真正需要添加的東西,任何無關的東西都應該清理掉。

很多人初學 Docker 製作出了很臃腫的鏡像的原因之一,就是忘記了每一層構建的最後一定要清理掉無關文件。

 

COPY 複製文件

格式:

1 COPY [--chown=<user>:<group>] <源路徑>... <目標路徑>
2 COPY [--chown=<user>:<group>] ["<源路徑1>",... "<目標路徑>"]

 

COPY 指令將從構建上下文目錄中 <源路徑> 的文件/目錄複製到新的一層的鏡像內的 <目標路徑> 位置。如:

COPY package.json /usr/src/app/

 

<源路徑> 可以是多個,甚至可以是通配符,其通配符規則要滿足 Go 的 filepath.Match 規則,如:

1 COPY hom* /mydir/
2 COPY hom?.txt /mydir/

<目標路徑> 可以是容器內的絕對路徑,也可以是相對於工作目錄的相對路徑(工作目錄可以用 WORKDIR 指令來指定)。目標路徑不需要事先創建,如果目錄不存在會在複製文件前先行創建缺失目錄。

此外,還需要注意一點,使用 COPY 指令,源文件的各種元數據都會保留。比如讀、寫、執行權限、文件變更時間等。這個特性對於鏡像定製很有用。特別是構建相關文件都在使用 Git 進行管理的時候。

在使用該指令的時候還可以加上 –chown=<user>:<group> 選項來改變文件的所屬用戶及所屬組。

1 COPY --chown=55:mygroup files* /mydir/
2 COPY --chown=bin files* /mydir/
3 COPY --chown=1 files* /mydir/
4 COPY --chown=10:11 files* /mydir/

 

ADD 更高級的複製文件

ADD 指令和 COPY 的格式和性質基本一致。但是在 COPY 基礎上增加了一些功能。

如果 <源路徑> 為一個 tar 壓縮文件的話,壓縮格式為 gzip, bzip2 以及 xz 的情況下,ADD 指令將會自動解壓縮這個壓縮文件到 <目標路徑> 去。

在某些情況下,如果我們真的是希望複製個壓縮文件進去,而不解壓縮,這時就不可以使用 ADD 命令了。

在 Docker 官方的 Dockerfile 最佳實踐文檔 中要求,盡可能的使用 COPY,因為 COPY 的語義很明確,就是複製文件而已,而 ADD 則包含了更複雜的功能,其行為也不一定很清晰。最適合使用 ADD 的場合,就是所提及的需要自動解壓縮的場合。

特別說明:在 COPY 和 ADD 指令中選擇的時候,可以遵循這樣的原則,所有的文件複製均使用 COPY 指令,僅在需要自動解壓縮的場合使用 ADD。

在使用該指令的時候還可以加上 –chown=<user>:<group> 選項來改變文件的所屬用戶及所屬組。

1 ADD --chown=55:mygroup files* /mydir/
2 ADD --chown=bin files* /mydir/
3 ADD --chown=1 files* /mydir/
4 ADD --chown=10:11 files* /mydir/

 

WORKDIR 指定工作目錄

格式為 WORKDIR <工作目錄路徑>

使用 WORKDIR 指令可以來指定工作目錄(或者稱為當前目錄),以後各層的當前目錄就被改為指定的目錄,如該目錄不存在,WORKDIR 會幫你建立目錄。

之前提到一些初學者常犯的錯誤是把 Dockerfile 等同於 Shell 腳本來書寫,這種錯誤的理解還可能會導致出現下面這樣的錯誤:

1 RUN cd /app
2 RUN echo "hello" > world.txt

如果將這個 Dockerfile 進行構建鏡像運行后,會發現找不到 /app/world.txt 文件,或者其內容不是 hello。原因其實很簡單,在 Shell 中,連續兩行是同一個進程執行環境,因此前一個命令修改的內存狀態,會直接影響后一個命令;而在 Dockerfile 中,這兩行 RUN 命令的執行環境根本不同,是兩個完全不同的容器。這就是對 Dockerfile 構建分層存儲的概念不了解所導致的錯誤。

之前說過每一個 RUN 都是啟動一個容器、執行命令、然後提交存儲層文件變更。第一層 RUN cd /app 的執行僅僅是當前進程的工作目錄變更,一個內存上的變化而已,其結果不會造成任何文件變更。而到第二層的時候,啟動的是一個全新的容器,跟第一層的容器更完全沒關係,自然不可能繼承前一層構建過程中的內存變化。

因此如果需要改變以後各層的工作目錄的位置,那麼應該使用 WORKDIR 指令。

 

USER 指定當前用戶

格式:USER <用戶名>[:<用戶組>]

USER 指令和 WORKDIR 相似,都是改變環境狀態並影響以後的層。WORKDIR 是改變工作目錄,USER 則是改變之後層的執行 RUN, CMD 以及 ENTRYPOINT 這類命令的身份。

當然,和 WORKDIR 一樣,USER 只是幫助你切換到指定用戶而已,這個用戶必須是事先建立好的,否則無法切換。

1 RUN groupadd -r redis && useradd -r -g redis redis
2 USER redis
3 RUN [ "redis-server" ]

 

如果以 root 執行的腳本,在執行期間希望改變身份,比如希望以某個已經建立好的用戶來運行某個服務進程,不要使用 su 或者 sudo,這些都需要比較麻煩的配置,而且在 TTY 缺失的環境下經常出錯。建議使用 gosu。

1 # 建立 redis 用戶,並使用 gosu 換另一個用戶執行命令
2 RUN groupadd -r redis && useradd -r -g redis redis
3 # 下載 gosu
4 RUN wget -O /usr/local/bin/gosu "https://github.com/tianon/gosu/releases/download/1.7/gosu-amd64" \
5     && chmod +x /usr/local/bin/gosu \
6     && gosu nobody true
7 # 設置 CMD,並以另外的用戶執行
8 CMD [ "exec", "gosu", "redis", "redis-server" ]

 

VOLUME 定義匿名卷

格式為:

1 VOLUME ["<路徑1>", "<路徑2>"...]
2 VOLUME <路徑>

之前我們說過,容器運行時應該盡量保持容器存儲層不發生寫操作,對於數據庫類需要保存動態數據的應用,其數據庫文件應該保存於卷(volume)中。為了防止運行時用戶忘記將動態文件所保存目錄掛載為卷,在 Dockerfile 中,我們可以事先指定某些目錄掛載為匿名卷,這樣在運行時如果用戶不指定掛載,其應用也可以正常運行,不會向容器存儲層寫入大量數據。

 

VOLUME /data

這裏的 /data 目錄就會在運行時自動掛載為匿名卷,任何向 /data 中寫入的信息都不會記錄進容器存儲層,從而保證了容器存儲層的無狀態化。當然,運行時可以覆蓋這個掛載設置。比如:

docker run -d -v mydata:/data xxxx

在這行命令中,就使用了 mydata 這個命名卷掛載到了 /data 這個位置,替代了 Dockerfile 中定義的匿名卷的掛載配置。

 

EXPOSE 聲明端口

格式為 EXPOSE <端口1> [<端口2>...]

EXPOSE 指令是聲明運行時容器提供服務端口,這隻是一個聲明,在運行時並不會因為這個聲明應用就會開啟這個端口的服務。在 Dockerfile 中寫入這樣的聲明有兩個好處,一個是幫助鏡像使用者理解這個鏡像服務的守護端口,以方便配置映射;另一個用處則是在運行時使用隨機端口映射時,也就是 docker run -P 時,會自動隨機映射 EXPOSE 的端口。

要將 EXPOSE 和在運行時使用 -p <宿主端口>:<容器端口> 區分開來。-p,是映射宿主端口和容器端口,換句話說,就是將容器的對應端口服務公開給外界訪問,而 EXPOSE 僅僅是聲明容器打算使用什麼端口而已,並不會自動在宿主進行端口映射。

 

ENTRYPOINT 入口點

ENTRYPOINT 的格式和 RUN 指令格式一樣,分為 exec 格式和 shell 格式

ENTRYPOINT 的目的和 CMD 一樣,都是在指定容器啟動程序及參數。ENTRYPOINT 在運行時也可以替代,不過比 CMD 要略顯繁瑣,需要通過 docker run 的參數 –entrypoint 來指定。

當指定了 ENTRYPOINT 后,CMD 的含義就發生了改變,不再是直接的運行其命令,而是將 CMD 的內容作為參數【】傳給 ENTRYPOINT 指令,換句話說實際執行時,將變為:

<ENTRYPOINT> "<CMD>"

 

那麼有了 CMD 后,為什麼還要有 ENTRYPOINT 呢?這種 <ENTRYPOINT> “<CMD>“ 有什麼好處么?讓我們來看幾個場景。

場景一:讓鏡像變成像命令一樣使用

假設我們需要一個得知自己當前公網 IP 的鏡像,那麼可以先用 CMD 來實現:

1 FROM centos:7.7.1908
2 CMD [ "curl", "-s", "ifconfig.io" ]

 

假如我們使用 docker build -t myip . 來構建鏡像的話,如果我們需要查詢當前公網 IP,只需要執行:

1 $ docker run myip
2 183.226.75.148

 

嗯,這麼看起來好像可以直接把鏡像當做命令使用了,不過命令總有參數,如果我們希望加參數呢?比如從上面的 CMD 中可以看到實質的命令是 curl,那麼如果我們希望显示 HTTP 頭信息,就需要加上 -i 參數。那麼我們可以直接加 -i 參數給 docker run myip 么?

1 $ docker run myip -i
2 docker: Error response from daemon: OCI runtime create failed: container_linux.go:348: starting container process caused "exec: \"-i\": executable file not found in $PATH": unknown.
3 ERRO[0000] error waiting for container: context canceled

 

我們可以看到可執行文件找不到的報錯,executable file not found。之前我們說過,跟在鏡像名後面的是 command【Usage: docker run [OPTIONS] IMAGE [COMMAND] [ARG…]】,運行時會替換 CMD 的默認值。因此這裏的 -i 替換了原來的 CMD,而不是添加在原來的 curl -s ifconfig.io 後面。而 -i 根本不是命令,所以自然找不到。

那麼如果我們希望加入 -i 這參數,我們就必須重新完整的輸入這個命令:

$ docker run myip curl -s ifconfig.io -i

 

這顯然不是很好的解決方案,而使用 ENTRYPOINT 就可以解決這個問題。現在我們重新用 ENTRYPOINT 來實現這個鏡像:

1 FROM centos:7.7.1908
2 ENTRYPOINT [ "curl", "-s", "ifconfig.io" ]

 

使用 docker build -t myip2 . 構建完成后,這次我們再來嘗試直接使用 docker run myip2 -i:

 1 $ docker run myip2 
 2 183.226.75.148
 3 
 4 $ docker run myip2 -i
 5 HTTP/1.1 200 OK
 6 Date: Sun, 19 Apr 2020 02:20:48 GMT
 7 Content-Type: text/plain; charset=utf-8
 8 Content-Length: 15
 9 Connection: keep-alive
10 Set-Cookie: __cfduid=d76a2e007bbe7ec2d230b0a6636d115151587262848; expires=Tue, 19-May-20 02:20:48 GMT; path=/; domain=.ifconfig.io; HttpOnly; SameSite=Lax
11 CF-Cache-Status: DYNAMIC
12 Server: cloudflare
13 CF-RAY: 586326015c3199a1-LAX
14 alt-svc: h3-27=":443"; ma=86400, h3-25=":443"; ma=86400, h3-24=":443"; ma=86400, h3-23=":443"; ma=86400
15 cf-request-id: 0231d614d9000099a1e10d7200000001
16 
17 183.226.75.148

 

可以看到,這次成功了。這是因為當存在 ENTRYPOINT 后,CMD 的內容將會作為參數傳給 ENTRYPOINT,而這裏 -i 就是新的 CMD,因此會作為參數傳給 curl,從而達到了我們預期的效果。

 

場景二:應用運行前的準備工作

啟動容器就是啟動主進程,但有些時候,啟動主進程前,需要一些準備工作。

比如 mysql 類的數據庫,可能需要一些數據庫配置、初始化的工作,這些工作要在最終的 mysql 服務器運行之前解決。

此外,可能希望避免使用 root 用戶去啟動服務,從而提高安全性,而在啟動服務前還需要以 root 身份執行一些必要的準備工作,最後切換到服務用戶身份啟動服務。或者除了服務外,其它命令依舊可以使用 root 身份執行,方便調試等。

這些準備工作是和容器 CMD 無關的,無論 CMD 是什麼,都需要事先進行一個預處理的工作。這種情況下,可以寫一個腳本,然後放入 ENTRYPOINT 中去執行,而這個腳本會將接到的參數(也就是 <CMD>)作為命令,在腳本最後執行。比如官方鏡像 redis 中就是這麼做的:

1 FROM alpine:3.4
2 ...
3 RUN addgroup -S redis && adduser -S -G redis redis
4 ...
5 ENTRYPOINT ["docker-entrypoint.sh"]
6 
7 EXPOSE 6379
8 CMD [ "redis-server" ]

 

可以看到其中為了 redis 服務創建了 redis 用戶,並在最後指定了 ENTRYPOINT 為 docker-entrypoint.sh 腳本。

1 #!/bin/sh
2 ...
3 # allow the container to be started with `--user`
4 if [ "$1" = 'redis-server' -a "$(id -u)" = '0' ]; then
5     chown -R redis .
6     exec su-exec redis "$0" "$@"
7 fi
8 
9 exec "$@"

 

該腳本的內容就是根據 CMD 的內容來判斷,如果是 redis-server 的話,則切換到 redis 用戶身份啟動服務器,否則依舊使用 root 身份執行。比如:

1 $ docker run -it redis id
2 uid=0(root) gid=0(root) groups=0(root)

 

CMD 容器啟動命令

CMD 指令的格式和 RUN 相似,也是兩種格式和一種特殊格式:

1 shell 格式:CMD <命令>
2 exec 格式:CMD ["可執行文件", "參數1", "參數2"...]
3 參數列表格式:CMD ["參數1", "參數2"...]。在指定了 ENTRYPOINT 指令后,用 CMD 指定具體的參數。

 

之前介紹容器的時候曾經說過,Docker 不是虛擬機,容器就是進程。既然是進程,那麼在啟動容器的時候,需要指定所運行的程序及參數。CMD 指令就是用於指定默認的容器主進程的啟動命令的。

在指令格式上,一般推薦使用 exec 格式,這類格式在解析時會被解析為 JSON 數組,因此一定要使用雙引號 “,而不要使用單引號。

如果使用 shell 格式的話,實際的命令會被包裝為 sh -c 的參數的形式進行執行。比如:

CMD echo $HOME

 

在實際執行中,會將其變更為:

CMD [ "sh", "-c", "echo $HOME" ]

 

這就是為什麼我們可以使用環境變量的原因,因為這些環境變量會被 shell 進行解析處理。

提到 CMD 就不得不提容器中應用在前台執行和後台執行的問題。這是初學者常出現的一個混淆。

Docker 不是虛擬機,容器中的應用都應該以前台執行,而不是像虛擬機、物理機裏面那樣,用 systemd 去啟動後台服務,容器內沒有後台服務的概念

對於容器而言,其啟動程序就是容器應用進程,容器就是為了主進程而存在的,主進程退出,容器就失去了存在的意義,從而退出,其它輔助進程不是它需要關心的東西。

一些初學者將 CMD 寫為:

CMD service nginx start

 

使用 service nginx start 命令,則是希望 upstart 來以後台守護進程形式啟動 nginx 服務。而剛才說了 CMD service nginx start 會被理解為 CMD [ “sh”, “-c”, “service nginx start”],因此主進程實際上是 sh。那麼當 service nginx start 命令結束后,sh 也就結束了,sh 作為主進程退出了,自然就會令容器退出。

正確的做法是直接執行 nginx 可執行文件,並且要求以前台形式運行。比如:

CMD ["nginx", "-g", "daemon off;"]

 

構建鏡像案例-Nginx

構建文件

 1 [root@docker01 make03]# pwd
 2 /root/docker_test/make03
 3 [root@docker01 make03]# ll
 4 total 12
 5 -rw-r--r-- 1 root root 720 Apr 19 16:46 Dockerfile
 6 -rw-r--r-- 1 root root  95 Apr 19 16:19 entrypoint.sh
 7 -rw-r--r-- 1 root root  22 Apr 19 16:18 index.html
 8 [root@docker01 make03]# cat Dockerfile   # Dockerfile文件
 9 # 基礎鏡像
10 FROM centos:7.7.1908
11 
12 # 維護者
13 MAINTAINER lightzhang lightzhang@xxx.com
14 
15 # 命令:做了些什麼操作
16 RUN echo 'nameserver 223.5.5.5' > /etc/resolv.conf && echo 'nameserver 223.6.6.6' >> /etc/resolv.conf
17 RUN curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
18 RUN yum install -y nginx-1.16.1 && yum clean all
19 RUN echo "daemon off;" >> /etc/nginx/nginx.conf
20 
21 # 添加文件
22 COPY index.html /usr/share/nginx/html/index.html
23 COPY entrypoint.sh /usr/local/bin/entrypoint.sh
24 
25 # 對外暴露端口聲明
26 EXPOSE 80
27 
28 # 執行
29 ENTRYPOINT ["sh", "entrypoint.sh"]
30 
31 # 執行命令
32 CMD ["nginx"]
33 
34 [root@docker01 make03]# cat index.html   # 訪問文件
35 nginx in docker, html
36 [root@docker01 make03]# cat entrypoint.sh   # entrypoint 文件
37 #!/bin/bash
38 if [ "$1" = 'nginx' ]; then
39     exec nginx -c /etc/nginx/nginx.conf
40 fi
41 exec "$@"

 

構建鏡像

 1 [root@docker01 make03]# docker build -t base/nginx:1.16.1 .
 2 Sending build context to Docker daemon  4.608kB
 3 Step 1/11 : FROM centos:7.7.1908
 4  ---> 08d05d1d5859
 5 Step 2/11 : MAINTAINER lightzhang lightzhang@xxx.com
 6  ---> Using cache
 7  ---> 1dc29e78d94f
 8 Step 3/11 : RUN echo 'nameserver 223.5.5.5' > /etc/resolv.conf && echo 'nameserver 223.6.6.6' >> /etc/resolv.conf
 9  ---> Using cache
10  ---> 19398ad9b023
11 Step 4/11 : RUN curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo && curl -o /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
12  ---> Using cache
13  ---> b2451c5856c5
14 Step 5/11 : RUN yum install -y nginx-1.16.1 && yum clean all
15  ---> Using cache
16  ---> 291f27cae4df
17 Step 6/11 : RUN echo "daemon off;" >> /etc/nginx/nginx.conf
18  ---> Using cache
19  ---> 115e07b6313e
20 Step 7/11 : COPY index.html /usr/share/nginx/html/index.html
21  ---> Using cache
22  ---> 9d714d2e2a84
23 Step 8/11 : COPY entrypoint.sh /usr/local/bin/entrypoint.sh
24  ---> Using cache
25  ---> b16983911b56
26 Step 9/11 : EXPOSE 80
27  ---> Using cache
28  ---> d8675d6c2d43
29 Step 10/11 : ENTRYPOINT ["sh", "entrypoint.sh"]
30  ---> Using cache
31  ---> 802a1a67db37
32 Step 11/11 : CMD ["nginx"]
33  ---> Using cache
34  ---> f2517b4d5510
35 Successfully built f2517b4d5510
36 Successfully tagged base/nginx:1.16.1

 

發布容器與端口查看

 1 [root@docker01 ~]# docker run -d -p 80:80 --name mynginx_v2 base/nginx:1.16.1   # 啟動容器
 2 50a45a0894d8669308de7c70d47c96db8cd8990d3e34d1d125e5289ed062f126
 3 [root@docker01 ~]# 
 4 [root@docker01 ~]# docker ps 
 5 CONTAINER ID   IMAGE               COMMAND                  CREATED         STATUS         PORTS                NAMES
 6 50a45a0894d8   base/nginx:1.16.1   "sh entrypoint.sh ng…"   3 minutes ago   Up 3 minutes   0.0.0.0:80->80/tcp   mynginx_v2
 7 [root@docker01 ~]# netstat -lntup  # 端口查看
 8 Active Internet connections (only servers)
 9 Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
10 tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      1634/master         
11 tcp        0      0 0.0.0.0:111             0.0.0.0:*               LISTEN      1/systemd           
12 tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      1349/sshd           
13 tcp6       0      0 ::1:25                  :::*                    LISTEN      1634/master         
14 tcp6       0      0 :::111                  :::*                    LISTEN      1/systemd           
15 tcp6       0      0 :::80                   :::*                    LISTEN      13625/docker-proxy  
16 tcp6       0      0 :::8080                 :::*                    LISTEN      2289/docker-proxy   
17 tcp6       0      0 :::22                   :::*                    LISTEN      1349/sshd           
18 udp        0      0 0.0.0.0:1021            0.0.0.0:*                           847/rpcbind         
19 udp        0      0 0.0.0.0:111             0.0.0.0:*                           1/systemd           
20 udp6       0      0 :::1021                 :::*                                847/rpcbind         
21 udp6       0      0 :::111                  :::*                                1/systemd

 

瀏覽器訪問

http://172.16.1.31/

 

 

  

 

 

———END———
如果覺得不錯就關注下唄 (-^O^-) !

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

非洲豬瘟疫情防控 南韓今年確診11起

摘錄自2020年01月12日中央通訊社南韓報導

南韓環境部和國立環境科學院今天(12日)表示,自去年10月3日首次確診以來,截至10日共有66起確診感染非洲豬瘟(ASF)病毒,僅今年就有11起確診,疫情防控上仍有很大挑戰。

南韓聯合新聞通訊社報導,去年10月(18起)、11月(15起)、12月(22起)平均每天有0.5~0.7起確診,但今年以來每天發現1起以上感染死豬。江原道華川地區最近首次發現感染非洲豬瘟的野豬屍體。

之前僅在京畿道漣川(26起)、坡州(22起)、江原道鐵原(17起)發現感染非洲豬瘟的野豬屍體,此次擴散到華川。

報導說,政府9日召開會議檢討非洲豬瘟疫情應對情況。雖然確診疫情數在增加,但政府官員認為,這是防疫過程中的自然現象。不過由於家豬傳播非洲豬瘟病毒的可能性沒有完全消失,將繼續加強防疫監管。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

番茄感染病毒 法國出現首批確診案例

摘錄自2020年2月18日中央社報導

法國農業部今天(18日)表示,法國最西邊菲尼斯泰爾省(Finistere)的番茄植物已遭一種毀滅性病毒污染,這種病毒可能導致全部作物付之一炬。

法新社報導,法國農業部表示,已隔離一座農場,並將摧毀充滿番茄的多座溫室,目前沒有已知的治療方法。

這種病毒稱作「番茄褐色皺紋果病毒」(ToBRFV),可造成番茄出現粗糙變色斑塊,以致番茄賣不出去。官員先前警告,這種病毒的散播對農夫將有「重大經濟後果」。

這種病毒對人類無害。2014年於以色列的溫室中傳出首例,之後傳播至歐洲及美洲。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準