推薦算法之用矩陣分解做協調過濾——LFM模型

隱語義模型(Latent factor model,以下簡稱LFM),是基於矩陣分解的推薦算法,在其基本算法上引入L2正則的FunkSVD算法在推薦系統領域更是廣泛使用,在Spark上也有其實現。本文將對 LFM原理進行詳細闡述,給出其基本算法原理。此外,還將介紹使得隱語義模型聲名大噪的算法FunkSVD和在其基礎上改進較為成功的BiasSVD。最後,對LFM進行一個較為全面的總結。

1. 矩陣分解應用於推薦算法要解決的問題

在推薦系統中,我們經常可能面臨的場景是:現有大量用戶和物品,以及少部分用戶對少部分物品的評分,我們需要使用現有的用戶對少部分物品的評分去推測用戶對物品集中其他物品的可能的評分,從而將預測中評分高的物品推薦給用戶。例如下面的用戶物品評分表:

用戶\物品 物品 1 物品 2 物品 3 物品 4 物品 5
用戶 1 3 2
用戶 2 1 2 6
用戶 3 3 4 6
用戶 4 1 2 5
用戶 5 4 2 3

對於每個用戶,我們希望較準確的預測出其對未評分物品的評分。將m個用戶和n個物品的評分看做一個矩陣M,從而將矩陣分解應用到該場景,即可解決這一問題。而本文,將關注於矩陣分解用於到推薦的方法之一,即LFM算法的解決方案。

2. LFM

LFM算法的核心思想是通過隱含特徵(Latent factor)聯繫用戶和物品,該算法最早在文本挖掘領域中被提出用於找到文本的隱含語義,相關名詞還有LDATopic Model等。

2.1 如何表示用戶的偏好和物品(item)屬性?

在被問到這個問題時,針對MovieLens(電影評分)數據集,你可能會說用戶是否喜歡動作片,物品所屬電影類型去回答。但用戶對其他類型的電影的偏好程度呢?物品在其它類型所佔的權重又是多少呢?就算針對現有的電影類型去表徵用戶偏好和物品,那麼能否能夠完全的表示出用戶的偏好和物品屬性呢?答案是不能,比如用戶喜歡看成龍的電影這個偏好沒法表示出來,電影由誰導演,主演是誰沒法表示。但你要問我用哪些屬性去表徵呢?這個誰也無法給出一個很好的答案,粒度很難控制。

2.2 LFM來救場

隱語義模型較好的解決了該問題,它從數據出發,通過基於用戶行為統計的自動聚類,可指定出表徵用戶偏好和物品的向量維度,最終得到用戶的偏好向量以及物品的表徵向量。LFM通過以下公式計算用戶u對物品i的偏好:
\[ preference(u, i)=p^T_u q_i=\sum_f^F{p_{u,k}q_{i,k}} \]
這個公式,\(p_{u,k}\)度量了用戶u的偏好和第f個隱類的關係,\(q_{i,k}\)度量了物品i和第f個隱類的關係。

那麼現在,我們期望用戶的評分矩陣M這樣分解:
\[ M_{m*n}=P^T_{m*k}Q_{k*n} \]
那麼,我們如何將矩陣分解呢?這裏採用了線性回歸的思想,即盡可能的讓用戶的評分和我們預測的評分的殘差盡可能小,也就是說,可以用均方差作為損失函數來尋找最終的PQ。考慮所有的用戶和樣本的組合,我們期望的最小化損失函數為:
\[ \sum_{i,j}{(m_{ij}-p_i^Tq_j)^2} \]
只要我們能夠最小化上面的式子,並求出極值所對應的\(p_i\)\(q_j\),則我們最終可以得到矩陣PQ,那麼對於任意矩陣M任意一個空白評分的位置,我們就可以通過\(p^T_i q_j\)計算預測評分。

2.3 FunkSVD用於推薦

上面是隱語義模型LFM的基本原理,但在實際業務中,為防止過擬合,我們常常會加入一個L2的正則化項,這也就誕生了我們的FunkSVD算法。其優化目標函數\(J(p,q)\)定義為:
\[ \underbrace{argmin}_{p_i,q_j}\sum_{i,j}{(m_{ij}-p^T_iq_j)^2+\lambda({\Arrowvert{p_i}\Arrowvert}^2_2+{\Arrowvert{q_i}\Arrowvert}^2_2)} \]
其中λ為正則化係數,需要調參。對於這個優化問題,我們一般通過梯度下降法來進行優化得到結果。

將上式分別對\(p_i\)\(q_j\)求導我們得到:
\[ \frac{\partial{J}}{\partial{p_i}}=-2(m_{ij}-p^T_iq_j)q_j+2\lambda{p_i} \]

\[ \frac{\partial{J}}{\partial{q_j}}=-2(m_{ij}-p^T_iq_j)p_i+2\lambda{q_j} \]

則梯度下降中迭代公式為:
\[ p_i = p_i +\alpha((m_{ij}-p^T_iq_j)q_j-\lambda{p_i}) \]

\[ q_j = q_j+\alpha((m_{ij}-p^T_iq_j)p_i-\lambda{q_j}) \]

通過迭代我們最終可以得到PQ,進而用於推薦。

為讀者進一步理解,筆者實現了基於MovieLens數據集實現了該方法。代碼詳見github:

2.4 BiasSVD用於推薦

BiasSVDFunkSVD較為成功的改進版算法。BiasSVD假設評分系統包括三部分的偏置因素:一些和用戶物品無關的評分因素。用戶有一些和物品無關的評分因素,稱為用戶偏置項。而物品也有一些和用戶無關的評分因素,稱為物品偏置項。這很好理解,對於樂觀的用戶來說,它的評分行為普遍偏高,而對批判性用戶來說,他的評分記錄普遍偏低,即使他們對同一物品的評分相同,但是他們對該物品的喜好程度卻並不一樣。同理,對物品來說,以電影為例,受大眾歡迎的電影得到的評分普遍偏高,而一些爛片的評分普遍偏低,這些因素都是獨立於用戶或產品的因素,而和用戶對產品的的喜好無關。

假設評分系統平均分為μ,第i個用戶的用戶偏置項為\(b_i\),而第j個物品的物品偏置項為\(b_j\),則加入了偏置項以後的優化目標函數\(J(p_i,q_j)\)是這樣的:
\[ \underbrace{argmin}_{p_i,q_j}\sum_{i,j}{(m_{ij}-p^T_iq_j-u-b_i-b_j)^2+\lambda({\Arrowvert{p_i}\Arrowvert}^2_2+{\Arrowvert{q_i}\Arrowvert}^2_2+{\Arrowvert{b_i}\Arrowvert}^2_2+{\Arrowvert{b_j}\Arrowvert}^2_2)} \]
這個優化目標也可以採用梯度下降法求解。和FunkSVD不同的是,此時我們多了兩個偏執項\(b_i\)\(b_j\)\(p_i\)\(q_j\)的迭代公式和FunkSVD類似,只是每一步的梯度導數稍有不同而已。\(b_i\)\(b_j\)一般可以初始設置為0,然後參与迭代。迭代公式為:
\[ p_i = p_i +\alpha((m_{ij}-p^T_iq_j-u-b_i-b_j)q_j-\lambda{p_i}) \]

\[ q_j = q_j+\alpha((m_{ij}-p^T_iq_j-u-b_i-b_j)p_i-\lambda{q_j}) \]

\[ b_i=b_i+\alpha(m_{ij}-p^T_iq_j-u-b_i-b_j-\lambda{b_i}) \]

\[ b_j=b_j+\alpha(m_{ij}-p^T_iq_j-u-b_i-b_j-\lambda{b_j}) \]

通過迭代我們最終可以得到PQ,進而用於推薦。BiasSVD增加了一些額外因素的考慮,因此在某些場景會比FunkSVD表現好。

為讀者進一步理解,筆者實現了基於MovieLens數據集實現了該方法。代碼詳見github

小結

LFM 是一種基於機器學習的方法,具有比較好的理論基礎,通過優化一個設定的指標建立最優的模型。它實質上是矩陣分解應用到推薦的方法,其中FunkSVD更是將矩陣分解用於推薦方法推到了新的高度,在實際應用中使用非常廣泛。當然矩陣分解方法也在不停的進步,目前矩陣分解推薦算法中分解機方法(factorization machine, FM)已成為一個趨勢。

對於矩陣分解用於推薦方法本身來說,它容易編程實現,實現複雜度低,預測效果也好,同時還能保持擴展性。這些都是其寶貴的優點。但是LFM 無法給出很好的推薦解釋,它計算出的隱類雖然在語義上確實代表了一類興趣和物品,卻很難用自然語言描述並生成解釋展現給用戶。

LFM 在建模過程中,假設有 M 個用戶、 N 個物品、 K 條用戶對物品的行為記錄,如果是 F 個隱類,那麼它離線計算的空間複雜度是 \(O(F*(M+N))\) ,迭代 S次則時間複雜度為 \(O(K * F * S)\)。當 M(用戶數量)和 N(物品數量)很大時LFM相對於ItemCFUserCF可以很好地節省離線計算的內存,在時間複雜度由於LFM會多次迭代上所以和ItemCFUserCF沒有質的差別。

同時,遺憾的是,LFM 無法進行在線實時推薦,即當用戶有了新的行為後,他的推薦列表不會發生變化。而從 LFM的預測公式可以看到, LFM 在給用戶生成推薦列表時,需要計算用戶對所有物品的興趣權重,然後排名,返回權重最大的 N 個物品。那麼,在物品數很多時,這一過程的時間複雜度非常高,可達 \(O(M*N*F)\) 。因此, LFM 不太適合用於物品數非常龐大的系統,如果要用,我們也需要一個比較快的算法給用戶先計算一個比較小的候選列表,然後再用LFM重新排名。另一方面,LFM 在生成一個用戶推薦列表時速度太慢,因此不能在線實時計算,而需要離線將所有用戶的推薦結果事先計算好存儲在數據庫中。

參考:

  • 推薦系統實戰—項亮

(歡迎轉載,轉載請註明出處。歡迎溝通交流: losstie@outlook.com)

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

Docker基礎與實戰,看這一篇就夠了

docker 基礎

什麼是Docker

Docker 使用 Google 公司推出的 Go 語言 進行開發實現,基於 Linux 內核的 cgroupnamespace,以及 AUFS 類的 Union FS 等技術,對進程進行封裝隔離,屬於 操作系統層面的虛擬化技術。由於隔離的進程獨立於宿主和其它的隔離的進程,因此也稱其為容器。

Docker 在容器的基礎上,進行了進一步的封裝,從文件系統、網絡互聯到進程隔離等等,極大的簡化了容器的創建和維護。使得 Docker 技術比虛擬機技術更為輕便、快捷。

記住最重要的一點,Dokcer實際是宿主機的一個普通的進程,這也是Dokcer與傳統虛擬化技術的最大不同。

為什麼要使用Docker

使用Docker最重要的一點就是Docker能保證運行環境的一致性,不會出現開發、測試、生產由於環境配置不一致導致的各種問題,一次配置多次運行。使用Docker,可更快地打包、測試以及部署應用程序,並可減少從編寫到部署運行代碼的周期。

docker 安裝

  • Docker 要求 CentOS 系統的內核版本高於 3.10 ,查看本頁面的前提條件來驗證你的CentOS 版本是否支持 Docker 。
    uname -r

  • 更新yum,升級到最新版本
    yum update

  • 卸載老版本的docker(若有)
    yum remove docker docker-common docker-selinux docker-engine
    執行該命令只會卸載Docker本身,而不會刪除Docker存儲的文件,例如鏡像、容器、卷以及網絡文件等。這些文件保存在/var/lib/docker 目錄中,需要手動刪除。

  • 查看yum倉庫,查看是否有docker
    ll /etc/yum.repos.d/

    如果用的廠商的服務器(阿里雲、騰訊雲)一般都會有docker倉庫,如果用的是虛擬機或者公司的服務器基本會沒有。

  • 安裝軟件包, yum-util 提供yum-config-manager功能,另外兩個是devicemapper驅動依賴的
    yum install -y yum-utils device-mapper-persistent-data lvm2

  • 安裝倉庫
    yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

  • 查看docker版本
    yum list docker-ce --showduplicates | sort -r

  • 安裝docker
    yum install docker-ce
    以上語句是是安裝最新版本的Docker,你也可以通過yum install docker-ce-<VERSION> 安裝指定版本

  • 啟動docker
    systemctl start docker

  • 驗證安裝是否正確
    dokcer run hello-world

docker 重要命令

鏡像相關

  • 搜索鏡像docker search
    docker search nginx Docker就會在Docker Hub中搜索含有“nginx”這個關鍵詞的鏡像倉庫

  • 下載鏡像docker pull
    docker pull nginx Docker就會在Docker Hub中下載含有“nginx”最新版本的鏡像
    當然也可以使用docker pull reg.jianzh5.com/nginx:1.7.9 下載指定倉庫地址標籤的nginx鏡像

  • 列出鏡像docker images

  • 刪除鏡像docker rmi
    docker rmi hello-world刪除我們剛剛下載的hello-world鏡像

  • 構建鏡像docker build
    通過Dockerfile構建鏡像,這個我們等下再拿出來詳細說明。

容器相關

  • 新建啟動鏡像docker run
    這個命令是我們最常用的命令,主要使用以下幾個選項
    ① -d選項:表示後台運行
    ② -P選項(大寫):隨機端口映射
    ③ -p選項(小寫):指定端口映射,前面是宿主機端口後面是容器端口,如docker run nginx -p 8080:80,將容器的80端口映射到宿主機的8080端口,然後使用localhost:8080就可以查看容器中nginx的歡迎頁了
    ④ -v選項:掛載宿主機目錄,前面是宿主機目錄,後面是容器目錄,如docker run -d -p 80:80 -v /dockerData/nginx/conf/nginx.conf:/etc/nginx/nginx.conf nginx 掛載宿主機的/dockerData/nginx/conf/nginx.conf的文件,這樣就可以在宿主機對nginx進行參數配置了,注意目錄需要用絕對路徑,不要使用相對路徑,如果宿主機目錄不存在則會自動創建。
    ⑤–rm : 停止容器後會直接刪除容器,這個參數在測試是很有用,如docker run -d -p 80:80 --rm nginx
    ⑥–name : 給容器起個名字,否則會出現一長串的自定義名稱如 docker run -name niginx -d -p 80:80 - nginx

  • 列出容器 docker ps
    這個命令可以列出當前運行的容器,使用-a參數后列出所有的容器(包括已停止的)

  • 停止容器docker stop
    docker stop 5d034c6ea010 後面跟的是容器ID,也可以使用容器名稱

  • 啟動停止的容器docker start
    docker run是新建容器並啟動,docker start 是啟動停止的容器,如docker start 5d034c6ea010

  • 重啟容器docker restart
    此命令執行的過程實際是先執行docker stop,然後再執行docker start,如docker restart 5d034c6ea010

  • 進入容器docker exec -it 容器id /bin/bash
    docker exec -it 5d034c6ea010 /bin/bash,就相當於進入了容器本身的操作系統

  • 刪除容器 docker rm
    docker rm 5d034c6ea010 後面跟的是容器ID,刪除容器之前需要先停止容器運行

  • 數據拷貝docker cp
    此命令用於容器與宿主機之間進行數據拷貝,如 docker cp 5d034c6ea010: /etc/nginx/nginx.conf /dockerData/nginx/conf/nginx.conf 將容器的目錄文件拷貝到宿主機指定位置,容器ID可以替換成容器名。

命令實戰

如果我們需要一個nginx容器,並且需要在宿主機上直接修改nginx的配置文件、默認主頁,在宿主機可以實時看到容器nginx的日誌。我們可以按照如下的方式一步一步完成。

  • 使用–rm參數啟動容器,方便刪除
    docker run -d -p 8081:80 --name nginx --rm nginx

  • 進入容器,查看容器中配置文件、項目文件、日誌文件的目錄地址
    docker exec -it 9123b67e428e /bin/bash

  • 導出容器的配置文件
    docker cp nginx:/etc/nginx/nginx.conf /dockerData/nginx/conf/nginx.conf導出配置文件 nginx.conf
    docker cp nginx:/etc/nginx/conf.d /dockerData/nginx/conf/conf.d導出配置目錄 conf.d

  • 停止容器docker stop 9123b67e428e,由於加了–rm參數,容器會自動刪除

  • 再以如下命令啟動容器,完成目錄掛載
    shell docker run -d -p 8081:80 --name nginx \ -v /dockerData/nginx/conf/nginx.conf:/etc/nginx/nginx.conf \ -v /dockerData/nginx/conf/conf.d:/etc/nginx/conf.d \ -v /dockerData/nginx/www:/usr/share/nginx/html \ -v /dockerData/nginx/logs:/var/log/nginx nginx
  • 訪問服務器地址http://192.168.136.129:8081/

    訪問報錯,這時候就進入宿主機的日誌目錄/dockerData/nginx/logs查看日誌
    2019/11/23 10:08:11 [error] 6#6: *1 directory index of “/usr/share/nginx/html/” is forbidden, client: 192.168.136.1, server: localhost, request: “GET / HTTP/1.1”, host: “192.168.136.129:8081”
    因為/usr/share/nginx/html/被掛載到了服務器上面的/dockerData/nginx/www目錄下,原來的歡迎頁面在dockerData/nginx/www是沒有的,所有就報錯了,這裏我們隨便建一個。

  • 建立默認主頁
    shell #打開項目文件 cd /dockerData/nginx/www #使用vim 創建並編輯文件 vi index.html #此時我們會進入vim界面,按 i 插入,然後輸入 <h1 align="center">Hello,Welcome to Docker World</h1> #輸入完后,按 esc,然後輸入 :wq
  • 再次訪問瀏覽器地址

Dockerfile

我們可以使用Dockfile構建一個鏡像,然後直接在docker中運行。Dockerfile文件為一個文本文件,裡面包含構建鏡像所需的所有的命令,首先我們來認識一下Dockerfile文件中幾個重要的指令。

指令詳解

  • FROM
    選擇一個基礎鏡像,然後在基礎鏡像上進行修改,比如構建一個SpringBoot項目的鏡像,就需要選擇java這個基礎鏡像,FROM需要作為Dockerfile中的第一條指令
    如:FROM openjdk:8-jdk-alpine 基礎鏡像如果可以的話最好使用alpine版本的,採用alpline版本的基礎鏡像構建出來的鏡像會小很多。

  • RUN
    RUN指令用來執行命令行命令的。它有一下兩種格式:

    • shell 格式:RUN ,就像直接在命令行中輸入的命令一樣。 RUN echo '<h1>Hello, Docker!</h1>' > /usr/share/nginx/html/index.html
    • exec 格式:RUN [“可執行文件”, “參數1”, “參數2”],這更像是函數調用中的格式。
  • CMD
    此指令就是用於指定默認的容器主進程的啟動命令的。
    CMD指令格式和RUN相似,也是兩種格式
    • shell 格式:CMD
    • exec 格式:CMD [“可執行文件”, “參數1”, “參數2″…]
    • 參數列表格式:CMD [“參數1”, “參數2″…]。在指定了 ENTRYPOINT 指令后,用 CMD 指定具體的參數。
  • ENTRYPOINT
    ENTRYPOINT 的格式和 RUN 指令格式一樣,分為 exec 格式和 shell 格式。 ENTRYPOINT 的目的和 CMD 一樣,都是在指定容器啟動程序及參數。ENTRYPOINT 在運行時也可以替代,不過比 CMD 要略顯繁瑣,需要通過 docker run 的參數 --entrypoint 來指定。
    當指定了 ENTRYPOINT 后,CMD 的含義就發生了改變,不再是直接的運行其命令,而是將 CMD 的內容作為參數傳給 ENTRYPOINT 指令,換句話說實際執行時,將變為:
    <ENTRYPOINT> "<CMD>"

  • COPY & ADD
    這2個指令都是複製文件,它將從構建上下文目錄中   的文件/目錄 複製到新的一層的鏡像內的   位置。比如: COPY demo-test.jar app.jarADD demo-test.jar app.jar
    ADD指令比 COPY高級點,可以指定一個URL地址,這樣Docker引擎會去下載這個URL的文件,如果 ADD後面是一個 tar文件的話,Dokcer引擎還會去解壓縮。
    我們在構建鏡像時盡可能使用 COPY,因為 COPY 的語義很明確,就是複製文件而已,而 ADD 則包含了更複雜的功能,其行為也不一定很清晰。

  • EXPOSE
    聲明容器運行時的端口,這隻是一個聲明,在運行時並不會因為這個聲明應用就會開啟這個端口的服務。在 Dockerfile 中寫入這樣的聲明有兩個好處,一個是幫助鏡像使用者理解這個鏡像服務的守護端口,以方便配置映射;另一個用處則是在運行時使用隨機端口映射時,也就是 docker run -P 時,會自動隨機映射 EXPOSE 的端口。
    要將 EXPOSE 和在運行時使用 -p <宿主端口>:<容器端口> 區分開來。-p,是映射宿主端口和容器端口,換句話說,就是將容器的對應端口服務公開給外界訪問,而 EXPOSE 僅僅是聲明容器打算使用什麼端口而已,並不會自動在宿主進行端口映射。

  • ENV
    這個指令很簡單,就是設置環境變量,無論是後面的其它指令,如 RUN,還是運行時的應用,都可以直接使用這裏定義的環境變量。它有如下兩種格式:
    • ENV <key> <value>
    • ENV <key1>=<value1> <key2>=<value2>...
  • VOLUME
    該指令使容器中的一個目錄具有持久化存儲的功能,該目錄可被容器本身使用,也可共享給其他容器。當容器中的應用有持久化數據的需求時可以在Dockerfile中使用該指令。如VOLUME /tmp
    這裏的 /tmp 目錄就會在運行時自動掛載為匿名卷,任何向 /tmp 中寫入的信息都不會記錄進容器存儲層,從而保證了容器存儲層的無狀態化。當然,運行時可以覆蓋這個掛載設置。比如:
    docker run -d -v mydata:/tmp xxxx

  • LABEL
    你可以為你的鏡像添加labels,用來組織鏡像,記錄版本描述,或者其他原因,對應每個label,增加以LABEL開頭的行,和一個或者多個鍵值對。如下所示:
    LABEL version="1.0" LABEL description="test"

Dockerfile實戰

我們以一個簡單的SpringBoot項目為例構建基於SpringBoot應用的鏡像。
功能很簡單,只是對外提供了一個say接口,在進入這個方法的時候打印出一行日誌,並將日誌寫入日誌文件。

@SpringBootApplication
@RestController
@Log4j2
public class DockerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DockerApplication.class, args);
    }

    @GetMapping("/say")
    public String say(){
        log.info("get say request...");
        return "Hello,Java日知錄";
    }
    
}

我們使用maven將其打包成jar文件,放入一個單獨的文件夾,然後按照下面步驟一步步構建鏡像並執行

  • 在當前文件夾建立Dockerfile文件,文件內容如下:
    properties FROM openjdk:8-jdk-alpine #將容器中的/tmp目錄作為持久化目錄 VOLUME /tmp #暴露端口 EXPOSE 8080 #複製文件 COPY docker-demo.jar app.jar #配置容器啟動后執行的命令 ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]
  • 使用如下命令構建鏡像
    docker built -t springboot:v1.0 .

    -t 指定鏡像的名稱及版本號,注意後面需要以 . 結尾。

  • 查看鏡像文件

  • 運行構建的鏡像
    docker run -v /app/docker/logs:/logs -p 8080:8080 --rm --name springboot springboot:v1.0

  • 瀏覽器訪問http://192.168.136.129:8080/say

  • 在宿主機上實時查看日誌
    tail -100f /app/docker/logs/docker-demo-info.log

    請關注個人公眾號:JAVA日知錄

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

同步鎖基本原理與實現

  為充分利用機器性能,人們發明了多線程。但同時帶來了線程安全問題,於是人們又發明了同步鎖。

  這個問題自然人人知道,但你真的了解同步鎖嗎?還是說你會用其中的上鎖與解鎖功能?

  今天我們就一起來深入看同步鎖的原理和實現吧!

 

一、同步鎖的職責

  同步鎖的職責可以說就一個,限制資源的使用(線程安全從屬)。

  它一般至少會包含兩個功能: 1. 給資源加鎖; 2. 給資源解鎖;另外,它一般還有 等待/通知 即 wait/notify 的功能;

  同步鎖的應用場景:多個線程同時操作一個事務必須保證正確性;一個資源只能同時由一線程訪問操作;一個資源最多只能接入k的併發訪問;保證訪問的順序性;

  同步鎖的實現方式:操作系統調度實現;應用自行實現;CAS自旋;

  同步鎖的幾個問題:

    為什麼它能保證線程安全?

    鎖等待耗CPU嗎?

    使用鎖后性能下降嚴重的原因是啥?

 

二、同步鎖的實現一:lock/unlock

  其實對於應用層來說,非常多就是 lock/unlock , 這也是鎖的核心。

  AQS 是java中很多鎖實現的基礎,因為它屏蔽了很多繁雜而底層的阻塞操作,為上層抽象出易用的接口。

  我們就以AQS作為跳板,先來看一下上鎖的過程。為不至於陷入具體鎖的業務邏輯中,我們先以最簡單的 CountDownLatch 看看。

    // 先看看 CountDownLatch 的基礎數據結構,可以說是不能再簡單了,就繼承了 AQS,然後簡單覆寫了幾個必要方法。
    // java.util.concurrent.CountDownLatch.Sync
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            // 只有一種情況會獲取鎖成功,即 state == 0 的時候
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                // 原始的鎖數量是在初始化時指定的不可變的,每次釋放一個鎖標識
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    // 只有一情況會釋放鎖成功,即本次釋放后 state == 0
                    return nextc == 0;
            }
        }
    }
    private final Sync sync;

 

重點1,我們看看上鎖過程,即 await() 的調用。

    public void await() throws InterruptedException {
        // 調用 AQS 的接口,由AQS實現了鎖的骨架邏輯
        sync.acquireSharedInterruptibly(1);
    }
    
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
    /**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument.
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 首先嘗試獲取鎖,如果成功就不用阻塞了
        // 而從上面的邏輯我們看到,獲取鎖相當之簡單,所以,獲取鎖本身並沒有太多的性能消耗喲
        // 如果獲取鎖失敗,則會進行稍後嘗試,這應該是複雜而精巧的
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 首先將當前線程添加排隊隊尾,此處會保證線程安全,稍後我們可以看到
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 獲取其上一節點,如果上一節點是頭節點,就代表當前線程可以再次嘗試獲取鎖了
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 先檢測是否需要阻塞,然後再進行阻塞等待,阻塞由 LockSupport 底層支持
                // 如果阻塞后,將不會主動喚醒,只會由 unlock 時,主動被通知
                // 因此,此處即是獲取鎖的最終等待點
                // 操作系統將不會再次調度到本線程,直到獲取到鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 如此線程安全地添加當前線程到隊尾? CAS 保證
    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    // 檢測是否需要進行阻塞
    /**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
             // 只有前置節點是 SIGNAL 狀態的節點,才需要進行 阻塞等待,當然前置節點會在下一次循環中被設置好
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    // park 阻塞實現
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        // 將當前 AQS 實例作為鎖對象 blocker, 進行操作系統調用阻塞, 所以所有等待鎖的線程將會在同一個鎖前提下執行
        LockSupport.park(this);
        return Thread.interrupted();
    }

  如上,上鎖過程是比較簡單明了的。加入一隊列,然後由操作系統將線程調出。(那麼操作系統是如何把線程調出的呢?有興趣自行研究)

 

重點2. 解鎖過程,即 countDown() 調用

    public void countDown() {
        // 同樣直接調用 AQS 的接口,由AQS實現了鎖的釋放骨架邏輯
        sync.releaseShared(1);
    }
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#releaseShared
    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        // 調用業務實現的釋放邏輯,如果成功,再執行底層的釋放,如隊列移除,線程通知等等
        // 在 CountDownLatch 的實現中,只有 state == 0 時才會成功,所以它只會執行一次底層釋放
        // 這也是我們認為 CountDownLatch 能夠做到多線程同時執行的效果的原因之一
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            // 隊列不為空才進行釋放
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 看過上面的 lock 邏輯,我們知道只要在阻塞狀態,一定是 Node.SIGNAL 
                if (ws == Node.SIGNAL) {
                    // 狀態改變成功,才進行後續的喚醒邏輯
                    // 因為先改變狀態成功,才算是線程安全的,再進行喚醒,否則進入下一次循環再檢查
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 將頭節點的下一節點喚醒,如有必要
                    unparkSuccessor(h);
                }
                // 這裏的 propagates, 是要傳播啥呢??
                // 為什麼只喚醒了一個線程,其他線程也可以動了?
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    /**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 喚醒下一個節點
        // 但如果下一節點已經取消等待了,那麼就找下一個沒最近的沒被取消的線程進行喚醒
        // 喚醒只是針對一個線程的喲
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 

重要3. 線程解鎖的傳播性?

  因為從上一節的講解中,我們看到,當用戶調用 countDown 時,僅僅是讓操作系統喚醒了 head 的下一個節點線程或者最近未取消的節點。那麼,從哪裡來的所有線程都獲取了鎖從而運行呢?

  其實是在 獲取鎖的過程中,還有一點我們未看清:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireShared
    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 當countDown被調用后,head節點被喚醒,執行
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 獲取到鎖后,設置node為下一個頭節點,並把喚醒狀態傳播下去,而這裏面肯定會做一些喚醒其他線程的操作,請看下文
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    /**
     * Sets head of queue, and checks if successor may be waiting
     * in shared mode, if so propagating if either propagate > 0 or
     * PROPAGATE status was set.
     *
     * @param node the node
     * @param propagate the return value from a tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 如果有必要,則做一次喚醒下一線程的操作
            // 在 countDown() 不會觸發此操作,所以這裏只是一個內部調用傳播
            Node s = node.next;
            if (s == null || s.isShared())
                // 此處鎖釋放邏輯如上,總之,又是另一次的喚醒觸發
                doReleaseShared();
        }
    }

  到此,我們明白了它是怎麼做到一個鎖釋放,所有線程可通行的。也從根本上回答了我們猜想,所有線程同時併發運行。然而並沒有,它只是通過喚醒傳播性來依次喚醒各個等待線程的。從絕對時間性上來講,都是有先後關係的。以後可別再淺顯說是同時執行了喲。

 

三、 鎖的切換:wait/notify

  上面看出,針對一個lock/unlock 的過程還是很簡單的,由操作系統負責大頭,實現代碼也並不多。

  但是針對稍微有點要求的場景,就會進行條件式的操作。比如:持有某個鎖運行一段代碼,但是,運行時發現某條件不滿足,需要進行等待而不能直接結束,直到條件成立。即所謂的 wait 操作。

  乍一看,wait/notify 與 lock/unlock 很像,其實不然。區分主要是 lock/unlock 是針對整個代碼段的,而 wait/notify 則是針對某個條件的,即獲取了鎖不代表條件成立了,但是條件成立了一定要在鎖的前提下才能進行安全操作。

  那麼,是否 wait/notify 也一樣的實現簡單呢?比如java的最基礎類 Object 類就提供了 wait/notify 功能。

  我們既然想一探究竟,還是以併發包下的實現作為基礎吧,畢竟 java 才是我們的強項。

  本次,咱們以  ArrayBlockingQueue#put/take 作為基礎看下這種場景的使用先。

  ArrayBlockingQueue 的put/take 特性就是,put當隊列滿時,一直阻塞,直到有可用位置才繼續運行下一步。而take當隊列為空時一樣阻塞,直到隊列里有數據才運行下一步。這種場景使用鎖主不好搞了,因為這是一個條件判斷。put/take 如下:

    // java.util.concurrent.ArrayBlockingQueue#put
    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 當隊列滿時,一直等待
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    // java.util.concurrent.ArrayBlockingQueue#take
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 當隊列為空時一直等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

  看起來相當簡單,完全符合人類思維。只是,這裏使用的兩個變量進行控制流程 notFull,notEmpty. 這兩個變量是如何進行關聯的呢?

  在這之前,我們還需要補充下上面的例子,即 notFull.await(), notEmpty.await(); 被阻塞了,何時才能運行呢?如上代碼在各自的入隊和出隊完成之後進行通知就可以了。

    // 與 put 對應,入隊完成后,隊列自然就不為空了,通知下 notEmpty 就好了
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        // 我已放入一個元素,不為空了
        notEmpty.signal();
    }
    // 與 take 對應,出隊完成后,自然就不可能是滿的了,至少一個空餘空間。
    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 我已移除一個元素,肯定沒有滿了,你們繼續放入吧
        notFull.signal();
        return x;
    }

  是不是超級好理解。是的。不過,我們不是想看 ArrayBlockingQueue 是如何實現的,我們是要論清 wait/notify 是如何實現的。因為畢竟,他們不是一個鎖那麼簡單。

    // 三個鎖的關係,即 notEmpty, notFull 都是 ReentrantLock 的條件鎖,相當於是其子集吧
    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    // lock.newCondition() 是什麼鬼?它是 AQS 中實現的 ConditionObject
    // java.util.concurrent.locks.ReentrantLock#newCondition
    public Condition newCondition() {
        return sync.newCondition();
    }
        // java.util.concurrent.locks.ReentrantLock.Sync#newCondition
        final ConditionObject newCondition() {
            // AQS 中定義
            return new ConditionObject();
        }

  接下來,我們要帶着幾個疑問來看這個 Condition 的對象:

    1. 它的 wait/notify 是如何實現的?
    2. 它是如何與互相進行聯繫的?
    3. 為什麼 wait/notify 必須要在外面的lock獲取之後才能執行?
    4. 它與Object的wait/notify 有什麼相同和不同點?

  能夠回答了上面的問題,基本上對其原理與實現也就理解得差不多了。

 

重點1. wait/notify 是如何實現的?

  我們從上面可以看到,它是通過調用 await()/signal() 實現的,到底做事如何,且看下面。

        // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()
        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加當前線程到 等待線程隊列中,有 lastWaiter/firstWaiter 維護
            Node node = addConditionWaiter();
            // 釋放當前lock中持有的鎖,詳情且看下文
            int savedState = fullyRelease(node);
            // 從以下開始,將不再保證線程安全性,因為當前的鎖已經釋放,其他線程將會重新競爭鎖使用
            int interruptMode = 0;
            // 循環判定,如果當前節點不在 sync 同步隊列中,那麼就反覆阻塞自己
            // 所以判斷是否在 同步隊列上,是很重要的
            while (!isOnSyncQueue(node)) {
                // 沒有在同步隊列,阻塞
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 當條件被滿足后,需要重新競爭鎖,詳情看下文
            // 競爭到鎖后,原樣返回到 wait 的原點,繼續執行業務邏輯
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 下面是異常處理,忽略
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            // 預期的,都是釋放鎖成功,如果失敗,說明當前線程並並未獲取到鎖,引發異常
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        // tryRelease 由客戶端自定義實現
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 如何判定當前線程是否在同步隊列中或者可以進行同步隊列?
    /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     */
    final boolean isOnSyncQueue(Node node) {
        // 如果上一節點還沒有被移除,當前節點就不能被加入到同步隊列
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 如果當前節點的下游節點已經存在,則它自身必定已經被移到同步隊列中
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
         // 最終直接從同步隊列中查找,如果找到,則自身已經在同步隊列中
        return findNodeFromTail(node);
    }

    /**
     * Returns true if node is on sync queue by searching backwards from tail.
     * Called only when needed by isOnSyncQueue.
     * @return true if present
     */
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    
    // 當條件被滿足后,需要重新競爭鎖,以保證外部的鎖語義,因為之前自己已經將鎖主動釋放
    // 這個鎖與 lock/unlock 時的一毛一樣,沒啥可講的
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  總結一下 wait 的邏輯:

    1. 前提:自身已獲取到外部鎖;
    2. 將當前線程添加到 ConditionQueue 等待隊列中;
    3. 釋放已獲取到的鎖;
    4. 反覆檢查進入等待,直到當前節點被移動到同步隊列中;
    5. 條件滿足被喚醒,重新競爭外部鎖,成功則返回,否則繼續阻塞;(外部鎖是同一個,這也是要求兩個對象必須存在依賴關係的原因)
    6. wait前線程持有鎖,wait后線程持有鎖,沒有一點外部鎖變化;

 

重點2. 釐清了 wait, 接下來,我們看 signal() 通知喚醒的實現:

        // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal
        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            // 只有獲取鎖的實例,才可以進行signal,否則你拿什麼去保證線程安全呢
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            // 通知 firstWaiter 
            if (first != null)
                doSignal(first);
        }
        
        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            // 最多只轉移一個 節點
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
    // 將一個節點從 等待隊列 移動到 同步隊列中,即可參与下一輪競爭
    // 只有確實移動成功才會返回 true
    // 說明:當前線程是持有鎖的線程
    // java.util.concurrent.locks.AbstractQueuedSynchronizer#transferForSignal
    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // 同步隊列由 head/tail 指針維護
        Node p = enq(node);
        int ws = p.waitStatus;
        // 注意,此處正常情況下並不會喚醒等待線程,僅是將隊列轉移。 
        // 因為當前線程的鎖保護區域並未完成,完成后自然會喚醒其他等待線程
        // 否則將會存在當前線程任務還未執行完成,卻被其他線程搶了先去,那接下來的任務當如何??
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

  總結一下,notify 的功能原理如下:

    1. 前提:自身已獲取到外部鎖;
    2. 轉移下一個等待隊列的節點到同步隊列中;
    3. 如果遇到下一節點被取消情況,順延到再下一節點直到為空,至多轉移一個節點;
    4. 正常情況下不做線程的喚醒操作;

  所以,實現 wait/notify, 最關鍵的就是維護兩個隊列,等待隊列與同步隊列,而且都要求是在有外部鎖保證的情況下執行。

  到此,我們也能回答一個問題:為什麼wait/notify一定要在鎖模式下才能運行?

  因為wait是等待條件成立,此時必定存在競爭需要做保護,而它自身又必須釋放鎖以使外部條件可成立,且後續需要做恢復動作;而notify之後可能還有後續工作必須保障安全,notify只是鎖的一個子集。。。

 

四、通知所有線程的實現:notifyAll

  有時條件成立后,可以允許所有線程通行,這時就可以進行 notifyAll, 那麼如果達到通知所有的目的呢?是一起通知還是??

  以下是 AQS 中的實現:

        // java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signalAll
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

  可以看到,它是通過遍歷所有節點,依次轉移等待隊列到同步隊列(通知)的,原本就沒有人能同時干幾件事的!

  本文從java實現的角度去解析同步鎖的原理與實現,但並不局限於java。道理總是相通的,只是像操作系統這樣的大佬,能幹的活更純粹:比如讓cpu根本不用調度一個線程。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

this綁定方式總結

最近在回顧js的一些基礎知識,把《你不知道的js》系列又看了一遍,this始終是重中之重,還是決定把this相關知識做一個系統的總結,也方便自己日後回顧。

this的四條綁定規則

1.默認綁定

這是最常用的函數調用類型:獨立函數調用(即函數是直接使用不帶任何修飾的函數引用進行調用的)。可以把這條規則看作是無法應用其他規則時的默認規則。
默認綁定的this在非嚴格模式下指向window,嚴格模式下指向undefined,比如下面的函數foo在非嚴格模式下:

var a = 2;
function foo(){
    var a = 3;
    console.log(this.a);
}
foo(); //2

這裏的foo()方法內的this指向了window,因此window.a = 2;

嚴格模式下,this.指向undefined,因此訪問this.a會報錯:

var a = 2;
function foo(){
    "use strict";
    var a = 3;
    console.log(this.a);
}
foo(); //Uncaught TypeError: Cannot read property 'a' of undefined

2.隱式綁定

如果調用位置上有上下文對象,或者說被某個對象“擁有”或者“包
含”,則使用隱式綁定。

function foo() {
    console.log( this.a );
}
var obj = {
    a: 2,
    foo: foo
};
obj.foo(); // 2

上例中的foo是通過obj.foo()的方式調用的,調用位置會使用obj上下文來引用函數,因此foo中的this指向了obj。
另外foo是當做引用被加入到obj中的,但是無論是直接在obj 中定義還是先定義再添加為引用屬性,foo嚴格上來說都不屬於obj,因此上述定義裏面的“擁有”與“包含”加上了引號,這樣說是為了方便理解。
常見的隱式調用場景:
obj.fn();
arguments[i]();//其實就是將點的調用方式變為了[]調用
el.onClick(function(){console.log(this);//this指向el})

隱式丟失

先來看一段代碼:

function foo() {
    console.log( this.a );
}
var obj = {
    a: 2,
    foo: foo
};
var bar = obj.foo; // 函數別名!
var a = "global"; // a 是全局對象的屬性
bar(); // "global"

上述代碼其實只用看調用的方式:bar(),這其實是一個不帶任何修飾的函數調用,因此應用了默認綁定。
還有一種參數傳遞的方式也會發生隱式丟失,原理其實跟上述例子一樣:

function foo() {
    console.log( this.a );
}
function doFoo(fn) {
    // fn 其實引用的是foo
    fn(); // <-- 調用位置!
}
var obj = {
    a: 2,
    foo: foo
};
var a = "global"; // a 是全局對象的屬性
doFoo( obj.foo ); // "global"

显示綁定

使用call,apply和bind方法可以指定綁定函數的this的值,這種綁定方法叫显示綁定。

function foo() {
    console.log( this.a );
}
var obj = {
    a:2
};
foo.call( obj ); // 2

通過foo.call(obj),我們可以在調用foo 時強制把它的this 綁定到obj 上

new綁定

new操作符可以基於一個“構造函數”新創建一個對象實例,new的實例化過程如下:

  1. 創建(或者說構造)一個全新的對象。
  2. 這個新對象會被執行[[ 原型]] 連接。
  3. 這個新對象會綁定到函數調用的this。
  4. 如果函數沒有返回其他對象,那麼new 表達式中的函數調用會自動返回這個新對象。
    明確了new的實例化過程后,思考如下代碼:
function foo(a) {
    this.a = a;
}
var bar = new foo(2);
console.log( bar.a ); // 2

new foo(2)后新創建了個實例對象bar,然後把這個新對象bar綁定到了foo函數中的this,因此執行this.a = a后其實是把a賦給了bar.a

優先級

一般情況下this的綁定會根據上述四條綁定規則來,那麼他們同時出現時,該以怎樣的順序來判斷this的指向?下面是具體的規則:

  1. 函數是否在new 中調用(new 綁定)?如果是的話this 綁定的是新創建的對象( var bar = new foo() )。
  2. 函數是否通過call、apply(顯式綁定)或者硬綁定調用?如果是的話,this 綁定的是指定的對象( var bar = foo.call(obj2) )。
  3. 函數是否在某個上下文對象中調用(隱式綁定)?如果是的話,this 綁定的是那個上下文對象。( var bar = obj1.foo() )
  4. 如果都不是的話,使用默認綁定。如果在嚴格模式下,就綁定到undefined,否則綁定到全局對象。( var bar = foo() )

綁定例外

1.使用call,appy,bind這種顯式綁定的方法,參數傳入null或者undefined作為上下文時,函數調用還是會使用默認綁定

function foo() {
    console.log( this.a );
}
var a = 2;
foo.call( null ); // 2

什麼情況下需要將上下文傳為null呢?
1.使用bind函數來實現柯里化

function foo(a,b) {
    console.log(a,b);
}
// 使用 bind(..) 進行柯里化
var bar = foo.bind( null, 2 );
bar( 3 ); // 2,3

2.使用apply(..) 來展開一個數組,併當作參數傳入一個函數

function foo(a,b) {
    console.log(a,b);
}
// 把數組展開成參數
foo.apply( null, [2, 3] ); // 2,3

其實上面兩種使用場景其實都不關心call/app/bind第一個參數的值是什麼,只是想傳個佔位值而已。
但是總是傳入null可能會出現一些難以追蹤的bug,比如說當你在使用的第三方庫中的某個函數中有this時,this會被錯誤的綁定到全局對象上,造成一些難以預料的後果(修改全局變量)

var a = 1;//全局變量
const Utils = {
    a: 2,
    changeA: function(a){
        this.a = a;
    }
}
Utils.changeA(3);
Utils.a //3
a //1
Utils.changeA.call(null,4);
Utils.a //3
a //4,修改了全局變量a!

更安全的做法:

var o = Object.create(null);
Utils.changeA.call(o,6);
a //1, 全局變量沒有修改
o.a // 6 改的是變量o

2.間接引用

function foo() {
    console.log( this.a );
}
var a = 2;
var o = { a: 3, foo: foo };
var p = { a: 4 };
o.foo(); // 3
(p.foo = o.foo)(); // 2

賦值表達式p.foo = o.foo 的返回值是目標函數的引用,因此調用位置是foo() 而不是p.foo() 或者o.foo()。根據我們之前說過的,這裡會應用默認綁定。

this詞法(箭頭函數)

上述的幾種規則適用於所有的正常函數,但不包括ES6的箭頭函數。箭頭函數不使用this的四種標準規則,而是根據外層(函數或者全局)作用域(詞法作用域)來決定this

function foo() {
// 返回一個箭頭函數
    return (a) => {
        //this 繼承自foo()
        console.log( this.a );
    };
}
var obj1 = {
    a:2
};
var obj2 = {
    a:3
};
var bar = foo.call( obj1 );
bar.call( obj2 ); // 2, 不是3 !

foo() 內部創建的箭頭函數會捕獲調用時foo() 的this。由於foo() 的this 綁定到obj1,bar(引用箭頭函數)的this 也會綁定到obj1,箭頭函數的綁定無法被修改。(new 也不行!)

幾個例子加深理解

this的理論知識講解得差不多了,來幾個例子看看自己有沒有理解全面:
1.經典面試題:以下輸出結果是什麼

var length = 10;
function fn() {
    console.log(this.length);
}
var obj = {
  length: 5,
  method: function(fn) {
    fn();
    arguments[0]();
  }
};
obj.method(fn, 1);

obj中method方法裏面調用了兩次fn。第一次是直接調用的“裸露”的fn,因此fn()中this使用默認綁定,this.length為10.第二次調用時通過arguments的方式調用的,arguments[0]其實指向的就是fn,但是是通過obj[fn]這種對象上下文的隱式綁定的,因此this指向arguments,而arguments只有一個一項(method中只有fn一個參數),因此arguments.length為1。因此打印的結果為:

10
1

2.以下輸出什麼

var obj = {
    birth: 1990,
    getAge: function () {
        var b = this.birth; // 1990
        var fn = function () {
            return new Date().getFullYear() - this.birth; // this指向window或undefined
        };
        return fn();
    }
};
obj.getAge();

答案是嚴格模式下會報錯,非嚴格模式下輸出NaN
原因也是因為在調用obj.getAge()后,getAge方法內的this使用隱式綁定。但是return fn()的時候用的是“裸露的fn”使用默認綁定,fn裏面的this指向window或者undefined。
使用箭頭函數來修正this的指向:

var obj = {
    birth: 1990,
    getAge: function () {
        var b = this.birth; // 1990
        var fn = () => new Date().getFullYear() - this.birth; // this指向obj對象
        return fn();
    }
};
obj.getAge(); // 25

使用箭頭函數后,fn中的this在他的詞法分析階段就已經確定好了(即fn定義的時候),跟調用位置無關。fn的this指向外層的作用域(即getAge中的this)
3.以下輸出為什麼是’luo’

var A = function( name ){ 
    this.name = name;
};
var B = function(){ 
    A.apply(this,arguments);
};
B.prototype.getName = function(){ 
    return this.name;
};
var b=new B('sven');  // B {name: "luo"}
console.log( b.getName() ); // 輸出:  'luo'

執行new B(‘seven’)後會返回一個新對象b,並且B函數中的this會綁定到新對象b上,B的函數體內執行A.apply(this.arguments)也就是執行b.name = name;這個時候b的值就是{name:’luo’},所以b.getName()就能輸出’luo’啦~

實際在業務使用中,邏輯會更複雜一些,但是萬變不離其宗,都按照上面寫的規則來代入就好了

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

從零開始搭建前後端分離的NetCore2.2(EF Core CodeFirst+Autofac)+Vue的項目框架之十二Swagger(參數)使用二

  引言

  在 中提到了 Swagger 的基本使用,僅限於沒有參數,沒有驗證的那種api文檔生成,那麼這篇就連接上篇繼續,在一般具有安全性、權限等驗證的接口上,

  都會在header/url中加上請求者的秘鑰、簽名等,當然也有可能添加到body等其它地方, Swashbuckle.AspNetCore 都支持這些寫法。

  如何使用 — 下面將介紹兩種使用方式

兩種方式參數設置到何處都是在  In屬性上,屬性對於值如下:    參考

  • query: 參数字段值對應放在url中
  • header: 參數值對應放在header param中
  • body: 參數對應放到請求體中
  • path: 參數應該對應放到請求路徑  // 具體貌似沒用
  • formData: 參數對應放到請求表單中

  第一種:將一個或多個參數保護API的“securityDefinitions”添加到生成的Swagger中。

這種是直接在文檔的右上方添加一個 Authorize 按鈕,設置了值后,每一個請求都會在設置的位置上加上相應的值,在 上一篇隨筆中的 ConfigureServices 方法中,

對應位置 services.AddSwaggerGen(options =>{}) 中的  XmlComments  下 添加代碼如下:

                options.AddSecurityDefinition("token", new ApiKeyScheme
                {
                    Description = "token format : {token}",//參數描述
                    Name = "token",//名字
                    In = "header",//對應位置
                    Type = "apiKey"//類型描述
                });
                options.AddSecurityDefinition("sid", new ApiKeyScheme
                {
                    Description = "sid format : {sid}",//參數描述
                    Name = "sid",//名字
                    In = "header",//對應位置
                    Type = "apiKey"//類型描述
                });
                //添加Jwt驗證設置 設置為全局的,不然在代碼中取不到
                options.AddSecurityRequirement(new Dictionary<string, IEnumerable<string>> {
                    { "token", Enumerable.Empty<string>() },
                    { "sid", Enumerable.Empty<string>() },
                });

  添加完成后,運行起來看下效果,效果圖: 

 設置上對應值,調用測試方法,可以在header中取到剛設置到的值,

 這裡能看到,可以取到設置的參數了。這樣一來,在需要驗證的接口上,我們就可以通過接口文檔來測試了。基本不用再藉助postman等接口測試工具了。

但是,但是,這裡有一個問題,就是只要設置了參數值,每一次訪問都會在請求中帶上參數。

下面將介紹第二種方式,只給需要驗證用戶的接口上添加驗證參數。

  第二種:使用“filters”擴展Swagger生成器,來實現只在需要添加參數的方法上添加參數。複雜的可以根據自己的需求來添加對應參數

實現方式就是先新建一個類,名: SwaggerParameter ,實現 IOperationFilter 接口。SwaggerParameter 類代碼如下: 

    /// <summary>
    /// 自定義添加參數
    /// </summary>
    public class SwaggerParameter : IOperationFilter
    {
        /// <summary>
        /// 實現 Apply 方法
        /// </summary>
        /// <param name="operation"></param>
        /// <param name="context"></param>
        public void Apply(Operation operation, OperationFilterContext context)
        {
            if (operation.Parameters == null) operation.Parameters = new List<IParameter>();
            var attrs = context.ApiDescription.ActionDescriptor.AttributeRouteInfo;
            var t = typeof(BaseUserController);
            //先判斷是否是繼承用戶驗證類
            if (context.ApiDescription.ActionDescriptor is ControllerActionDescriptor descriptor && context.MethodInfo.DeclaringType?.IsSubclassOf(t) == true)
            {
                //再驗證是否允許匿名訪問
                var actionAttributes = descriptor.MethodInfo.GetCustomAttributes(inherit: true);
                bool isAnonymous = actionAttributes.Any(a => a is AllowAnonymousAttribute);
                // 需要驗證的方法添加
                if (!isAnonymous)
                {
                    operation.Parameters.Add(new NonBodyParameter()
                    {
                        Name = "sid",
                        In = "header", //query header body path formData
                        Type = "string",
                        Required = true,//是否必選
                        Description = "登錄返回的sid"
                    });
                    operation.Parameters.Add(new NonBodyParameter()
                    {
                        Name = "token",
                        In = "header", //query header body path formData
                        Type = "string",
                        Required = true,//是否必選
                        Description = "登錄返回的token"
                    });
                }
            }
        }
    }

 運行起來后,進入到  文檔頁面,可以看到右上角的 Authorize 按鈕已經不見了,在不需要驗證的方法上,也找不到相應需要設置參數的輸入框。就只有在需要驗證的接口上才有。

參考Swagger文檔圖如下: 

參考代碼圖如下:

 

效果圖: 

  這樣一來設置也就完成了。從上面就能看出,就只有需要用戶驗證的接口才會有相應參數。 

 

我的設置方式是先定義了用戶驗證控制器類,讓需要用戶驗證的控制器繼承該控制器,然後在該控制器中不需要用戶驗證的接口上加上 AllowAnonymous 屬性 

設置fitter時就可以根據上面提到的兩個點來進行判斷是否需要加上參數,如果不是這樣實現的,可以根據自己的需求變更fitter類,來控制文檔的生成。 

 

以上若有什麼不對或可以改進的地方,望各位指出或提出意見,一起探討學習~ 

有需要源碼的可通過此 鏈接拉取 覺得還可以的給個 start 和點個 下方的推薦哦~~謝謝!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

實現 Redis 協議解析器

本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹 以及 的實現,若您對協議有所了解可以直接閱讀協議解析器部分。

Redis 通信協議

Redis 自 2.0 版本起使用了統一的協議 RESP (REdis Serialization Protocol),該協議易於實現,計算機可以高效的進行解析且易於被人類讀懂。

RESP 是一個二進制安全的文本協議,工作於 TCP 協議上。客戶端和服務器發送的命令或數據一律以 \r\n (CRLF)結尾。

RESP 定義了5種格式:

  • 簡單字符串(Simple String): 服務器用來返回簡單的結果,比如”OK”。非二進制安全,且不允許換行。
  • 錯誤信息(Error): 服務器用來返回簡單的結果,比如”ERR Invalid Synatx”。非二進制安全,且不允許換行。
  • 整數(Integer): llenscard等命令的返回值, 64位有符號整數
  • 字符串(Bulk String): 二進制安全字符串, get 等命令的返回值
  • 數組(Array, 舊版文檔中稱 Multi Bulk Strings): Bulk String 數組,客戶端發送指令以及lrange等命令響應的格式

RESP 通過第一個字符來表示格式:

  • 簡單字符串:以”+” 開始, 如:”+OK\r\n”
  • 錯誤:以”-” 開始,如:”-ERR Invalid Synatx\r\n”
  • 整數:以”:”開始,如:”:1\r\n”
  • 字符串:以 $ 開始
  • 數組:以 * 開始

Bulk String有兩行,第一行為 $+正文長度,第二行為實際內容。如:

$3\r\nSET\r\n

Bulk String 是二進制安全的可以包含任意字節,就是說可以在 Bulk String 內部包含 “\r\n” 字符(行尾的CRLF被隱藏):

$4
a\r\nb

$-1 表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1

Array 格式第一行為 “*”+數組長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]的報文:

*2
$3
foo
$3
bar

客戶端也使用 Array 格式向服務端發送指令。命令本身將作為第一個參數,如 SET key value指令的RESP報文:

*3
$3
SET
$3
key
$5
value

將換行符打印出來:

*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n

協議解析器

我們在 一文中已經介紹過TCP服務器的實現,協議解析器將實現其 Handler 接口充當應用層服務器。

協議解析器將接收 Socket 傳來的數據,並將其數據還原為 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原為 ['SET', 'key', 'value']

本文完整代碼:

來自客戶端的請求均為數組格式,它在第一行中標記報文的總行數並使用CRLF作為分行符。

bufio 標準庫可以將從 reader 讀到的數據緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。

需要注意的是RESP是二進制安全的協議,它允許在正文中使用CRLF字符。舉例來說 Redis 可以正確接收並執行SET "a\r\nb" 1指令, 這條指令的正確報文是這樣的:

*3  
$3
SET
$4
a\r\nb 
$7
myvalue

ReadBytes 讀取到第五行 “a\r\nb\r\n”時會將其誤認為兩行:

*3  
$3
SET
$4
a  // 錯誤的分行
b // 錯誤的分行
$7
myvalue

因此當讀取到第四行$4后, 不應該繼續使用 ReadBytes('\n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
_, err = io.ReadFull(reader, msg)

定義 Client 結構體作為客戶端抽象:

type Client struct {
    /* 與客戶端的 Tcp 連接 */
    conn   net.Conn

    /* 
     * 帶有 timeout 功能的 WaitGroup, 用於優雅關閉
     * 當響應被完整發送前保持 waiting 狀態, 阻止鏈接被關閉
     */
    waitingReply wait.Wait

    /* 標記客戶端是否正在發送指令 */ 
    sending atomic.AtomicBool
    
    /* 客戶端正在發送的參數數量, 即 Array 第一行指定的數組長度 */
    expectedArgsCount uint32
    
    /* 已經接收的參數數量, 即 len(args)*/ 
    receivedCount uint32
    
    /*
     * 已經接收到的命令參數,每個參數由一個 []byte 表示
     */
    args [][]byte
}

定義解析器:

type Handler struct {

    /* 
     * 記錄活躍的客戶端鏈接 
     * 類型為 *Client -> placeholder 
     */
    activeConn sync.Map 

    /* 數據庫引擎,執行指令並返回結果 */
    db db.DB

    /* 關閉狀態標誌位,關閉過程中時拒絕新建連接和新請求 */
    closing atomic.AtomicBool 
}

接下來可以編寫主要部分了:

func (h *Handler)Handle(ctx context.Context, conn net.Conn) {
    if h.closing.Get() {
        // 關閉過程中不接受新連接
        _ = conn.Close()
    }

    /* 初始化客戶端狀態 */
    client := &Client {
        conn:   conn,
    }
    h.activeConn.Store(client, 1)

    reader := bufio.NewReader(conn)
    var fixedLen int64 = 0 // 將要讀取的 BulkString 的正文長度
    var err error
    var msg []byte
    for {
        /* 讀取下一行數據 */ 
        if fixedLen == 0 { // 正常模式下使用 CRLF 區分數據行
            msg, err = reader.ReadBytes('\n')
            // 判斷是否以 \r\n 結尾
            if len(msg) == 0 || msg[len(msg) - 2] != '\r' {
                errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                _, _ =  client.conn.Write(errReply.ToBytes())
            }
        } else { // 當讀取到 BulkString 第二行時,根據給出的長度進行讀取
            msg = make([]byte, fixedLen + 2)
            _, err = io.ReadFull(reader, msg)
            // 判斷是否以 \r\n 結尾
            if len(msg) == 0 || 
              msg[len(msg) - 2] != '\r' ||  
              msg[len(msg) - 1] != '\n'{
                errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                _, _ =  client.conn.Write(errReply.ToBytes())
            }
            // Bulk String 讀取完畢,重新使用正常模式
            fixedLen = 0 
        }
        // 處理 IO 異常
        if err != nil {
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                logger.Info("connection close")
            } else {
                logger.Warn(err)
            }
            _ = client.Close()
            h.activeConn.Delete(client)
            return // io error, disconnect with client
        }

        /* 解析收到的數據 */
        if !client.sending.Get() { 
            // sending == false 表明收到了一條新指令
            if msg[0] == '*' {
                // 讀取第一行獲取參數個數
                expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
                if err != nil {
                    _, _ = client.conn.Write(UnknownErrReplyBytes)
                    continue
                }
                // 初始化客戶端狀態
                client.waitingReply.Add(1) // 有指令未處理完成,阻止服務器關閉
                client.sending.Set(true) // 正在接收指令中
                // 初始化計數器和緩衝區 
                client.expectedArgsCount = uint32(expectedLine) 
                client.receivedCount = 0
                client.args = make([][]byte, expectedLine)
            } else {
                // TODO: text protocol
            }
        } else {
            // 收到了指令的剩餘部分(非首行)
            line := msg[0:len(msg)-2] // 移除換行符
            if line[0] == '$' { 
                // BulkString 的首行,讀取String長度
                fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
                if err != nil {
                    errReply := &reply.ProtocolErrReply{Msg:err.Error()}
                    _, _ = client.conn.Write(errReply.ToBytes())
                }
                if fixedLen <= 0 {
                    errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"}
                    _, _ = client.conn.Write(errReply.ToBytes())
                }
            } else { 
                // 收到參數
                client.args[client.receivedCount] = line
                client.receivedCount++
            }


            // 一條命令發送完畢
            if client.receivedCount == client.expectedArgsCount {
                client.sending.Set(false)

                // 執行命令並響應
                result := h.db.Exec(client.args)
                if result != nil {
                    _, _ = conn.Write(result.ToBytes())
                } else {
                    _, _ = conn.Write(UnknownErrReplyBytes)
                }

                // 重置客戶端狀態,等待下一條指令
                client.expectedArgsCount = 0
                client.receivedCount = 0
                client.args = nil
                client.waitingReply.Done()
            }
        }
    }
}

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

pod刪除主要流程源碼解析

本文以v1.12版本進行分析

當一個pod刪除時,client端向apiserver發送請求,apiserver將pod的deletionTimestamp打上時間。kubelet watch到該事件,開始處理。

syncLoop

kubelet對pod的處理主要都是在syncLoop中處理的。

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
for {
...
        if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
            break
        }
...

與pod刪除主要在syncLoopIteration中需要關注的是以下這兩個。

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
    syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
    select {
    case u, open := <-configCh:
...
        switch u.Op {
...
        case kubetypes.UPDATE:
            handler.HandlePodUpdates(u.Pods)
...
    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
        } else {
            if err := handler.HandlePodCleanups(); err != nil {
                glog.Errorf("Failed cleaning pods: %v", err)
            }
        }
    }

第一個是由於發送給apiserver的DELETE請求觸發的,增加了deletionTimestamp的事件。這裏對應於kubetypes.UPDATE。也就是會走到HandlePodUpdates函數。

另外一個與delete相關的是每2s執行一次的來自於housekeepingCh的定時事件,用於清理pod,執行的是handler.HandlePodCleanups函數。這兩個作用不同,下面分別進行介紹。

HandlePodUpdates

先看HandlePodUpdates這個流程。只要打上了deletionTimestamp,就必然走到這個流程里去。

func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
    for _, pod := range pods {
...
        kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
    }
}

在HandlePodUpdates中,進而將pod的信息傳遞到dispatchWork中處理。

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
    if kl.podIsTerminated(pod) {
        if pod.DeletionTimestamp != nil {
            kl.statusManager.TerminatePod(pod)
        }
        return
    }
    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
...

這裏首先通過判斷了kl.podIsTerminated(pod)判斷pod是不是已經處於了Terminated狀態。如果是的話,則不進行下面的kl.podWorkers.UpdatePod。

func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
    status, ok := kl.statusManager.GetPodStatus(pod.UID)
    if !ok {
        status = pod.Status
    }
    return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}

這個地方特別值得注意的是,並不是由了DeletionTimestamp就會認為是Terminated狀態,而是有DeletionTimestamp且所有的容器不在運行了。也就是說如果是一個正在正常運行的pod,是也會走到kl.podWorkers.UpdatePod中的。UpdatePod通過一系列函數調用,最終會通過異步的方式執行syncPod函數中進入到syncPod函數中。

func (kl *Kubelet) syncPod(o syncPodOptions) error {
...
    if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
        var syncErr error
        if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
...

在syncPod中,調用killPod從而對pod進行停止操作。

killPod

killPod是停止pod的主體。在很多地方都會使用。這裏主要介紹下起主要的工作流程。停止pod的過程主要發生在killPodWithSyncResult函數中。

func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
    killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
...
    for _, podSandbox := range runningPod.Sandboxes {
            if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
...

killPodWithSyncResult的主要工作分為兩個部分。killContainersWithSyncResult負責將pod中的container停止掉,在停止后再執行StopPodSandbox。

func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, reason string, gracePeriodOverride *int64) error {
    if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
        return err
    }
...
    err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)

killContainersWithSyncResult的主要工作是在killContainer中完成的,這裏可以看到,其中的主要兩個步驟是在容器中進行prestop的操作。待其成功后,進行container的stop工作。至此所有的應用容器都已經停止了。下一步是停止pause容器。而StopPodSandbox就是執行這一過程的。將sandbox,也就是pause容器停止掉。StopPodSandbox是在dockershim中執行的。

func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) {
...
if !hostNetwork && (ready || !ok) {
...
        err := ds.network.TearDownPod(namespace, name, cID, annotations)
...
    }
    if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {

StopPodSandbox中主要的部分是先進行網絡卸載,再停止相應的容器。在完成StopPodSandbox后,至此pod的所有容器都已經停止完成。至於volume的卸載,是在volumeManager中進行的。本文不做單獨介紹了。停止后的容器在pod徹底清理后,會被gc回收。這裏也不展開講了。

HandlePodCleanups

上面這個流程並不是刪除流程的全部。一個典型的情況就是,如果container都不是running,那麼在UpdatePod的時候都return了,那麼又由誰來處理呢?這裏我們回到最開始,就是那個每2s執行一次的HandlePodCleanups的流程。也就是說比如container處於crash,container正好不是running等情況,其實是在這個流程里進行處理的。當然HandlePodCleanups的作用不僅僅是清理not running的pod,再比如數據已經在apiserver中強制清理掉了,或者由於其他原因這個節點上還有一些沒有完成清理的pod,都是在這個流程中進行處理。

func (kl *Kubelet) HandlePodCleanups() error {
... 
    for _, pod := range runningPods {
        if _, found := desiredPods[pod.ID]; !found {
            kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
        }
    }

runningPods是從cache中獲取節點現有的pod,而desiredPods則是節點上應該存在未被停止的pod。如果存在runningPods中有而desiredPods中沒有的pod,那麼它應該被停止,所以發送到podKillingCh中。

func (kl *Kubelet) podKiller() {
...
    for podPair := range kl.podKillingCh {
...

        if !exists {
            go func(apiPod *v1.Pod, runningPod *kubecontainer.Pod) {
                glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
                err := kl.killPod(apiPod, runningPod, nil, nil)
...
            }(apiPod, runningPod)
        }
    }
}

在podKiller流程中,會去接收來自podKillingCh的消息,從而執行killPod,上文已經做了該函數的介紹了。

statusManager

在最後,statusManager中的syncPod流程,將會進行檢測,通過canBeDeleted確認是否所有的容器關閉了,volume卸載了,cgroup清理了等等。如果這些全部完成了,則從apiserver中將pod信息徹底刪除。

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
...
    if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.NewDeleteOptions(0)
        deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
...

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

【目標檢測實戰】目標檢測實戰之一–手把手教你LMDB格式數據集製作!

文章目錄

1 目標檢測簡介
2 lmdb數據製作
    2.1 VOC數據製作
    2.2 lmdb文件生成

lmdb格式的數據是在使用caffe進行目標檢測或分類時,使用的一種數據格式。這裏我主要以目標檢測為例講解lmdb格式數據的製作。

1 目標檢測簡介

【1】目標檢測主要有兩個任務:

  1. 判斷圖像中對象的類別
  2. 類別的位置

【2】目標檢測需要的數據:

  1. 訓練所需的圖像數據,可以是jpg、png等圖片格式
  2. 圖像數據對應的類別信息和類別框的位置信息。

2 lmdb數據製作

caffe一般使用lmdb格式的數據,在製作數據之前,我們需要對數據進行標註,可以使用labelImg對圖像進行標註(),這裏就不多贅述數據標註的問題。總之,我們得到了圖像的標註Annotations數據。lmdb數據製作,首先需要將annotations數據和圖像數據製作為VOC格式,然後將其生成LMDB文件即可。下邊是詳細的步驟:

2.1 VOC數據製作

這裏我以caffe環境的Mobilenet+YOLOv3模型的代碼為例(),進行lmdb數據製作,並且也假設你已經對其配置編譯成功(如沒成功,可以參考博文進行配置),所以我們的根目錄為:caffe-Mobilenet-YOLO-master,下邊為詳細步驟:

【1】VOC格式目錄建立:

VOC格式目錄主要包含為:

其中,Annotations里存儲的是xml標註信息,JPEGImages存儲的是圖片,ImageSets則是訓練和測試的txt列表等信息,下邊我們就要安裝如上的目錄進行建立我們自己的數據目錄。

創建Annotations、JPEGImages、ImageSets/Main等文件,命令如下(也可直接界面操作哈):

注:建議新手按照我的名稱,對於後續文件修改容易!!!

cd ~/   # 進入home目錄
cd Documents/  # 進入Documents目錄
cd caffe-Mobilenet-YOLO-master/  # 進入我們的根目錄
cd data         # 進入data目錄內
mkdir VOCdevkit   # 創建存儲我們自己的數據的文件夾
cd VOCdevkit
mkdir MyDataSet  # 創建存儲voc的目錄
cd MyDataSet   
# 創建VOC格式目錄
mkdir Annotations
mkdir JPEGImages
mkdir ImageSets
cd ImageSets
mkdir Main

好啦,我們的文件夾就建立好了,如下圖所示:

【2】將所有xml文件考入至Annotations文件夾內
【3】將所有圖片考入至JPEGImages文件夾內
【4】劃分訓練接、驗證集合測試集,如下為Python代碼,需要修改的地方註釋已標明:

import os  
import random 
# 標註文件的路徑,需要你自己修改
xmlfilepath=r'/home/Documents/caffe-Mobilenet-YOLO-master/data/VOCdevkit/MyDataSet/Annotations/'      
# 這裡是存儲數據的本目錄,需要改為你自己的目錄              
saveBasePath=r"/home/Documents/caffe-Mobilenet-YOLO-master/data/VOCdevkit/"                        
trainval_percent=0.8            # 表示訓練集和驗證集所佔比例,你需要自己修改,也可選擇不修改
train_percent=0.8               # 表示訓練集所佔訓練集驗證集的比例,你需要自己修改,也可選擇不修改       
total_xml = os.listdir(xmlfilepath)
num=len(total_xml)    
list=range(num)    
tv=int(num*trainval_percent)    
tr=int(tv*train_percent)    
trainval= random.sample(list,tv)    
train=random.sample(trainval,tr)    
  
print("train and val size",tv)  
print("traub suze",tr)  
ftrainval = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/trainval.txt'), 'w')    
ftest = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/test.txt'), 'w')    
ftrain = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/train.txt'), 'w')    
fval = open(os.path.join(saveBasePath,'MyDataSet/ImageSets/Main/val.txt'), 'w')    
  
for i  in list:    
    name=total_xml[i][:-4]+'\n'    
    if i in trainval:    
        ftrainval.write(name)    
        if i in train:    
            ftrain.write(name)    
        else:    
            fval.write(name)    
    else:    
        ftest.write(name)    
    
ftrainval.close()    
ftrain.close()    
fval.close()    
ftest .close() 

上述代碼修改之後,在根目錄caffe-Mobilenet-YOLO-master執行上述代碼即可,
在data/VOCdevkit/MyDataSet/ImageSets下生成trainval.txt、test.txt、train.txt、val.txt等所需的txt文件,如下圖所示:

這些TXT文件會包含圖片的名字,不帶路徑,如下圖所示:

2.2 lmdb文件生成

【1】執行如下命令,將生成lmdb所需的腳本複製至data/VOCdevkit/MyDataSet文件夾內:

cp data/VOC0712/create_* data/MyDataSet/                # 把create_list.sh和create_data.sh複製到MyDataSet目錄                  
cp data/VOC0712/labelmap_voc.prototxt data/MyDataSet/   # 把labelmap_voc.prototxt複製到MyDataSet目錄 

【2】修改create_list.sh文件:

1 第3行修改目錄路徑,截止到VOCdevkit即可

2 第13行修改為for name in MyDataSet(VOCdevkit下自己建立的文件夾名字)

3 第15-18行註釋掉

4 第41行get_image_size修改為自己的路徑(注意,這裡是build caffe_mobilenet_yolo之後才會形成的):

#!/bin/bash
# 如果嚴格安裝我上述的步驟,就可以不用修改路徑位置。
# 需要修改的位置也使用註釋進行了標註和解釋

# 這裏需要更改,你數據的根目錄位置,需要修改的地方!!!!
root_dir="/home/Documents/Caffe_Mobilenet_YOLO/data/VOCdevkit/"   
sub_dir=ImageSets/Main
bash_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
for dataset in trainval test
do
  dst_file=$bash_dir/$dataset.txt
  if [ -f $dst_file ]
  then
    rm -f $dst_file
  fi
  for name in MyDataSet  # 如果你建立的不是MyDataSet,這裏需要修改為你自己的名字
  do
    # 這裏需要修改,註釋掉即可
    #if [[ $dataset == "test" && $name == "VOC2012" ]]
    #then
    #  continue
    #fi
    echo "Create list for $name $dataset..."
    dataset_file=$root_dir/$name/$sub_dir/$dataset.txt

    img_file=$bash_dir/$dataset"_img.txt"
    cp $dataset_file $img_file
    sed -i "s/^/$name\/JPEGImages\//g" $img_file
    sed -i "s/$/.jpg/g" $img_file

    label_file=$bash_dir/$dataset"_label.txt"
    cp $dataset_file $label_file
    sed -i "s/^/$name\/Annotations\//g" $label_file
    sed -i "s/$/.xml/g" $label_file

    paste -d' ' $img_file $label_file >> $dst_file

    rm -f $label_file
    rm -f $img_file
  done

  # Generate image name and size infomation.
  if [ $dataset == "test" ]
  then
    home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/build/tools/get_image_size $root_dir $dst_file $bash_dir/$dataset"_name_size.txt"

【3】creat_data.sh修改:

1 第2行修改為自己的路徑:root_dir=”/home/Documents/caffe-MobileNet-YOLO-master/”

2 第7行修改為:data_root_dir=”/home/Documents/caffe-MobileNet-YOLO-master/data/VOVdevkit/

3 第8行修改為:dataset_name=”MyDataSet”

4 第9行修改為:mapfile=”\(root_dir/data/VOCdevkit/\)dataset_name/labelmap_voc.prototxt”

5 第26行修改為\(root_dir/data/VOCdevkit/\)dataset_name/$subset.txt

cur_dir=$(cd $( dirname ${BASH_SOURCE[0]} ) && pwd )
# 修改為自己的路徑
root_dir="/home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/"

cd $root_dir

redo=1
# 這裏需要修改為自己的路徑
data_root_dir="/home/Documents/Caffe_Mobilenet_YOLO/caffe-MobileNet-YOLO-master/data/VOCdevkit/"
dataset_name="MyDataSet"  # 修改為自己的名字
mapfile="$root_dir/data/VOCdevkit/$dataset_name/labelmap_voc.prototxt"  # 修改為自己的路徑
anno_type="detection"
db="lmdb"
min_dim=0
max_dim=0
width=0
height=0

extra_cmd="--encode-type=jpg --encoded"
if [ $redo ]
then
  extra_cmd="$extra_cmd --redo"
fi
for subset in test trainval
# subset.txt路徑需要修改
do
  python $root_dir/scripts/create_annoset.py --anno-type=$anno_type \
  --label-map-file=$mapfile --min-dim=$min_dim --max-dim=$max_dim --resize-width=$width \
  --resize-height=$height --check-label $extra_cmd $data_root_dir $root_dir/data/VOCdevkit/$dataset_name/$subset.txt \
  $data_root_dir/$dataset_name/$db/$dataset_name"_"$subset"_"$db examples/$dataset_name

【3】修改labelmap_voc.prototxt文件:

除了第一個背景標籤部分不要修改,其他改成自己的標籤就行,多的刪掉,少了添加進入就行

【4】最後在caffe-MobileNet-YOLO-master/examples文件夾內新建一個MyDataSet文件夾(空的)

【5】運行create_list.sh腳本: ./data/VOCdevkit/MyDataSet/create_list.sh,運行完后,會在自己建的VOCdevkit/MyDataSet/目錄內生成trainval.txt, test.txt, test_name_size.txt。

【6】運行create_data.sh腳本: ./data/VOCdevkit/MyDataSet/create_data.sh

運行此命令時,提示:bash:./data/VOCdevkit/MyDataSet/create_list.sh:Permission denied,沒有權限,需要執行如下命令賦予執行命令:

chmod u+x data/VOCdevkit/MyDataSet/create_data.sh

出現了錯誤:ValueError: need more than 2 values to unpack,

需要將create_annoset.py中第88行的seg去掉,因為我們的Annotations只有兩個值,img_file和anno。

運行完后,會在會在自己建的VOCdevkit/MyDataSet目錄內生成lmdb文件夾:

lmdb對應訓練集和測試集的lmdb格式的文件夾:

***
好啦,今天的教程就到這裏,如有疑問,可關注公眾號【計算機視覺聯盟】私信我或留言交流!!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

氫燃料沒未來?Honda改拚電動車,明年搶攻歐陸

本田汽車(Honda)計畫於2018年一次推出兩款全電動車,且宣告未來將以電動車作為發展主軸。

根據《BusinessInsider》報導表示,本田六月就已表示將打造中國專屬電動車,8月29日發佈的聲明稿則指明另一款電動車是為歐洲客戶所設計。

本田說,新城市電動概念車(Urban EV Concept)將在九月法蘭克福車展上亮相,這也是本田在歐洲第一款電動車,肩負打開歐洲電動車市場的任務。

目前本田僅有一款純電動車在美國上市,不過本田表示,2030年旗下三分之二的車型都要電動化,達成目標的方法將在法蘭克福車展上對外說明。

本田與日本同業豐田(Toyota)此前主要開發氫燃料車與混和動力車,但由上述可知本田策略已經轉向發展電動車,可能引領其它日本車廠跟進。

(本文內容由授權使用。圖片出處:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

豐田:車輛全面由電池驅動前、須經歷2-3次技術突破

 

CNBC 9月5日報導,豐田汽車公司會長內山田竹志(Takeshi Uchiyamada)在接受專訪時表示,基於當前電池技術的侷限、他懷疑消費者會立即投向電動車的懷抱。內山田表示,豐田不是排斥電動車,但為了要提供足夠的續航力、電動車目前需要安裝許多電池並得花相當長的時間去充電,而且電池壽命也是一大問題。

內山田認為車輛全面由電池驅動之前、還須經歷2到3次的技術性突破才行。不過,他也坦承,隨著中國、美國等地鼓勵電動車發展的法令生效,汽車製造商若不推出電動車的話可能就會被淘汰出局,因此豐田已著手研發較好的電池技術。

吉利汽車控股旗下Volvo日前宣布,2019年起旗下所有新車將會是純電動或油電混合驅動。

根據DNV GL首度發布的「能源轉型展望」報告,受電動車滲透率持續上揚的影響,石油供應將在2020到2028年期間轉趨持平、隨後大幅下降,2034年將遭天然氣超越。這份報告預估電動車、內燃引擎車將在2022年達到「成本平價」,預估到2033年全球半數輕型新車銷售量都將是電動車。

特斯拉(Tesla)平價電動車「Model 3」不含獎勵計畫的售價為35,000美元起、電池續航力為345公里。

Thomson Reuters報導,嘉能可(Glencore)董事長Tony Hayward 於5月受訪時表示,電動車的快速進步意味著石油需求可能會在2040年以前觸頂,深海鑽油、加拿大油砂等高成本原油生產商恐將先被淘汰出局,擁有生產成本優勢的石油輸出國組織(OPEC)相對較不受衝擊。Hayward曾任英國石油公司(BP Plc)執行長。

嘉能可執行長Ivan Glasenberg指出,如果電動車在2035年拿下90到95%的市占率,全球年度銅需求量可望較目前的2,300萬噸呈現倍增。德國總理梅克爾(Angela Merkel)5月22日指出,鋰電池技術已經進步到可以讓電動車擁有1千公里的續航力、遠高於目前的200-300公里,德國必須大舉投資以確保產業繼續保有優勢。

戴姆勒(Daimler AG)董事長Deiter Zetsche 5月22日表示,預估到2022年旗下將有超過10款的純電動轎車系列。戴姆勒旗下全資子公司ACCUMOTIVE當日在德國卡門茨(Kamenz)為第二座電池工廠舉行奠基儀式、邀請梅克爾出席。這座工廠耗資5億歐元、預計在2018年年中正式營運。

(本文內容由授權使用。圖片出處:public domian CC0)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象