這 10 行比較字符串相等的代碼給我整懵逼了,不信你也來看看

抱歉用這種標題吸引你點進來了,不過你不妨看完,看看能否讓你有所收穫。​(有收穫,請評論區留個言,沒收穫,下周末我直播吃**,哈哈,這你也信)

補充說明:微信公眾號改版,對各個號主影響還挺大的。目前從後台數據來看,對我影響不大,因為我這反正都是小號,閱讀量本身就少的可憐,真相了,狗頭(剛從交流群學會的表情)。

先直接上代碼:

boolean safeEqual(String a, String b) {
   if (a.length() != b.length()) {
       return false;
   }
   int equal = 0;
   for (int i = 0; i < a.length(); i++) {
       equal |= a.charAt(i) ^ b.charAt(i);
   }
   return equal == 0;
}

上面的代碼是我根據原版(Scala)翻譯成 Java的,Scala 版本(最開始吸引程序猿石頭注意力的代碼)如下:

def safeEqual(a: String, b: String) = {
  if (a.length != b.length) {
    false
  } else {
    var equal = 0
    for (i <- Array.range(0, a.length)) {
      equal |= a(i) ^ b(i)
    }
    equal == 0
  }
}

剛開始看到這段源碼感覺挺奇怪的,這個函數的功能是比較兩個字符串是否相等,首先“長度不等結果肯定不等,立即返回”這個很好理解。

再看看後面的,稍微動下腦筋,轉彎下也能明白這其中的門道:通過異或操作1^1=0, 1^0=1, 0^0=0,來比較每一位,如果每一位都相等的話,兩個字符串肯定相等,最後存儲累計異或值的變量equal必定為 0,否則為 1。

再細想一下呢?

for (i <- Array.range(0, a.length)) {
  if (a(i) ^ b(i) != 0// or a(i) != b[i]
    return false
}

我們常常講性能優化,從效率角度上講,難道不是應該只要中途發現某一位的結果不同了(即為1)就可以立即返回兩個字符串不相等了嗎?(如上所示)

這其中肯定有……

再再細想一下呢?

結合方法名稱 safeEquals 可能知道些眉目,與安全有關。

本文開篇的代碼來自playframewok 里用來驗證cookie(session)中的數據是否合法(包含簽名的驗證),也是石頭寫這篇文章的由來。

以前知道通過延遲計算等手段來提高效率的手段,但這種已經算出結果卻延遲返回的,還是頭一回!

我們來看看,JDK 中也有類似的方法,如下代碼摘自 java.security.MessageDigest

public static boolean isEqual(byte[] digesta, byte[] digestb) {
   if (digesta == digestb) return true;
   if (digesta == null || digestb == null) {
       return false;
   }
   if (digesta.length != digestb.length) {
       return false;
   }

   int result = 0;
   // time-constant comparison
   for (int i = 0; i < digesta.length; i++) {
       result |= digesta[i] ^ digestb[i];
   }
   return result == 0;
}

看註釋知道了,目的是為了用常量時間複雜度進行比較。

但這個計算過程耗費的時間不是常量有啥風險? (腦海里響起了背景音樂:“小朋友,你是否有很多問號?”)

真相大白

再深入探索和了解了一下,原來這麼做是為了防止計時攻擊(Timing Attack)。(也有人翻譯成時序攻擊​)​

計時攻擊(Timing Attack)

計時攻擊是邊信道攻擊(或稱”側信道攻擊”, Side Channel Attack, 簡稱SCA) 的一種,邊信道攻擊是一種針對軟件或硬件設計缺陷,走“歪門邪道”的一種攻擊方式。

這種攻擊方式是通過功耗、時序、電磁泄漏等方式達到破解目的。在很多物理隔絕的環境中,往往也能出奇制勝,這類新型攻擊的有效性遠高於傳統的密碼分析的數學方法(某百科上說的)。

這種手段可以讓調用 safeEquals("abcdefghijklmn", "xbcdefghijklmn") (只有首位不相同)和調用 safeEquals("abcdefghijklmn", "abcdefghijklmn") (兩個完全相同的字符串)的所耗費的時間一樣。防止通過大量的改變輸入並通過統計運行時間來暴力破解出要比較的字符串。

舉個,如果用之前說的“高效”的方式來實現的話。假設某個用戶設置了密碼為 password,通過從a到z(實際範圍可能更廣)不斷枚舉第一位,最終統計發現 p0000000 的運行時間比其他從任意a~z的都長(因為要到第二位才能發現不同,其他非 p 開頭的字符串第一位不同就直接返回了),這樣就能猜測出用戶密碼的第一位很可能是p了,然後再不斷一位一位迭代下去最終破解出用戶的密碼。

當然,以上是從理論角度分析,確實容易理解。但實際上好像通過統計運行時間總感覺不太靠譜,這個運行時間對環境太敏感了,比如網絡,內存,CPU負載等等都會影響。

但安全問題感覺更像是 “寧可信其有,不可信其無”。為了防止(特別是與簽名/密碼驗證等相關的操作)被 timing attack,目前各大語言都提供了相應的安全比較函數。各種軟件系統(例如 OpenSSL)、框架(例如 Play)的實現也都採用了這種方式。

例如 “世界上最好的編程語言”(粉絲較少,評論區應該打不起架來)—— php中的:

// Compares two strings using the same time whether they're equal or not.
// This function should be used to mitigate timing attacks; 
// for instance, when testing crypt() password hashes.
bool hash_equals ( string $known_string , string $user_string )

//This function is safe against timing attacks.
boolean password_verify ( string $password , string $hash )

其實各種語言版本的實現方式都與上面的版本差不多,將兩個字符串每一位取出來異或(^)並用或(|)保存,最後通過判斷結果是否為 0 來確定兩個字符串是否相等。

如果剛開始沒有用 safeEquals 去實現,後續的版本還會通過打補丁的方式去修復這樣的安全隱患。

例如 JDK 1.6.0_17 中的Release Notes[1]中就提到了MessageDigest.isEqual 中的bug的修復,如下圖所示:

MessageDigest timing attack vulnerabilities

大家可以看看這次變更的的詳細信息openjdk中的 bug fix diff[2]為:

MessageDigest.isEqual計時攻擊

Timing Attack 真的可行嗎?

我覺得各大語言的 API 都用這種實現,肯定還是有道理的,理論上應該可以被利用的。 這不,學術界的這篇論文就宣稱用這種計時攻擊的方法破解了 OpenSSL 0.9.7 的RSA加密算法了。關於 RSA 算法的介紹可以看看之前本人寫的這篇文章。

這篇Remote Timing Attacks are Practical[3] 論文中指出(我大致翻譯下摘要,感興趣的同學可以通過文末鏈接去看原文):

計時攻擊往往用於攻擊一些性能較弱的計算設備,例如一些智能卡。我們通過實驗發現,也能用於攻擊普通的軟件系統。本文通過實驗證明,通過這種計時攻擊方式能夠攻破一個基於 OpenSSL 的 web 服務器的私鑰。結果證明計時攻擊用於進行網絡攻擊在實踐中可行的,因此各大安全系統需要抵禦這種風險。

最後,本人畢竟不是專研完全方向,以上描述是基於本人的理解,如果有不對的地方,還請大家留言指出來。感謝。

補充說明2:感謝正在閱讀文章的你,讓我還有動力繼續堅持更新原創。

本人發文不多,但希望寫的文章能達到的目的是:佔用你的閱讀時間,就盡量能夠讓你有所收穫。

如果你覺得我的文章有所幫助,還請你幫忙轉發分享,另外請別忘了點擊公眾號右上角加個星標,好讓你別錯過後續的精彩文章(微信改版了,或許我發的文章都不能推送到你那了)。

​原創真心不易,希望你能幫我個小忙唄,如果本文內容你覺得有所啟發,有所收穫,請幫忙點個“在看”唄,或者轉發分享讓更多的小夥伴看到。 ​ 參考資料:

  • Timing Attacks on RSA: Revealing Your Secrets through the Fourth Dimension
  • Remote Timing Attacks are Practical

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

SpringColud Eureka的服務註冊與發現

一、Eureka簡介

本文中所有代碼都會上傳到git上,請放心瀏覽
項目git地址:https://github.com/839022478/Spring-Cloud

在傳統應用中,組件之間的調用,通過有規範的約束的接口來實現,從而實現不同模塊間良好的協作。但是被拆分成微服務后,每個微服務實例的網絡地址都可能動態變化,數量也會變化,使得原來硬編碼的地址失去了作用。需要一个中心化的組件來進行服務的登記和管理,為了解決上面的問題,於是出現了服務治理,就是管理所有的服務信息和狀態,也就是我們所說的註冊中心

1.1 註冊中心

比如我們去做火車或者汽車,需要去買票乘車,只看我們有沒有票(有沒有服務),有就去買票(獲取註冊列表),然後乘車(調用),不用關心到底有多少車在運行

流程圖:

使用註冊中心,我們不需要關心有多少提供方,只管去調用就可以了,那麼註冊中心有哪些呢?

註冊中心:Eureka,Nacos,Consul,Zookeeper

本文中講解的是比較火熱的Spring Cloud微服務下的Eureka,Eureka是Netflix開發的服務發現框架,是一個RESTful風格的服務,是一個用於服務發現和註冊的基礎組件,是搭建Spring Cloud微服務的前提之一,它屏蔽了Server和client的交互細節,使得開發者將精力放到業務上。

服務註冊與發現主要包括兩個部分:服務端(Eureka Server)和客戶端(Eureka Client)

  • 服務端(Eureka Server): 一個公共服務,為Client提供服務註冊和發現的功能,維護註冊到自身的Client的相關信息,同時提供接口給Client獲取註冊表中其他服務的信息,使得動態變化的Client能夠進行服務間的相互調用。

  • 客戶端(Eureka Client): Client將自己的服務信息通過一定的方式登記到Server上,並在正常範圍內維護自己信息一致性,方便其他服務發現自己,同時可以通過Server獲取到自己依賴的其他服務信息,完成服務調用,還內置了負載均衡器,用來進行基本的負載均衡

Eureka GIt官網:https://github.com/Netflix/Eureka

1.3 服務註冊與發現

服務註冊與發現關係圖:

1.2 client功能和server功能

1.2.1 client功能

  1. 註冊:每個微服務啟動時,將自己的網絡地址等信息註冊到註冊中心,註冊中心會存儲(內存中)這些信息。
  2. 獲取服務註冊表:服務消費者從註冊中心,查詢服務提供者的網絡地址,並使用該地址調用服務提供者,為了避免每次都查註冊表信息,所以client會定時去server拉取註冊表信息到緩存到client本地。
  3. 心跳:各個微服務與註冊中心通過某種機制(心跳)通信,若註冊中心長時間和服務間沒有通信,就會註銷該實例。
  4. 調用:實際的服務調用,通過註冊表,解析服務名和具體地址的對應關係,找到具體服務的地址,進行實際調用。

1.2.2 server註冊中心功能

  1. 服務註冊表:記錄各個微服務信息,例如服務名稱,ip,端口等。
    註冊表提供 查詢API(查詢可用的微服務實例)和管理API(用於服務的註冊和註銷)。
  2. 服務註冊與發現:註冊:將微服務信息註冊到註冊中心。發現:查詢可用微服務列表及其網絡地址。
  3. 服務檢查:定時檢測已註冊的服務,如發現某實例長時間無法訪問,就從註冊表中移除。

二、Eureka單節點搭建

2.1 pom.xml

在有的教程中,會引入spring-boot-starter-web,這個依賴其實不用,因為spring-cloud-starter-netflix-eureka-server的依賴已經包含了它,在pom依賴進去,就可以了

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

2.2 application.yml

server:
  port: 8500
eureka:
  client:
    #是否將自己註冊到Eureka Server,默認為true,由於當前就是server,故而設置成false,表明該服務不會向eureka註冊自己的信息
    register-with-eureka: false
    #是否從eureka server獲取註冊信息,由於單節點,不需要同步其他節點數據,用false
    fetch-registry: false
    #設置服務註冊中心的URL,用於client和server端交流
    service-url:
      defaultZone: http://localhost:8080/eureka/

2.3 服務端啟動類

啟動類上添加此註解標識該服務為配置中心
@EnableEurekaServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }

}

2.4 啟動

我們啟動EurekaDemoApplication ,然後在瀏覽器中輸入地址 http://localhost:8500/,就可以啟動我們的 Eureka 了,我們來看下效果,出現了這個畫面,就說明我們已經成功啟動~,只是此時我們的服務中是還沒有客戶端進行註冊

三、服務註冊

注意:在客戶端pom裏面我們需要加上spring-boot-starter-web,否則服務是無法正常啟動的

3.1 pom.xml

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>     

3.2 application.yml

#註冊中心
eureka:
  client:
    #設置服務註冊中心的URL
    service-url:
      defaultZone: http://localhost:8500/eureka/
  #服務名
  instance:
    appname: mxn

3.3 客戶端啟動類

在客戶端啟動類中我們需要加上 @EnableDiscoveryClient註解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

@EnableDiscoveryClient
@SpringBootApplication
public class EurekaClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class, args);
    }
}

3.4 查看效果

工程啟動后,刷新http://localhost:8500/頁面,我們可以發現服務註冊成功了

並且我們可以在idea日誌打印中看到DiscoveryClient_MXN/DESKTOP-5BQ3UK8 - registration status: 204,說明就是註冊成功了
Eureka Server與Eureka Client之間的聯繫主要通過心跳的方式實現。心跳(Heartbeat)即Eureka Client定時向Eureka Server彙報本服務實例當前的狀態,維護本服務實例在註冊表中租約的有效性。

Eureka Client將定時從Eureka Server中拉取註冊表中的信息,並將這些信息緩存到本地,用於服務發現

四、Eureka 端點

官網地址:https://github.com/Netflix/eureka/wiki/Eureka-REST-operations

Eureka服務器還提供了一個端點(eureka/apps/{applicaitonName})可以查看所註冊的服務詳細信息 。applicaitonName就是微服務的名稱,比如這裏我們訪問 http://localhost:8500/eureka/apps/mxn

五、Eureka 原理

5.1 本質

存儲了每個客戶端的註冊信息。EurekaClient從EurekaServer同步獲取服務註冊列表。通過一定的規則選擇一個服務進行調用

5.2 Eureka架構圖

  • 服務提供者: 是一個eureka client,向Eureka Server註冊和更新自己的信息,同時能從Eureka Server註冊表中獲取到其他服務的信息。
  • 服務註冊中心: 提供服務註冊和發現的功能。每個Eureka Cient向Eureka Server註冊自己的信息,也可以通過Eureka Server獲取到其他服務的信息達到發現和調用其他服務的目的。
  • 服務消費者: 是一個eureka client,通過Eureka Server獲取註冊到其上其他服務的信息,從而根據信息找到所需的服務發起遠程調用。
  • 同步複製: Eureka Server之間註冊表信息的同步複製,使Eureka Server集群中不同註冊表中服務實例信息保持一致。
  • 遠程調用: 服務客戶端之間的遠程調用。
  • 註冊: Client端向Server端註冊自身的元數據以供服務發現。
  • 續約: 通過發送心跳到Server以維持和更新註冊表中服務實例元數據的有效性。當在一定時長內,Server沒有收到Client的心跳信息,將默認服務下線,會把服務實例的信息從註冊表中刪除。
  • 下線: Client在關閉時主動向Server註銷服務實例元數據,這時Client的服務實例數據將從Server的註冊表中刪除。
  • 獲取註冊表: Client向Server請求註冊表信息,用於服務發現,從而發起服務間遠程調用。

5.3 Eureka自我保護

有時候我們會看到這樣的提示信息:EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT. RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPIRED JUST TO BE SAFE.,這是因為默認情況下,Eureka Server在一定時間內,沒有接收到某個微服務心跳,會將某個微服務註銷(90S)。但是當網絡故障時,微服務與Server之間無法正常通信,上述行為就非常危險,因為微服務正常,不應該註銷,它的指導思想就是 寧可保留健康的和不健康的,也不盲目註銷任何健康的服務
我們也可以通過命令去關閉自我保護的功能:

eureka:
  server: 
    enable-self-preservation: false

那麼自我保護是如何觸發的呢?
自我保護機制的觸發條件是,當每分鐘心跳次數( renewsLastMin) 小於 numberOfRenewsPerMinThreshold時,並且開啟自動保護模式開關( eureka.server.enable-self-preservation = true) 時,觸發自我保護機制,不再自動過期租約
上面我們所有的小於 numberOfRenewsPerMinThreshold,到底是怎麼計算的呢,我們在eureka源碼中可以得知

numberOfRenewsPerMinThreshold = expectedNumberOfRenewsPerMin * 續租百分比(默認為0.85)
expectedNumberOfRenewsPerMin = 當前註冊的應用實例數 x 2
當前註冊的應用實例數 x 2 是因為,在默認情況下,註冊的應用實例每半分鐘續租一次,那麼一分鐘心跳兩次,因此 x 2

例如:我們有10個服務,期望每分鐘續約數:10 * 2=20,期望閾值:20*0.85=17,當少於17時,就會觸發自我保護機制

5.4 健康檢查

由於server和client通過心跳保持 服務狀態,而只有狀態為UP的服務才能被訪問。看eureka界面中的status。

比如心跳一直正常,服務一直UP,但是此服務DB(數據庫)連不上了,無法正常提供服務。

此時,我們需要將 微服務的健康狀態也同步到server。只需要啟動eureka的健康檢查就行。這樣微服務就會將自己的健康狀態同步到eureka。配置如下即可。

在client端配置:將自己的健康狀態傳播到server。

eureka:
  client:
    healthcheck:
      enabled: true

5.5 Eureka監聽事件

import com.netflix.appinfo.InstanceInfo;
import org.springframework.cloud.netflix.eureka.server.event.*;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class CustomEvent {

    @EventListener
    public void listen(EurekaInstanceCanceledEvent event ) {
        System.out.println(LocalDateTime.now()+"服務下線事件:"+event.getAppName()+"---"+event.getServerId());
//發釘釘
    }

    @EventListener
    public void listen(EurekaInstanceRegisteredEvent event) {
        InstanceInfo instanceInfo = event.getInstanceInfo();
        System.out.println(LocalDateTime.now()+"服務上線事件:"+instanceInfo.getAppName()+"---"+instanceInfo.getInstanceId());
    }

    @EventListener
    public void listen(EurekaInstanceRenewedEvent event) {
        System.out.println(LocalDateTime.now()+"服務續約/心跳上報事件:"+event.getAppName()+"---"+event.getServerId());

    }

    @EventListener
    public void listen(EurekaRegistryAvailableEvent event) {
        System.out.println(LocalDateTime.now()+"註冊中心可用事件");
    }

    @EventListener
    public void listen(EurekaServerStartedEvent event) {
        System.out.println(LocalDateTime.now()+"註冊中心啟動事件");

    }
}

5.6 Renew: 服務續約

Eureka Client 會每隔 30 秒發送一次心跳來續約。 通過續約來告知 Eureka Server 該 Eureka Client 運行正常,沒有出現問題。 默認情況下,如果 Eureka Server 在 90 秒內沒有收到 Eureka Client 的續約,Server 端會將實例從其註冊表中刪除,此時間可配置,一般情況不建議更改。

5.6 服務剔除

如果Eureka Client在註冊后,既沒有續約,也沒有下線(服務崩潰或者網絡異常等原因),那麼服務的狀態就處於不可知的狀態,不能保證能夠從該服務實例中獲取到回饋,所以需要服務剔除此方法定時清理這些不穩定的服務,該方法會批量將註冊表中所有過期租約剔除,剔除是定時任務,默認60秒執行一次。延時60秒,間隔60秒

剔除的限制:
1.自我保護期間不清除。
2.分批次清除。

六、Eureka缺陷

由於集群間的同步複製是通過HTTP的方式進行,基於網絡的不可靠性,集群中的Eureka Server間的註冊表信息難免存在不同步的時間節點,不滿足CAP中的C(數據一致性)

七、總結

中間我們講解了eureka的節點搭建,以及原理,對於現在很火熱的微服務,我們對Eureka是非常有必要進行了解的,如果覺得文章對你有幫助,來個點贊支持吧,如果對文章有疑問或建議,歡迎討論留言,謝謝大家~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

Express4.x之中間件與路由詳解及源碼分析,Express4.x之API:express,Express4.x之API:express

  • Application.use()
  • Application.router()
  • express核心源碼模擬

 一、express.use()

1.1app.use([path,] callback [, callback …])

通過語法結構可以看到Application.use()參數分別有以下幾種情形:

app.use(function(){...}); //給全局添加一个中間件
app.use(path,function(){...}); //給指定路由path添加一个中間件
app.use(function(){...}, function(){...}, ...); //給全局添加n个中間件
app.use(path,function(){...},function(){...}, ...); //給指定路由path添加n个中間件

關於path最簡單也是最常用的就是字符串類型(例:‘/abcd’);除了字符串Express還提供了模板和正則格式(例:’/abc?d‘, ‘/ab+cd‘, ‘/ab\*cd‘, ‘/a(bc)?d‘, ‘/\/abc|\/xyz/‘);除了單個的字符串和模板還可以將多個path作為一個數組的元素,然後將這個數組作為use的path,這樣就可以同時給多個路由添加中間件,詳細內容可以參考官方文檔:https://www.expressjs.com.cn/4x/api.html#path-examples。

關於callbakc多個或單个中間件程序這已經再語法結構中直觀的體現出來了,這裏重點來看看回調函數的參數:

app.use(function(req,res,next){...}); //必須提供的參數 
app.use(function(err,req,res,next){...}); //錯誤中間件需要在最前面添加一個錯誤參數

關於中間件的簡單應用:

let express = require('./express');
let app = express();
app.use('/',function(req,res,next){
    console.log("我是一個全局中間件");
    next(); //每个中間件的最末尾必須調用next
});

app.use('/',function(err,req,res,next){
    console.log("我是一個全局錯誤中間,當發生錯誤是調用")    
    console.error(err.stack);
    res.status(500).send('服務出錯誤了!');
    //由於這個錯誤處理直接響應了客戶端,可以不再調用next,當然後面還需要處理一些業務的話也是可以調用next的
});

1.2簡單的模擬Express源碼實現Appliction.use()以及各個請求方法的響應註冊方法(這裏個源碼模擬路由概念還比較模糊,所以使用請求方法的響應註冊API,而沒有使用路由描述):

 1 //文件結構
 2 express
 3     index.js
 4 //源碼模擬實現
 5 let http = require("http");
 6 let url = require('url');
 7 function createApplication(){
 8     //app是一個監聽函數
 9     let app = (req,res) =>{
10         //取出每一個層
11         //1.獲取請求的方法
12         let m = req.method.toLowerCase();
13         let {pathname} = url.parse(req.url,true);
14 
15         //通過next方法進行迭代
16         let index = 0;
17         function next(err){
18             //如果routes迭代完成還沒有找到,說明路徑不存在
19             if(index === app.routes.length) return res.end(`Cannot ${m} ${pathname}`);
20             let {method, path, handler} = app.routes[index++];//每次調用next就應該取下一個layer
21             if(err){
22                 if(handler.length === 4){
23                     handler(err,req,res,next);
24                 }else{
25                     next(err);
26                 }
27             }else{
28                 if(method === 'middle'){ //處理中間件
29                     if(path === '/' || path === pathname || pathname.startsWith(path+'/')){
30                         handler(req,res,next);
31                     }else{
32                         next();//如果這个中間件沒有匹配到,繼續通過next迭代路由容器routes
33                     }
34                 }else{ //處理路由
35                     if( (method === m || method ==='all') && (path === pathname || path === '*')){ //匹配請求方法和請求路徑(接口)
36                         handler(req,res);//匹配成功后執行的Callback
37                     }else{
38                         next();
39                     }
40                 }
41             }
42         }
43         next();
44     }
45     app.routes = [];//路由容器
46     app.use = function(path,handler){
47         if(typeof handler !== 'function'){
48             handler = path;
49             path = '/';
50         }
51         let layer = {
52             method:'middle', //method是middle就表示它是一个中間件
53             path,
54             handler
55         }
56         app.routes.push(layer);
57     }
58     app.all = function(path,handler){
59         let layer = {
60             method:'all',
61             path,
62             handler
63         }
64         app.routes.push(layer);
65     }
66     console.log(http.METHODS);
67     http.METHODS.forEach(method =>{
68         method = method.toLocaleLowerCase();
69         app[method] = function (path,handler){//批量生成各個請求方法的路由註冊方法
70             let layer = {
71                 method,
72                 path,
73                 handler
74             }
75             app.routes.push(layer);
76         }
77     });
78     //內置中間件,給req擴展path、qury屬性
79     app.use(function(req,res,next){
80         let {pathname,query} = url.parse(req.url,true);
81         let hostname = req.headers['host'].split(':')[0];
82         req.path = pathname;
83         req.query = query;
84         req.hostname = hostname;
85         next();
86     });
87     //通過app.listen調用http.createServer()掛在app(),啟動express服務
88     app.listen = function(){
89         let server = http.createServer(app);
90         server.listen(...arguments);
91     }
92     return app;
93 }
94 module.exports = createApplication;

View Code

測試模擬實現的Express:

 1 let express = require('./express');
 2 
 3 let app = express();
 4 
 5 app.use('/',function(req,res,next){
 6     console.log("我是一個全局中間件");
 7     next();
 8 });
 9 app.use('/user',function(req,res,next){
10     console.log("我是user接口的中間件");
11     next();
12 });
13 app.get('/name',function(req,res){
14     console.log(req.path,req.query,req.hostname);
15     res.end('zfpx');
16 });
17 app.post('/name',function(req,res){
18     res.end('post name');
19 });
20 
21 
22 app.all("*",function(req,res){
23     res.end('all');
24 });
25 
26 app.use(function(err,req,res,next){
27     console.log(err);
28     next();
29 });
30 
31 app.listen(12306);

View Code

在windows系統下測試請求:

 

 

關於源碼的構建詳細內容可以參考這個視頻教程:app.use()模擬構建視頻教程,前面就已經說明過這個模式實現僅僅是從表面的業務邏輯,雖然有一點底層的雛形,但與源碼還是相差甚遠,這一部分也僅僅只是想幫助理解Express採用最簡單的方式表現出來。

1.3如果你看過上面的源碼或自己也實現過,就會發現Express關於中間件的添加方式除了app.use()還有app.all()及app.METHOD()。在模擬源碼中我並未就use和all的差異做處理,都是採用了請求路徑絕對等於path,這種方式是all的特性,use的path實際表示為請求路徑的開頭:

app.use(path,callback):path表示請求路徑的開頭部分。

app.all(path,callback):paht表示完全等於請求路徑。

app.METHOD(path,callback):並不是真的有METHOD這個方法,而是指HTTP請求方法,實際上表示的是app.get()、app.post()、app.put()等方法,而有時候我們會將這些方法說成用來註冊路由,這是因為路由註冊的確使用這些方法,但同時這些方法也是可以用作中間的添加,這在前一篇博客中的功能解析中就有說明(Express4.x之API:express),詳細見過後面的路由解析就會更加明了。

 二、express.router()

2.1在實例化一個Application時會實例化一個express.router()實例並被添加到app._router屬性上,實際上這個app使用的use、all、METHOD時都是在底層調用了該Router實例上對應的方法,比如看下面這些示例:

 1 let express = require("express");
 2 let app = express();
 3 
 4 app._router.use(function(req,res,next){
 5     console.log("--app.router--");
 6     next();
 7 });
 8 
 9 app._router.post("/csJSON",function(req,res,next){
10     res.writeHead(200);
11     res.write(JSON.stringify(req.body));
12     res.end();
13 });
14 
15 app.listen(12306);

上面示例中的app._router.use、app._router.post分別同等與app.use、app.post,這裏到這裏也就說明了上一篇博客中的路由與Application的關係Express4.x之API:express。

2.2Express中的Router除了為Express.Application提供路由功能以外,Express也將它作為一個獨立的路由工具分離了出來,也就是說Router自身可以獨立作為一個應用,如果我們在實際應用中有相關業務有類似Express.Application的路由需求,可以直接實例化一個Router使用,應用的方式如下:

let express = require('/express');
let router = express.Router();
//這部分可以詳細參考官方文檔有詳細的介紹

2.3由於這篇博客主要是分析Express的中間件及路由的底層邏輯,所以就不在這裏詳細介紹某個模塊的應用,如果有時間我再寫一篇關於Router模塊的應用,這裏我直接上一份模擬Express路由的代碼以供參考:

文件結構:

express //根路徑
    index.js //express主入口文件
    application.js //express應用構造模塊
    router //路由路徑
        index.js //路由主入口文件
        layer.js //構造層的模塊
        route.js //子路由模塊

Express路由系統的邏輯結構圖:

模擬代碼(express核心源碼模擬):

1 //express主入口文件
2 let Application = require('./application.js');
3 
4 function createApplication(){
5     return new Application();
6 }
7 
8 module.exports = createApplication;

index.js //express主入口文件

 1 //用來創建應用app
 2 let http = require('http');
 3 let url = require('url');
 4 
 5 //導入路由系統模塊
 6 let Router = require('./router');
 7 
 8 const methods = http.METHODS;
 9 
10 //Application ---- express的應用系統
11 function Application(){
12     //創建一個內置的路由系統
13     this._router = new Router();
14 }
15 
16 //app.get ---- 實現路由註冊業務
17 // Application.prototype.get = function(path,...handlers){
18 //     this._router.get(path,'use',handlers);
19 // }
20 
21 methods.forEach(method => {
22     method = method.toLocaleLowerCase();
23     Application.prototype[method] = function(path,...handlers){
24         this._router[method](path,handlers);
25     }
26 });
27 
28 //app.use ---- 實現中間件註冊業務
29 //這裏模擬處理三種參數模式:
30 // -- 1個回調函數:callback
31 // -- 多個回調函數:[callback,] callback [,callback...]
32 // -- 指定路由的中間件:[path,] callback [,callback...]
33 // -- 注意源碼中可以處理這三種參數形式還可以處理上面數據的數組形式,以及其他Application(直接將其他app上的中間件添加到當前應用上)
34 Application.prototype.use = function(fn){
35     let path = '/';
36     let fns = [];
37     let arg = [].slice.call(arguments);
38     if(typeof fn !== 'function' && arg.length >= 2){
39         if(typeof arg[0] !== 'string'){
40             fns = arg;
41         }else{
42             path = arg[0];
43             fns = arg.slice(1);
44         }
45     }else{
46         fns = arg;
47     }
48     this._router.use(path,'use',fns);
49 }
50 
51 Application.prototype.all = function(fn){
52     let path = '/';
53     let fns = [];
54     let arg = [].slice.call(arguments);
55     if(typeof fn !== 'function' && arg.length >= 2){
56         if(typeof arg[0] !== 'string'){
57             fns = arg;
58         }else{
59             path = arg[0];
60             fns = arg.slice(1);
61         }
62       }else{
63         fns = arg;
64     }
65     this._router.use(path,'all',fns);
66 }
67 
68 //將http的listen方法封裝到Application的原型上
69 Application.prototype.listen = function(){
70     let server = http.createServer((req,res)=>{
71         //done 用於當路由無任何可匹配項時調用的處理函數
72         function done(){
73             res.end(`Cannot ${req.url} ${req.method}`);
74         }
75         this._router.handle(req,res,done); //調用路由系統的handle方法處理請求
76     });
77     server.listen(...arguments);
78 };
79 
80 module.exports = Application;

application.js //express應用構造模塊

 1 //express路由系統
 2 const Layer = require('./layer.js');
 3 const Route = require('./route.js');
 4 
 5 const http = require('http');
 6 const methods = http.METHODS;
 7 
 8 const url = require('url');
 9 
10 
11 //路由對象構造函數
12 function Router(){
13     this.stack = [];
14 }
15 
16 //router.route ---- 用於創建子路由對象route與主路由上層(layer)的關係
17 //並將主路由上的層緩存到路由對象的stack容器中,該層建立路徑與子路由處理請求的關係
18 Router.prototype.route = function(path){
19     let route = new Route();
20     let layer = new Layer(path,route.dispatch.bind(route));
21     this.stack.push(layer);
22     return route;
23 }
24 
25 //router.get ---- 實現路由註冊
26 //實際上這個方法調用router.route方法分別創建一個主路由系統層、一個子路由系統,並建立兩者之間的關係,詳細見Router.prototype.route
27 //然後獲取子路由系統對象,並將回調函數和請求方法註冊在這個子路由系統上
28 
29 
30 // Router.prototype.get = function(path,handlers){
31 //     let route = this.route(path);
32 //     route.get(handlers);
33 // }
34 
35 methods.forEach(method =>{ 
36     method = method.toLocaleLowerCase();
37     //注意下面這個方法會出現內存泄漏問題,有待改進
38     Router.prototype[method] = function(path, handlers){
39         let route = this.route(path);
40         route[method](handlers);
41     }
42 });
43 
44 //router.use ---- 實現中間件註冊(按照路由開頭的路徑匹配,即相對路由匹配)
45 Router.prototype.use = function(path,routerType,fns){
46     let router = this;
47     fns.forEach(function(fn){
48         let layer = new Layer(path,fn);
49         layer.middle = true; //標記這個層為相對路由中間件
50         layer.routerType = routerType;
51         router.stack.push(layer);
52     });
53 }
54 
55 //調用路由處理請求
56 Router.prototype.handle = function(req,res,out){
57     let {pathname} = url.parse(req.url);
58     let index = 0;
59     let next = () => {
60         if(index >= this.stack.length) return out();
61         let layer = this.stack[index++];
62         if(layer.middle && (layer.path === '/' || pathname === layer.path || pathname.startsWith(layer.path + '/'))){
63             //處理中間件
64             if(layer.routerType === 'use'){
65                 layer.handle_request(req,res,next);
66             }else if(layer.routerType === 'all' && layer.path === pathname){
67                 layer.handle_request(req,res,next);
68             }else{
69                 next();
70             }
71         }else if(layer.match(pathname)){
72             //處理響應--更準確的說是處理具體請求方法上的中間件或響應
73             layer.handle_request(req,res,next);
74         }else{
75             next();
76         }
77     }
78     next();
79 }
80 
81 module.exports = Router;

index.js //路由主入口文件

 1 //Layer的構造函數
 2 function Layer(path,handler){
 3     this.path = path; //當前層的路徑
 4     this.handler = handler;  //當前層的回調函數
 5 }
 6 
 7 //判斷請求方法與當前層的方法是否一致
 8 Layer.prototype.match = function(pathname){
 9     return this.path === pathname;
10 }
11 
12 //調用當前層的回調函數handler
13 Layer.prototype.handle_request = function(req,res,next){
14     this.handler(req,res,next);
15 }
16 
17 module.exports = Layer;

layer.js //構造層的模塊

 1 //Layer的構造函數
 2 function Layer(path,handler){
 3     this.path = path; //當前層的路徑
 4     this.handler = handler;  //當前層的回調函數
 5 }
 6 
 7 //判斷請求方法與當前層的方法是否一致
 8 Layer.prototype.match = function(pathname){
 9     return this.path === pathname;
10 }
11 
12 //調用當前層的回調函數handler
13 Layer.prototype.handle_request = function(req,res,next){
14     this.handler(req,res,next);
15 }
16 
17 module.exports = Layer;

route.js //子路由模塊

測試代碼:

 1 let express = require('./express');
 2 let app = express();
 3 
 4 
 5 app.use('/name',function(req,res,next){
 6     console.log('use1');
 7     next();
 8 });
 9 app.use(function(req,res,next){
10     console.log('use2-1');
11     next();
12 },function(req,res,next){
13     console.log('use2-2');
14     next();
15 });
16 
17 app.all('/name',function(req,res,next){
18     console.log('all-1');
19     next();
20 });
21 app.all('/name/app',function(req,res,next){
22     console.log('all-2');
23     next();
24 });
25 
26 app.get('/name/app',function(req,res){
27     res.end(req.url);
28 });
29 // console.log(app._router.stack);
30 app.listen(12306);

View Code

測試結果:

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

Kubernetes-subpath的使用

一、什麼是subpath

為了支持單一個pod多次使用同一個volume而設計,subpath翻譯過來是子路徑的意思,如果是數據卷掛載在容器,指的是存儲卷目錄的子路徑,如果是配置項configMap/Secret,則指的是掛載在容器的子路徑。

 

二、subpath的使用場景

1、 1個pod中可以拉起多個容器,有時候希望將不同容器的路徑掛載在存儲卷volume的子路徑,這個時候需要用到subpath

2、volume支持將configMap/Secret掛載在容器的路徑,但是會覆蓋掉容器路徑下原有的文件,如何支持選定configMap/Secret的每個key-value掛載在容器中,且不會覆蓋掉原目錄下的文件,這個時候也可以用到subpath

 

三、subpath的使用

1、存儲卷

    採用hostpath的方式創建PV,宿主機的映射目錄為/data/pod/volume5

[root@k8s-master zhanglei]# cat pv-subpath.yaml 
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-subpath-05
  labels:
    release: stable
spec:
  capacity:
    storage: 0.1Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Recycle
  hostPath:
    path: /data/pod/volume5                 # 宿主機的目錄

[root@k8s-master zhanglei]# kubectl create -f pv-subpath.yaml  

PV創建成功后,再創建PVC

[root@k8s-master zhanglei]# cat pvc-subpath.yaml 
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-subpath
  namespace: default
spec:
 accessModes: ["ReadWriteOnce"]
 resources:
   requests: 
     storage: 0.05Gi
[root@k8s-master zhanglei]# kubectl create -f pvc-subpath.yaml

在pod中聲明並使用subpath

[root@k8s-master zhanglei]# cat pod-subpath.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: pod-subpath-zltest
spec:
    containers:
    - name: ubuntu-subpath-container
      image: ubuntu
      volumeMounts:
      - mountPath: /var/lib/ubuntu            # 容器1的掛載目錄
        name: subpath-vol
        subPath: ubuntutest                   # 宿主機volume5的子目錄1
    - name: nginx-subpath-container
      image: nginx
      volumeMounts:
      - mountPath: /var/www/nginx             # 容器2的掛載目錄
        name: subpath-vol
        subPath: nginxtest                   # 宿主機volume5的子目錄2 
    volumes:
    - name: subpath-vol
      persistentVolumeClaim:
        claimName: pvc-subpath               # PVC的名字

  [root@k8s-master zhanglei]# kubectl create -f pod-subpath.yaml

[root@k8s-master zhanglei]# kubectl describe pod  pod-subpath-zltest 
Name:         pod-subpath-zltest
Namespace:    default
Priority:     0
Node:         k8s-master/192.168.126.129
Start Time:   Fri, 29 May 2020 16:45:49 +0800
Labels:       <none>
Annotations:  cni.projectcalico.org/podIP: 10.122.235.235/32
              cni.projectcalico.org/podIPs: 10.122.235.235/32
Status:       Running
IP:           10.122.235.235
IPs:
  IP:  10.122.235.235
Containers:
  ubuntu-subpath-container:
    Container ID:   docker://6e5cb30ee7e03b77d2ca22e4cd818ff326fa40836427fe17b1584646b4388dce
    Image:          ubuntu
    Image ID:       docker-pullable://ubuntu@sha256:747d2dbbaaee995098c9792d99bd333c6783ce56150d1b11e333bbceed5c54d7
    Port:           <none>
    Host Port:      <none>
    State:          Waiting
      Reason:       CrashLoopBackOff
    Last State:     Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Sun, 14 Jun 2020 22:38:11 +0800
      Finished:     Sun, 14 Jun 2020 22:38:11 +0800
    Ready:          False
    Restart Count:  558
    Environment:    <none>
    Mounts:
      /var/lib/ubuntu from subpath-vol (rw,path="ubuntutest")
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-74s86 (ro)
  nginx-subpath-container:
    Container ID:   docker://95101741eb1b6aa4c1e53d8fc4ab8006e74fd2eb923eca211ca20a01edcd7630
    Image:          nginx
    Image ID:       docker-pullable://nginx@sha256:30dfa439718a17baafefadf16c5e7c9d0a1cde97b4fd84f63b69e13513be7097
    Port:           <none>
    Host Port:      <none>
    State:          Running
      Started:      Fri, 29 May 2020 16:47:14 +0800
    Ready:          True
    Restart Count:  0
    Environment:    <none>
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-74s86 (ro)
      /var/www/nginx from subpath-vol (rw,path="nginxtest")
Conditions:
  Type              Status
  Initialized       True 
  Ready             False 
  ContainersReady   False 
  PodScheduled      True 
Volumes:
  subpath-vol:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  pvc-subpath
    ReadOnly:   false
  default-token-74s86:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-74s86
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type     Reason   Age                    From                 Message
  ----     ------   ----                   ----                 -------
  Normal   Pulled   21m (x555 over 16d)    kubelet, k8s-master  Successfully pulled image "ubuntu"
  Normal   Created  21m (x555 over 16d)    kubelet, k8s-master  Created container ubuntu-subpath-container
  Normal   Started  21m (x555 over 16d)    kubelet, k8s-master  Started container ubuntu-subpath-container
  Normal   Pulling  6m10s (x562 over 16d)  kubelet, k8s-master  Pulling image "ubuntu"
  Warning  BackOff  71s (x11744 over 16d)  kubelet, k8s-master  Back-off restarting failed container

現在來驗證下在宿主機存儲卷的目錄下是否有2個子目錄,1個是ubuntutest用來掛載容器1的,另外1個是nginxtest用來掛載容器2的

[root@k8s-master /]# cd data/pod/volume5
[root@k8s-master volume5]# ls
nginxtest ubuntutest
[root@k8s-master volume5]# cd nginxtest/     # 可以看到是1個目錄,非文件
[root@k8s-master nginxtest]#

進入到容器中,掛載一個文件,驗證是否可以同步到存儲卷

[root@k8s-master nginxtest]# kubectl exec -it pod-subpath-zltest -c nginx-subpath-container -- bash
root@pod-subpath-zltest:/# cd /var/www/nginx
root@pod-subpath-zltest:/var/www/nginx# ls
nginx-test-subpath.txt
[root@k8s-master volume5]# cd nginxtest/
[root@k8s-master nginxtest]# ls
nginx-test-subpath.txt

可以看到容器1的目錄/var/www/nginx 和存儲卷的子目錄 nginxtest完成了映射,容器2類似,這裏不再贅述。

2、配置項-configMap

1)創建configMap

[root@k8s-master consecret]# cat conf-subpath.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  name: conf-subpath-zltest
  namespace: default
data:
  example.property.1: hello      # key-value鍵值對
  example.property.2: world
  example.property.file: |-
    property.1=value-1
    property.2=value-2
    property.3=value-3

2)在Pod中使用configMap

[root@k8s-master consecret]# cat pod-conf-subpath.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    purpose: test-configmap-volume
  name: pod-conf-testvolume
spec:
  containers:
    - name: test-configmap-volume
      image: nginx
      volumeMounts:
        - name: config-volume
          mountPath: /etc/nginx/example.property.1       # 容器掛載目錄
          subPath: example.property.1                    # 將key名稱作為文件名,hello作為文件內容
  volumes:
    - name: config-volume
      configMap:
         name: conf-subpath-zltest      # 指定使用哪個CM
        
[root@k8s-master consecret]# kubectl create -f pod-conf-subpath.yaml 
[root@k8s-master consecret]# kubectl describe pod  pod-conf-testvolume 
Name:         pod-conf-testvolume
Namespace:    default
Priority:     0
Node:         k8s-master/192.168.126.129
Start Time:   Wed, 03 Jun 2020 11:46:36 +0800
Labels:       purpose=test-configmap-volume
Annotations:  cni.projectcalico.org/podIP: 10.122.235.249/32
              cni.projectcalico.org/podIPs: 10.122.235.249/32
Status:       Running
IP:           10.122.235.249
IPs:
  IP:  10.122.235.249
Containers:
  test-configmap-volume:
    Container ID:   docker://e2cf37cb24af32023eb5d22389545c3468104a4344c47363b5330addc40cb914
    Image:          nginx
    Image ID:       docker-pullable://nginx@sha256:883874c218a6c71640579ae54e6952398757ec65702f4c8ba7675655156fcca6
    Port:           <none>
    Host Port:      <none>
    State:          Running
      Started:      Wed, 03 Jun 2020 11:46:53 +0800
    Ready:          True
    Restart Count:  0
    Environment:    <none>
    Mounts:
      /etc/nginx/example.property.1 from config-volume (rw,path="example.property.1")  
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-74s86 (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  config-volume:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      conf-subpath-zltest
    Optional:  false
  default-token-74s86:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-74s86
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:          <none>

在容器掛載路徑驗證下是否將configMap中example.property.1掛載在容器中,且是否會覆蓋掉原有的目錄

root@pod-conf-testvolume:/# cd  /etc/nginx 
root@pod-conf-testvolume:/etc/nginx# ls
conf.d            fastcgi_params  koi-win    modules     scgi_params   win-utf
example.property.1  koi-utf        mime.types    nginx.conf  uwsgi_params

從上可以看到example.property.1已經掛載到容器中,且未對目錄原有的文件進行覆蓋

root@pod-conf-testvolume:/etc/nginx# cd example.property.1 
bash: cd: example.property.1: Not a directory
root@pod-conf-testvolume:/etc/nginx# cat example.property.1 helloroot@pod-conf-testvolume:/etc/nginx# 

從上可以驗證configMap的subpath用法支持將configMap中的每對key-value以key名稱作為文件名,value作為文件內容掛載到容器的目錄中。

四、總結

本文介紹了subpath分別在持久化存儲卷和配置項configMap中的使用,豐富了volume在pod中的使用場景。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

線上服務的FGC問題排查,看這篇就夠了!

線上服務的GC問題,是Java程序非常典型的一類問題,非常考驗工程師排查問題的能力。同時,幾乎是面試必考題,但是能真正答好此題的人並不多,要麼原理沒吃透,要麼缺乏實戰經驗。

過去半年時間里,我們的廣告系統出現了多次和GC相關的線上問題,有Full GC過於頻繁的,有Young GC耗時過長的,這些問題帶來的影響是:GC過程中的程序卡頓,進一步導致服務超時從而影響到廣告收入。

這篇文章,我將以一個FGC頻繁的線上案例作為引子,詳細介紹下GC的排查過程,另外會結合GC的運行原理給出一份實踐指南,希望對你有所幫助。內容分成以下3個部分:

1、從一次FGC頻繁的線上案例說起

2、GC的運行原理介紹

3、排查FGC問題的實踐指南

01 從一次FGC頻繁的線上案例說起

去年10月份,我們的廣告召回系統在程序上線后收到了FGC頻繁的系統告警,通過下面的監控圖可以看到:平均每35分鐘就進行了一次FGC。而程序上線前,我們的FGC頻次大概是2天一次。下面,詳細介紹下該問題的排查過程。

1. 檢查JVM配置

通過以下命令查看JVM的啟動參數:
ps aux | grep “applicationName=adsearch”

-Xms4g -Xmx4g -Xmn2g -Xss1024K
-XX:ParallelGCThreads=5
-XX:+UseConcMarkSweepGC
-XX:+UseParNewGC
-XX:+UseCMSCompactAtFullCollection
-XX:CMSInitiatingOccupancyFraction=80

可以看到堆內存為4G,新生代為2G,老年代也為2G,新生代採用ParNew收集器,老年代採用併發標記清除的CMS收集器,當老年代的內存佔用率達到80%時會進行FGC。

進一步通過 jmap -heap 7276 | head -n20 可以得知新生代的Eden區為1.6G,S0和S1區均為0.2G。

2. 觀察老年代的內存變化

通過觀察老年代的使用情況,可以看到:每次FGC后,內存都能回到500M左右,因此我們排除了內存泄漏的情況。

3. 通過jmap命令查看堆內存中的對象

通過命令 jmap -histo 7276 | head -n20

上圖中,按照對象所佔內存大小排序,显示了存活對象的實例數、所佔內存、類名。可以看到排名第一的是:int[],而且所佔內存大小遠遠超過其他存活對象。至此,我們將懷疑目標鎖定在了 int[] .

4. 進一步dump堆內存文件進行分析

鎖定 int[] 后,我們打算dump堆內存文件,通過可視化工具進一步跟蹤對象的來源。考慮堆轉儲過程中會暫停程序,因此我們先從服務管理平台摘掉了此節點,然後通過以下命令dump堆內存:

jmap -dump:format=b,file=heap 7276

通過JVisualVM工具導入dump出來的堆內存文件,同樣可以看到各個對象所佔空間,其中int[]佔到了50%以上的內存,進一步往下便可以找到 int[] 所屬的業務對象,發現它來自於架構團隊提供的codis基礎組件。

5. 通過代碼分析可疑對象

通過代碼分析,codis基礎組件每分鐘會生成約40M大小的int數組,用於統計TP99 和 TP90,數組的生命周期是一分鐘。而根據第2步觀察老年代的內存變化時,發現老年代的內存基本上也是每分鐘增加40多M,因此推斷:這40M的int數組應該是從新生代晉陞到老年代。

我們進一步查看了YGC的頻次監控,通過下圖可以看到大概1分鐘有8次左右的YGC,這樣基本驗證了我們的推斷:因為CMS收集器默認的分代年齡是6次,即YGC 6次后還存活的對象就會晉陞到老年代,而codis組件中的大數組生命周期是1分鐘,剛好滿足這個要求。

至此,整個排查過程基本結束了,那為什麼程序上線前沒出現此問題呢?通過上圖可以看到:程序上線前YGC的頻次在5次左右,此次上線后YGC頻次變成了8次左右,從而引發了此問題。

6. 解決方案

為了快速解決問題,我們將CMS收集器的分代年齡改成了15次,改完后FGC頻次恢復到了2天一次,後續如果YGC的頻次超過每分鐘15次還會再次觸發此問題。當然,我們最根本的解決方案是:優化程序以降低YGC的頻率,同時縮短codis組件中int數組的生命周期,這裏就不做展開了。

02 GC的運行原理介紹

上面整個案例的分析過程中,其實涉及到很多GC的原理知識,如果不懂得這些原理就着手處理,其實整個排查過程是很抓瞎的。

這裏,我選擇幾個最核心的知識點,展開介紹下GC的運行原理,最後再給出一份實踐指南。

1. 堆內存結構

大家都知道: GC分為YGC和FGC,它們均發生在JVM的堆內存上。先來看下JDK8的堆內存結構:

可以看到,堆內存採用了分代結構,包括新生代和老年代。新生代又分為:Eden區,From Survivor區(簡稱S0),To Survivor區(簡稱S1區),三者的默認比例為8:1:1。另外,新生代和老年代的默認比例為1:2。

堆內存之所以採用分代結構,是考慮到絕大部分對象都是短生命周期的,這樣不同生命周期的對象可放在不同的區域中,然後針對新生代和老年代採用不同的垃圾回收算法,從而使得GC效率最高。

2. YGC是什麼時候觸發的?

大多數情況下,對象直接在年輕代中的Eden區進行分配,如果Eden區域沒有足夠的空間,那麼就會觸發YGC(Minor GC),YGC處理的區域只有新生代。因為大部分對象在短時間內都是可收回掉的,因此YGC后只有極少數的對象能存活下來,而被移動到S0區(採用的是複製算法)。

當觸發下一次YGC時,會將Eden區和S0區的存活對象移動到S1區,同時清空Eden區和S0區。當再次觸發YGC時,這時候處理的區域就變成了Eden區和S1區(即S0和S1進行角色交換)。每經過一次YGC,存活對象的年齡就會加1。

3. FGC又是什麼時候觸發的?

下面4種情況,對象會進入到老年代中:

1、YGC時,To Survivor區不足以存放存活的對象,對象會直接進入到老年代。

2、經過多次YGC后,如果存活對象的年齡達到了設定閾值,則會晉陞到老年代中。

3、動態年齡判定規則,To Survivor區中相同年齡的對象,如果其大小之和佔到了 To Survivor區一半以上的空間,那麼大於此年齡的對象會直接進入老年代,而不需要達到默認的分代年齡。

4、大對象:由-XX:PretenureSizeThreshold啟動參數控制,若對象大小大於此值,就會繞過新生代, 直接在老年代中分配。

當晉陞到老年代的對象大於了老年代的剩餘空間時,就會觸發FGC(Major GC),FGC處理的區域同時包括新生代和老年代。除此之外,還有以下4種情況也會觸發FGC:

1、老年代的內存使用率達到了一定閾值(可通過參數調整),直接觸發FGC。

2、空間分配擔保:在YGC之前,會先檢查老年代最大可用的連續空間是否大於新生代所有對象的總空間。如果小於,說明YGC是不安全的,則會查看參數 HandlePromotionFailure 是否被設置成了允許擔保失敗,如果不允許則直接觸發Full GC;如果允許,那麼會進一步檢查老年代最大可用的連續空間是否大於歷次晉陞到老年代對象的平均大小,如果小於也會觸發 Full GC。

3、Metaspace(元空間)在空間不足時會進行擴容,當擴容到了-XX:MetaspaceSize 參數的指定值時,也會觸發FGC。

4、System.gc() 或者Runtime.gc() 被顯式調用時,觸發FGC。

4. 在什麼情況下,GC會對程序產生影響?

不管YGC還是FGC,都會造成一定程度的程序卡頓(即Stop The World問題:GC線程開始工作,其他工作線程被掛起),即使採用ParNew、CMS或者G1這些更先進的垃圾回收算法,也只是在減少卡頓時間,而並不能完全消除卡頓。

那到底什麼情況下,GC會對程序產生影響呢?根據嚴重程度從高到底,我認為包括以下4種情況:

1、FGC過於頻繁:FGC通常是比較慢的,少則幾百毫秒,多則幾秒,正常情況FGC每隔幾個小時甚至幾天才執行一次,對系統的影響還能接受。但是,一旦出現FGC頻繁(比如幾十分鐘就會執行一次),這種肯定是存在問題的,它會導致工作線程頻繁被停止,讓系統看起來一直有卡頓現象,也會使得程序的整體性能變差。

2、YGC耗時過長:一般來說,YGC的總耗時在幾十或者上百毫秒是比較正常的,雖然會引起系統卡頓幾毫秒或者幾十毫秒,這種情況幾乎對用戶無感知,對程序的影響可以忽略不計。但是如果YGC耗時達到了1秒甚至幾秒(都快趕上FGC的耗時了),那卡頓時間就會增大,加上YGC本身比較頻繁,就會導致比較多的服務超時問題。

3、FGC耗時過長:FGC耗時增加,卡頓時間也會隨之增加,尤其對於高併發服務,可能導致FGC期間比較多的超時問題,可用性降低,這種也需要關注。

4、YGC過於頻繁:即使YGC不會引起服務超時,但是YGC過於頻繁也會降低服務的整體性能,對於高併發服務也是需要關注的。

其中,「FGC過於頻繁」和「YGC耗時過長」,這兩種情況屬於比較典型的GC問題,大概率會對程序的服務質量產生影響。剩餘兩種情況的嚴重程度低一些,但是對於高併發或者高可用的程序也需要關注。

03 排查FGC問題的實踐指南

通過上面的案例分析以及理論介紹,再總結下FGC問題的排查思路,作為一份實踐指南供大家參考。

1. 清楚從程序角度,有哪些原因導致FGC?

1、大對象:系統一次性加載了過多數據到內存中(比如SQL查詢未做分頁),導致大對象進入了老年代。

2、內存泄漏:頻繁創建了大量對象,但是無法被回收(比如IO對象使用完后未調用close方法釋放資源),先引發FGC,最後導致OOM.

3、程序頻繁生成一些長生命周期的對象,當這些對象的存活年齡超過分代年齡時便會進入老年代,最後引發FGC. (即本文中的案例)

4、程序BUG導致動態生成了很多新類,使得 Metaspace 不斷被佔用,先引發FGC,最後導致OOM.

5、代碼中顯式調用了gc方法,包括自己的代碼甚至框架中的代碼。

6、JVM參數設置問題:包括總內存大小、新生代和老年代的大小、Eden區和S區的大小、元空間大小、垃圾回收算法等等。

2. 清楚排查問題時能使用哪些工具

1、公司的監控系統:大部分公司都會有,可全方位監控JVM的各項指標。

2、JDK的自帶工具,包括jmap、jstat等常用命令:

查看堆內存各區域的使用率以及GC情況
jstat -gcutil -h20 pid 1000

查看堆內存中的存活對象,並按空間排序
jmap -histo pid | head -n20

dump堆內存文件
jmap -dump:format=b,file=heap pid

3、可視化的堆內存分析工具:JVisualVM、MAT等

3. 排查指南

1、查看監控,以了解出現問題的時間點以及當前FGC的頻率(可對比正常情況看頻率是否正常)

2、了解該時間點之前有沒有程序上線、基礎組件升級等情況。

3、了解JVM的參數設置,包括:堆空間各個區域的大小設置,新生代和老年代分別採用了哪些垃圾收集器,然後分析JVM參數設置是否合理。

4、再對步驟1中列出的可能原因做排除法,其中元空間被打滿、內存泄漏、代碼顯式調用gc方法比較容易排查。

5、針對大對象或者長生命周期對象導致的FGC,可通過 jmap -histo 命令並結合dump堆內存文件作進一步分析,需要先定位到可疑對象。

6、通過可疑對象定位到具體代碼再次分析,這時候要結合GC原理和JVM參數設置,弄清楚可疑對象是否滿足了進入到老年代的條件才能下結論。

04 最後的話

這篇文章通過線上案例並結合GC原理詳細介紹了FGC的排查過程,同時給出了一份實踐指南。

後續會以類似的方式,再分享一個YGC耗時過長的案例,希望能幫助大家吃透GC問題排查,如果覺得本文對你有幫助,請大家關注我的個人公眾號!

– End –

作者簡介:程序員,985碩士,前亞馬遜Java工程師,現58轉轉技術總監。持續分享技術和管理方向的文章。如果感興趣,可微信掃描下面的二維碼關注我的公眾號:『IT人的職場進階』

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

3dTiles 數據規範詳解[1] 介紹

版權:轉載請帶原地址。https://www.cnblogs.com/onsummer/p/12799366.html @秋意正寒

Web中的三維

html5和webgl技術使得瀏覽器三維變成了可能。

巧婦難為無米之炊,三維數據(三維模型)是三維可視化重要的一環,事實上就是:三維數據眾多,行業跨界廣。

參考資料:http://www.bgteach.com/article/132

three.js的各種加載器實現了大部分通用三維格式的加載,屏蔽了格式不同的數據結構差異。

然而,這樣還是不能滿足日益增長的效果需求,比如場景一大,模型文件體積變大,解析所耗費的時間越來越長。

webgl,包括所有gpu有關的圖形渲染編程,幾乎只認這樣的三維數據:頂點、頂點顏色、頂點法線、着色語言…

所以,三維圖形界的通用格式:glTF應運而生,它面向終點,它按照圖形編程所需的格式來存儲數據,藉以二進制編碼提高傳輸速度。

它不再使用面向對象的思維存儲三維模型、貼圖紋理,而是按顯卡的思維存儲,存的是頂點、法線、頂點顏色等最基礎的信息,只不過組織結構上進行了精心的設計。

它面向終點,就意味着可編輯性差,因為渲染性能的提高犧牲了可編輯性,它不再像3ds、dae甚至是max、skp一樣容易編輯和轉換。

事實上,大多數三維軟件提供了glTF格式的轉換,或多一步,或一步到位。

地理真三維

早年,地理的三維還處於地形三維上,即数字高程模型(DEM)提供地表的高度拉伸。柵格高程數據、等高線、不規則三角網等均是数字高程模型的具體案例。
下圖是不規則三角網,也即所謂的三角面片(圖形渲染中很常見):

隨着學科的融合、計算機技術和硬件的更新換代,使得有模型、有細節的真三維融入到GIS中成為了可能,或者說,計算機技術和硬件的升級,給GIS以更廣闊的視角觀察世界。

cesium.js 號稱是 webgl 封裝的三維地理庫,是支持 gltf 模型的加載的。

面對大規模精細三維數據的加載,還要照顧到GIS的各種坐標系統、分析計算,gltf這種單個模型的方案顯得力不從心。

2016年,Cesium 團隊借鑒傳統2DGIS的地圖規範——WMTS,借鑒圖形學中的層次細節模型,打造出大規模的三維數據標準—— 3d-Tiles,中文譯名:三維瓦片。

它在模型上利用了 gltf 渲染快的特點,對大規模的三維數據進行組織,包括層次細節模型、模型的屬性數據、模型的層級數據等。

3dTiles的設計思想

3dTiles 繼承了 gltf 的優點:貼合圖形渲染 API 的邏輯,討 GPU 喜愛,webgl 對其內部組織起來的三維模型數據,不需要轉換,可以直接渲染(glTF 的功勞)。

關於 glTF 是如何嵌入到 3dTiles 中的,開篇不談,後續精講。

我們區分一組概念:規範和實現。

3dTiles 是一種規範,在規範的指導下,各種資源文件可以是獨立存在於硬盤中的目錄、文件,也可以以二進制形式寫入數據庫中。目前,3dTiles 的官方實現只有 “散列文件”,也就是文件、文件夾的形式存儲在硬盤中,有關如何存儲到數據庫中的討論,官方仍在進行中(截至發博客)。

glTF 也是一種規範,它的數據文件不一定就是後綴名為 .gltf 的文件,也不一定只有一個文件(glTF 的文件還可以是二進制文件、紋理貼圖文件等,扯遠了哈)。
在本文,會嚴格指明是數據還是數據標準,如果我說的是 “XXX文件(例如 Bird.glb 文件)” ,那就是在指特定的文件。

3dTiles還有一個特點:那就是不記錄模型數據,只記錄各級“Tile”的邏輯關係,以及“Tile”自己的屬性信息。所謂的模型數據,是指三維模型的頂點、貼圖材質、法線、顏色等信息。邏輯關係是指,各級Tile是如何在空間中保持連續的,LOD是如何組織的。屬性信息就很簡單啦,門有門的生產商,窗戶有窗戶的使用年限等,往大了說,建築還有它自己的壽命、法人、施工單位等屬性信息。

3dTiles的特點總結如下:

  • 三維模型使用了 glTF 規範,繼承它的渲染高性能
  • 除了嵌入的 glTF,3dTiles 自己 只記錄各級Tile的空間邏輯關係(如何構成整個3dtiles)和屬性信息,以及模型與屬性如何掛接在一起的信息

我覺得你還是雲里霧裡的,下一節將展示3dTiles具體數據,說說3dTiles的組織結構,說說3dTiles中的”Tile”,也就是“三維瓦片數據”中的“瓦片”是什麼。

3dTiles系列博客最終目錄:

01 引入與博客目錄:3dTiles 數據規範詳解

02 Tileset與Tile

03 內嵌在瓦片文件中的兩大數據表

04.1 B3dm 類型

04.2 I3dm 類型

04.3 Pnts 類型

04.4 Cmpt 類型

04.5 未發布的瓦片規範

05 3dTiles強大的擴展能力

06 優缺點

07 與I3S比較

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

併發系列(一)——線程池源碼(ThreadPoolExecutor類)簡析

前言

  本文主要是結合源碼去線程池執行任務的過程,基於JDK 11,整個過程基本與JDK 8相同。

  個人水平有限,文中若有表達有誤的,歡迎大夥留言指出,謝謝了!

一、線程池簡介

  1.1 使用線程池的優點

    1)通過復用已創建的線程,降低資源的消耗(線程的創建/銷毀是要消耗資源的)、提高響應速度;

    2)管理線程的個數,線程的個數在初始化線程池的時候指定;

    3)統一管理線程,比如停止,stop()方法;

  1.2 線程池執行任務過程

    線程池執行任務的過程如下圖所示,主要分為以下4步,其中參數的含義會在後面詳細講解:

    1)判斷工作的線程是否小於核心線程數據(workerCountOf(c) < corePoolSize),若小於則會新建一個線程去執行任務,這一步僅僅的是根據線程個數決定;

    2)若核心線程池滿了,就會判斷線程池的狀態,若是running狀態,則嘗試加入任務隊列,若加入成功后還會做一些事情,後面詳細說;

    3)若任務隊列滿了,則加入失敗,此時會判斷整個線程池線程是否滿,若沒有則創建非核心線程執行任務;

    4)若線程池滿了,則根據拒絕測試處理無法執行的任務;

    整體過程如下圖:

二、ThreadPoolExecutor類解析

  2.1 ThreadPoolExecutor的構造函數

    ThreadPoolExecutor類一共提供了4個構造函數,涉及5~7個參數,下面就5個必備參數的構造函數進行說明:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    1)corePoolSize :初始化核心線程池中線程個數的大小;

    2)maxmumPoolSize:線程池中線程大小;

    3)keepAliveTime:非核心線程的超時時長;

      非核心線程空閑時常大於該值就會被終止。

    4)unit :keepAliveTime的單位,類型可以參見TimeUnit類;

    5)BlockingQueue workQueue:阻塞隊列,維護等待執行的任務;

  2.2  私有類Worker

    在ThreadPoolExecutor類中有兩個集合類型比較重要,一個是用於放置等待任務的workQueue,其類型是阻塞對列;一個是用於用於存放工作線程的works,其是Set類型,其中存放的類型是Worker。

    進一步簡化線程池執行過程,可以理解為works中的工作線程不停的去阻塞對列中取任務,執行結束,線程重新加入大works中。

    為此,有必要簡單了解一下Work類型的組成。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /** Thread this worker is running in.  Null if factory fails. */
        //工作線程,由線程的工廠類初始化
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        //不可重入的鎖
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        .......
    }

    Worker類繼承於隊列同步器(AbstractQueueSynchronizer),隊列同步器是採取鎖或其他同步組件的基礎框架,其主要結構是自旋獲取鎖的同步隊列和等待喚醒的等待隊列,其方法因此可以分為兩類:對state改變的方法 和 入、出隊列的方法,即獲取獲取鎖的資格的變化(可能描述的不準確)。關於隊列同步器後續博客會詳細分析,此處不展開討論。

    Work類中通過CAS設置狀態失敗后直接返回false,而不是判斷當前線程是否已獲取鎖來實現不可重入的鎖,源碼註釋中解釋這樣做的原因是因為避免work tash重新獲取到控制線程池全局的方法,如setCorePoolSize。

  2.3  拒絕策略類

    ThreadPoolExecutor的拒絕策略類是以私有類的方式實現的,有四種策略:

    1)AbortPolicy:丟棄任務並拋出RejectedExecutionException異常(默認拒絕處理策略)。

      2)DiscardPolicy:拋棄新來的任務,但是不拋出異常。

      3)DiscardOldestPolicy:拋棄等待隊列頭部(最舊的)的任務,然後重新嘗試執行程序(失敗則會重複此過程)。

      4)CallerRunsPolicy:由調用線程處理該任務。

    其代碼相對簡單,可以參考源碼。

三、任務執行過程分析

  3.1 execute(Runnable)方法

    execute(Runnable)方法的整體過程如上文1.2所述,其實現方式如下:

public void execute(Runnable command) {
        //執行的任務為空,直接拋出異常
        if (command == null)
            throw new NullPointerException();
        //ctl是ThreadPoolExecutor中很關鍵的一個AtomicInteger,主線程池的控制狀態
        int c = ctl.get();
        //1、判斷是否小於核心線程池的大小,若是則直接嘗試新建一個work線程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2、大於核心線程池的大小或新建work失敗(如創建thread失敗),會先判斷線程池是否是running狀態,若是則加入阻塞對列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新驗證線程池是否為running,若否,則嘗試從對列中刪除,成功后執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //若線程池的狀態為shutdown則,嘗試去執行完阻塞對列中的任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3、新建非核心線程去執行任務,若失敗,則採取拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

  3.2 addWorker(Runnable,boole)方法

    execute(Runnable)方法中,新建(非)核心線程執行任務主要是通過addWorker方法實現的,其執行過程如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        //此處反覆檢查線程池的狀態以及工作線程是否超過給定的值
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
            //核心和非核心線程的區別
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            //通過工廠方法初始化,可能失敗,即可能為null
            final Thread t = w.thread;
            if (t != null) {
            //獲取全局鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    //線程池處於running狀態
                    //或shutdown狀態但無需要執行的task,個人理解為用於去阻塞隊列中取任務執行
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //執行任務,這裡會執行thread的firstTask獲取阻塞對列中取任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
            //開始失敗,則會從workers中刪除新建的work,work數量減1,嘗試關閉線程池,這些過程會獲取全局鎖
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  3.3  runWorker(this) 方法

     在3.2 中當新建的worker線程加入在workers中成功后,就會啟動對應任務,其調用的是Worker類中的run()方法,即調用runWorker(this)方法,其過程如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        //while()循環中,前者是新建線程執行firstTask,對應線程個數小於核心線程和阻塞隊列滿的情況,
        //getTask()則是從阻塞對列中取任務執行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //僅線程池狀態為stop時,線程響應中斷,這裏也就解釋了調用shutdown時,正在工作的線程會繼續工作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                    //執行任務
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    //完成的個數+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //處理後續工作
            processWorkerExit(w, completedAbruptly);
        }
    }

   3.4 processWorkerExit(Worker,boole)方法

    當任務執行結果后,在滿足一定條件下會新增一個worker線程,代碼如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //對工作線程的增減需要加全局鎖
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        //嘗試終止線程池
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
        //線程不是中斷,會維持最小的個數
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //執行完任務后,線程重新加入workers中
            addWorker(null, false);
        }
    }

  至此,線程池執行任務的過程分析結束,其他方法的實現過程可以參考源碼。

 

Ref:

[1]http://concurrent.redspider.group/article/03/12.html

[2]《Java併發編程的藝術》

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

Dart Memo for Android Developers

Dart Memo for Android Developers

Dart語言一些語法特點和編程規範.

本文適合: 日常使用Kotlin, 突然想寫個Flutter程序的Android程序員.

Dart語言

完整的請看A tour of the Dart language

  • 創建對象可以不用new. -> 並且規範不讓用new, lint會報錯.
  • 聲明變量可以用var, 也可以用具體類型如String. 不變量用final, 常量用const.
  • 沒有訪問修飾符, 用_來表示私有: 文件級別.
  • 字符串可以用單引號'.
  • 語句結尾要用;.
  • 創建數組可以用: var list = [1, 2, 3];.
  • assert()常用來斷定開發時不可能會出現的情況.
  • 空測試操作符: ??.
  • 過濾操作符: where.
  • 兩個點..表示鏈式調用.
  • dynamic說明類型未指定.
  • 除了throw異常, 還可以throw別的東西, 比如字符串.

函數

  • 函數返回值在函數最開頭, 可以不標. -> 但是規範會建議標註返回值.
bool isNoble(int atomicNumber) {
  return _nobleGases[atomicNumber] != null;
}
  • =>箭頭符號, 用來簡化一句話的方法.
bool isNoble(int atomicNumber) => _nobleGases[atomicNumber] != null;

構造函數

  • 構造函數{}表示帶名字, 參數可選, 若要必選加上@required.
const Scrollbar({Key key, @required Widget child})
  • 構造函數名可以是ClassName或者ClassName.identifier.
  • 空構造函數體可以省略, 用;結尾就行:
class Point {
  double x, y;
  Point(this.x, this.y);
}

這裡會初始化相應的變量, 也不用聲明具體的參數類型.

  • factory構造, 可以用來返回緩存實例, 或者返回類型的子類:
factory Logger(String name) {
    return _cache.putIfAbsent(name, () => Logger._internal(name));
}

異步代碼

Future<String> lookUpVersion() async => '1.0.0';

Future checkVersion() async {
  var version = await lookUpVersion();
  // Do something with version
}

編程規範類

完整的規範在這裏: Effective Dart.

有一些Good和Bad的舉例, 這裏僅列出比較常用的幾項.

  • 文件名要蛇形命名: lowercase_with_underscores. 類名: UpperCamelCase.
  • 對自己程序的文件, 兩種import都可以(package開頭或者相對路徑), 但是要保持一致.
  • Flutter程序嵌套比較多, 要用結尾的,來幫助格式化.

本文緣由

年初的時候學了一陣子Flutter, 寫了各種大小demo. 結果隔了兩個月之後, 突然心血來潮想寫個小東西, 打開Android Studio, 首先發現創建Flutter程序的按鈕都不見了. (估計是Android Studio4.0升級之後Flutter的插件沒跟上).

接着用命令行創建了工程, 打開之後稍微整理了一下心情, 然後就….懵逼了.

突然不知道如何下手.
宏觀的東西還記得, 要用什麼package, 基本常用的幾個Widget都是啥, 但是微觀的, 忘了函數和數組都是咋定義的了.
這種懵逼的狀態令我很憤怒, 果然是上年紀了嗎, 無縫切換個語言都不行.

於是就想着還是寫個備忘錄吧.

References

  • A tour of the Dart language
  • Effective Dart

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

【asp.net core 系列】9 實戰之 UnitOfWork以及自定義代碼生成

0. 前言

在前一篇中我們創建了一個基於EF的數據查詢接口實現基類,這一篇我將帶領大家講一下為這EF補充一些功能,並且提供一個解決避免寫大量配置類的方案。

1. SaveChanges的外移

在之前介紹EF Core的時候,我們提到過使用EF需要在每次使用之後,調用一次SaveChanges將數據提交給數據庫。在實際開發中,我們不能添加一條數據或者做一次修改就調用一次SaveChanges,這完全不現實。因為每次調用SaveChanges是EF向數據庫提交變更的時候,所以EF推薦的是每次執行完用戶的請求之後統一提交數據給數據庫。

這樣就會造成一個問題,可能也不是問題:我們需要一個接口來管理EF 的SaveChanges操作。

1.1 創建一個IUnitOfWork接口

通常我們會在Domain項目中添加一個IUnitOfWork接口,這個接口有一個方法就是SaveChanges,代碼如下:

namespace Domain.Insfrastructure
{
    public interface IUnitOfWork
    {
        void SaveChanges();
    }
}

這個方法的意思表示到執行該方法的時候,一個完整的工作流程執行完成了。也就是說,當執行該方法后,當前請求不會再與數據庫發生連接。

1.2 實現IUnitOfWork接口

在 Domain.Implement中添加IUnitOfWork實現類:

using Domain.Insfrastructure;
using Microsoft.EntityFrameworkCore;

namespace Domain.Implements.Insfrastructure
{
    public class UnitOfWork: IUnitOfWork
    {
        private DbContext DbContext;
        public UnitOfWork(DbContext context)
        {
            DbContext = context;
        }

        public void SaveChanges()
        {
            DbContext.SaveChanges();
        }
    }
}

1.3 調用時機

到現在我們已經創建了一個UnitOfWork的方法,那麼問題來了,我們該在什麼時候調用呢,或者說如何調用呢?

我的建議是創建一個ActionFilter,針對所有的控制器進行SaveChanges進行處理。當然了,也可以在控制器中持有一個IUnitOfWork的示例,然後在Action結束的時候,執行SaveChanges。不過這樣存在一個問題,可能會存在遺漏的方法。所以我推薦這樣操作,這裏簡單演示一下如何創建攔截器:

在Web的根目錄下,創建一個Filters目錄,這個目錄里用來存儲一些過濾器,創建我們需要的過濾器:

using Domain.Insfrastructure;
using Microsoft.AspNetCore.Mvc.Filters;

namespace Web.Filters
{
    public class UnitOfWorkFilterAttribute : ActionFilterAttribute
    {
        public IUnitOfWork UnitOfWork;

        public override void OnActionExecuted(ActionExecutedContext context)
        {
            UnitOfWork.SaveChanges();
        }
    }
}

使用一個ActionFilter可以很方便的解決一些容易遺漏但又必須執行的代碼。這裏就先不介紹如何配置Filter的啟用和詳細介紹了,請允許我賣個關子。當然了,有些小夥伴肯定也能猜到這是一個Attribute類,所以可以按照Attribute給Controller打標記。

2. 創建一個簡單的代碼生成方法

之前在介紹EF的時候,有個小夥伴跟我說,還要寫配置文件啊,太麻煩了。是的,之前我介紹了很多關於寫配置文件不使用特性的好處,但不解決這個問題就無法真正體檢配置類的好處。

雖然說,EF Core約定優先,但是如果默認約定的話,得在DBContext中聲明 DbSet<T> 來聲明這個字段,實體類少的話,比較簡單。如果多個數據表的話,就會非常麻煩。

所以這時候就要使用工具類, 那麼簡單的分析一下,這個工具類需要有哪些功能:

  • 第一步,找到實體類並解析出實體類的類名
  • 第二步,生成配置文件
  • 第三步,創建對應的Repository接口和實現類

很簡單的三步,但是難點就是找實體類並解析出實體類名。

在Util項目中添加一個Develop目錄,並創建Develop類:

namespace Utils.Develop
{
    public class Develop
    {
        
    }
}

定位當前類所在目錄,通過

Directory.GetCurrentDirectory()

這個方法可以獲取當前執行的DLL所在目錄,當然不同的編譯器在執行的時候,會有微妙的不同。所以我們需要以此為根據然後獲取項目的根目錄,一個簡單的方法,查找*.sln 所在目錄:

public static string CurrentDirect
{
    get
    {
        var execute = Directory.GetCurrentDirectory();
        var parent = Directory.GetParent(execute);
        while(parent.GetFiles("*.sln",SearchOption.TopDirectoryOnly).Length == 0)
        {
            parent = parent.Parent;
            if(parent == null)
            {
                return null;
            }
        }
        return parent.FullName;
    }
}

2.1 獲取實體類

那麼獲取到根目錄之後,我們下一步就是獲取實體類。因為我們的實體類都要求是繼承BaseEntity或者命名空間都是位於Data.Models下面。當然這個名稱都是根據實際業務場景約束的,這裏只是以當前項目舉例。那麼,我們可以通過以下方法找到我們設置的實體類:

public static Type[] LoadEntities()
{
    var assembly = Assembly.Load("Data");
    var allTypes = assembly.GetTypes();
    var ofNamespace = allTypes.Where(t => t.Namespace == "Data.Models" || t.Namespace.StartsWith("Data.Models."));
    var subTypes = allTypes.Where(t => t.BaseType.Name == "BaseEntity`1");
    return ofNamespace.Union(subTypes).ToArray();
}

通過 Assembly加載Data的程序集,然後選擇出符合我們要求的實體類。

2.2 編寫Repository接口

我們先約定Model的Repository接口定義在 Domain/Repository目錄下,所以它們的命名空間應該是:

namespace Domain.Repository	
{
}

假設目錄情況與Data/Models下面的代碼結構保持一致,然後生成代碼應該如下:

public static void CreateRepositoryInterface(Type type)
{
    var targetNamespace = type.Namespace.Replace("Data.Models", "");
    if (targetNamespace.StartsWith("."))
    {
        targetNamespace = targetNamespace.Remove(0);
    }
    var targetDir = Path.Combine(new[]{CurrentDirect,"Domain", "Repository"}.Concat(
        targetNamespace.Split('.')).ToArray());
    if (!Directory.Exists(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }

    var baseName = type.Name.Replace("Entity","");

    if (!string.IsNullOrEmpty(targetNamespace))
    {
        targetNamespace = $".{targetNamespace}";
    }
    var file = $"using {type.Namespace};\r\n"
        + $"using Domain.Insfrastructure;\r\n"
        + $"namespace Domain.Repository{targetNamespace}\r\n"
        + "{\r\n"
        + $"\tpublic interface I{baseName}ModifyRepository : IModifyRepository<{type.Name}>\r\n" +
        "\t{\r\n\t}\r\n"
        + $"\tpublic interface I{baseName}SearchRepository : ISearchRepository<{type.Name}>\r\n" +
        "\t{\r\n\t}\r\n}";

    File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
}

2.3 編寫Repository的實現類

因為我們提供了一個基類,所以我們在生成方法的時候,推薦繼承這個類,那麼實現方法應該如下:

public static void CreateRepositoryImplement(Type type)
{
    var targetNamespace = type.Namespace.Replace("Data.Models", "");
    if (targetNamespace.StartsWith("."))
    {
        targetNamespace = targetNamespace.Remove(0);
    }

    var targetDir = Path.Combine(new[] {CurrentDirect, "Domain.Implements", "Repository"}.Concat(
        targetNamespace.Split('.')).ToArray());
    if (!Directory.Exists(targetDir))
    {
        Directory.CreateDirectory(targetDir);
    }
    var baseName = type.Name.Replace("Entity", "");
    if (!string.IsNullOrEmpty(targetNamespace))
    {
        targetNamespace = $".{targetNamespace}";
    }

    var file = $"using {type.Namespace};" +
        $"\r\nusing Domain.Implements.Insfrastructure;" +
        $"\r\nusing Domain.Repository{targetNamespace};" +
        $"\r\nusing Microsoft.EntityFrameworkCore;" +
        $"namespace Domain.Implements.Repository{targetNamespace}\r\n" +
        "{" +
        $"\r\n\tpublic class {baseName}Repository :BaseRepository<{type.Name}> ,I{baseName}ModifyRepository,I{baseName}SearchRepository " +
        "\r\n\t{" +
        $"\r\n\t\tpublic {baseName}Repository(DbContext context) : base(context)"+
        "\r\n\t\t{"+
        "\r\n\t\t}\r\n"+
        "\t}\r\n}";
    File.WriteAllText(Path.Combine(targetDir, $"{baseName}Repository.cs"), file);
}

2.4 配置文件的生成

仔細觀察一下代碼,可以發現整體都是十分簡單的。所以這篇就不掩飾如何生成配置文件了,小夥伴們可以自行嘗試一下哦。具體實現可以等一下篇哦。

3. 總結

這一篇初略的介紹了兩個用來輔助EF Core實現的方法或類,這在開發中很重要。UnitOfWork用來確保一次請求一個工作流程,簡單的代碼生成類讓我們能讓我們忽略那些繁重的創建同類代碼的工作。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

曹工說Redis源碼(8)–面試時,redis 內存淘汰總被問,但是總答不好

文章導航

Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。

曹工說Redis源碼(1)– redis debug環境搭建,使用clion,達到和調試java一樣的效果

曹工說Redis源碼(2)– redis server 啟動過程解析及簡單c語言基礎知識補充

曹工說Redis源碼(3)– redis server 啟動過程完整解析(中)

曹工說Redis源碼(4)– 通過redis server源碼來理解 listen 函數中的 backlog 參數

曹工說Redis源碼(5)– redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)

曹工說Redis源碼(6)– redis server 主循環大體流程解析

曹工說Redis源碼(7)– redis server 的周期執行任務,到底要做些啥

什麼是內存淘汰

內存淘汰,和平時我們設置redis key的過期時間,不是一回事;內存淘汰是說,假設我們限定redis只能使用8g內存,現在已經使用了這麼多了(包括設置了過期時間的key和沒設過期時間的key),那,後續的set操作,還怎麼辦呢?

是不是只能報錯了?

那不行啊,不科學吧,因為有的key,可能已經很久沒人用了,可能以後也不會再用到了,那我們是不是可以把這類key給幹掉呢?

幹掉key的過程,就是內存淘汰。

內存淘汰什麼時候啟用

當我們在配置文件里設置了如下屬性時:

# maxmemory <bytes>

默認,該屬性是被註釋掉的。

其實,這個配置項的註釋,相當有價值,我們來看看:

# Don't use more memory than the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
#
# If Redis can't remove keys according to the policy, or if the policy is
# set to 'noeviction', Redis will start to reply with errors to commands
# that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
#
# This option is usually useful when using Redis as an LRU cache, or to set
# a hard memory limit for an instance (using the 'noeviction' policy).
#
# WARNING: If you have slaves attached to an instance with maxmemory on,
# the size of the output buffers needed to feed the slaves are subtracted
# from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of slaves is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have slaves attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for slave
# output buffers (but this is not needed if the policy is 'noeviction').
#
# maxmemory <bytes>

渣翻譯如下:

不能使用超過指定數量bytes的內存。當該內存限制被達到時,redis會根據過期策略(eviction policy,通過參數 maxmemory-policy來指定)來驅逐key。

如果redis根據指定的策略,或者策略被設置為“noeviction”,redis會開始針對如下這種命令,回復錯誤。什麼命令呢?會使用更多內存的那類命令,比如set、lpush;只讀命令還是不受影響,可以正常響應。

該選項通常在redis使用LRU緩存時有用,或者在使用noeviction策略時,設置一個進程級別的內存limit。

內存淘汰策略

所謂策略,意思是,當我們要刪除部分key的時候,刪哪些,不刪哪些?是不是需要一個策略?比如是隨機刪,就像滅霸一樣?還是按照lru時間來刪,lru的策略意思就是,最近最少使用的key,將被優先刪除。

總之,我們需要定一個規則。

redis默認支持以下策略:

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select among five behaviors:
# 
# volatile-lru -> remove the key with an expire set using an LRU algorithm
# allkeys-lru -> remove any key accordingly to the LRU algorithm
# volatile-random -> remove a random key with an expire set
# allkeys-random -> remove a random key, any key
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)
# noeviction -> don't expire at all, just return an error on write operations
# 
# Note: with any of the above policies, Redis will return an error on write
#       operations, when there are not suitable keys for eviction.
#
#       At the date of writing this commands are: set setnx setex append
#       incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd
#       sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby
#       zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby
#       getset mset msetnx exec sort
#
# The default is:
#
# maxmemory-policy noeviction
maxmemory-policy allkeys-lru
針對設置了過期時間的,使用lru算法
# volatile-lru -> remove the key with an expire set using an LRU algorithm

針對全部key,使用lru算法
# allkeys-lru -> remove any key accordingly to the LRU algorithm

針對設置了過期時間的,隨機刪
# volatile-random -> remove a random key with an expire set

針對全部key,隨機刪
# allkeys-random -> remove a random key, any key

針對設置了過期時間的,馬上要過期的,刪掉
# volatile-ttl -> remove the key with the nearest expire time (minor TTL)

不過期,不能寫了,就報錯
# noeviction -> don't expire at all, just return an error on write operations

一般呢,我們會設置為:

allkeys-lru,即,針對全部key,進行lru。

源碼實現

配置讀取

在如下結構體中,定義了如下字段:

struct redisServer {
	...
	unsigned long long maxmemory;   /* Max number of memory bytes to use */
    int maxmemory_policy;           /* Policy for key eviction */
    int maxmemory_samples;          /* Pricision of random sampling */
    ...
}

當我們在配置文件中,進入如下配置時,該結構體中幾個字段的值如下:

maxmemory 3mb
maxmemory-policy allkeys-lru
# maxmemory-samples 5  這個取了默認值

maxmemory_policy為3,是因為枚舉值為3:

#define REDIS_MAXMEMORY_VOLATILE_LRU 0
#define REDIS_MAXMEMORY_VOLATILE_TTL 1
#define REDIS_MAXMEMORY_VOLATILE_RANDOM 2
#define REDIS_MAXMEMORY_ALLKEYS_LRU 3
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5
#define REDIS_DEFAULT_MAXMEMORY_POLICY REDIS_MAXMEMORY_NO_EVICTION

處理命令時,判斷是否進行內存淘汰

在處理命令的時候,會調用中的

redis.c  processCommand
    
int processCommand(redisClient *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    // 特別處理 quit 命令
    void *commandName = c->argv[0]->ptr;
    redisLog(REDIS_NOTICE, "The server is now processing %s", commandName);

    if (!strcasecmp(c->argv[0]->ptr, "quit")) {
        addReply(c, shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
        return REDIS_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // 1 查找命令,並進行命令合法性檢查,以及命令參數個數檢查
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 沒找到指定的命令
        flagTransaction(c);
        addReplyErrorFormat(c, "unknown command '%s'",
                            (char *) c->argv[0]->ptr);
        return REDIS_OK;
    }

    /* Check if the user is authenticated */
    //2 檢查認證信息
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand) {
        flagTransaction(c);
        addReply(c, shared.noautherr);
        return REDIS_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     *
     * 3 如果開啟了集群模式,那麼在這裏進行轉向操作。
     *
     * However we don't perform the redirection if:
     *
     * 不過,如果有以下情況出現,那麼節點不進行轉向:
     *
     * 1) The sender of this command is our master.
     *    命令的發送者是本節點的主節點
     *
     * 2) The command has no key arguments. 
     *    命令沒有 key 參數
     */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
        int hashslot;

        // 集群已下線
        if (server.cluster->state != REDIS_CLUSTER_OK) {
            flagTransaction(c);
            addReplySds(c, sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
            return REDIS_OK;

            // 集群運作正常
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code);
            // 不能執行多鍵處理命令
            if (n == NULL) {
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c, sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c, sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;

                //3.1 命令針對的槽和鍵不是本節點處理的,進行轉向
            } else if (n != server.cluster->myself) {
                flagTransaction(c);
                // -<ASK or MOVED> <slot> <ip>:<port>
                // 例如 -ASK 10086 127.0.0.1:12345
                addReplySds(c, sdscatprintf(sdsempty(),
                                            "-%s %d %s:%d\r\n",
                                            (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                                            hashslot, n->ip, n->port));

                return REDIS_OK;
            }

            // 如果執行到這裏,說明鍵 key 所在的槽由本節點處理
            // 或者客戶端執行的是無參數命令
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    //4 如果設置了最大內存,那麼檢查內存是否超過限制,並做相應的操作
    if (server.maxmemory) {
        //4.1 如果內存已超過限制,那麼嘗試通過刪除過期鍵來釋放內存
        int retval = freeMemoryIfNeeded();
        // 如果即將要執行的命令可能佔用大量內存(REDIS_CMD_DENYOOM)
        // 並且前面的內存釋放失敗的話
        // 那麼向客戶端返回內存錯誤
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
    }    
    ....
  • 1處,查找命令,對應的函數指針(類似於java里的策略模式,根據命令,找對應的策略)
  • 2處,檢查,是否密碼正確
  • 3處,集群相關操作;
  • 3.1處,不是本節點處理,直接返回ask,指示客戶端轉向
  • 4處,判斷是否設置了maxMemory,這裏就是本文重點:設置了maxMemory時,內存淘汰策略
  • 4.1處,調用了下方的 freeMemoryIfNeeded

接下來,深入4.1處:


int freeMemoryIfNeeded(void) {
    size_t mem_used, mem_tofree, mem_freed;
    int slaves = listLength(server.slaves);

    /* Remove the size of slaves output buffers and AOF buffer from the
     * count of used memory. */
    // 計算出 Redis 目前佔用的內存總數,但有兩個方面的內存不會計算在內:
    // 1)從服務器的輸出緩衝區的內存
    // 2)AOF 緩衝區的內存
    mem_used = zmalloc_used_memory();
    if (slaves) {
		...
    }
    if (server.aof_state != REDIS_AOF_OFF) {
        mem_used -= sdslen(server.aof_buf);
        mem_used -= aofRewriteBufferSize();
    }

    /* Check if we are over the memory limit. */
    //1 如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作
    if (mem_used <= server.maxmemory) return REDIS_OK;

    //2 如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回
    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
        return REDIS_ERR; /* We need to free memory, but policy forbids. */

    /* Compute how much memory we need to free. */
    // 3 計算需要釋放多少字節的內存
    mem_tofree = mem_used - server.maxmemory;

    // 初始化已釋放內存的字節數為 0
    mem_freed = 0;

    // 根據 maxmemory 策略,
    //4 遍歷字典,釋放內存並記錄被釋放內存的字節數
    while (mem_freed < mem_tofree) {
        int j, k, keys_freed = 0;

        // 遍歷所有字典
        for (j = 0; j < server.dbnum; j++) {
            long bestval = 0; /* just to prevent warning */
            sds bestkey = NULL;
            dictEntry *de;
            redisDb *db = server.db + j;
            dict *dict;

            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM) {
                // 如果策略是 allkeys-lru 或者 allkeys-random 
                //5 那麼淘汰的目標為所有數據庫鍵
                dict = server.db[j].dict;
            } else {
                // 如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl 
                //6 那麼淘汰的目標為帶過期時間的數據庫鍵
                dict = server.db[j].expires;
            }


            /* volatile-random and allkeys-random policy */
            // 如果使用的是隨機策略,那麼從目標字典中隨機選出鍵
            if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_RANDOM ||
                server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_RANDOM) {
                de = dictGetRandomKey(dict);
                bestkey = dictGetKey(de);
            }
            /* volatile-lru and allkeys-lru policy */
            //7 
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_ALLKEYS_LRU ||
                     server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU) {
                struct evictionPoolEntry *pool = db->eviction_pool;

                while (bestkey == NULL) {
                    // 8 
                    evictionPoolPopulate(dict, db->dict, db->eviction_pool);
                    /* Go backward from best to worst element to evict. */
                    for (k = REDIS_EVICTION_POOL_SIZE - 1; k >= 0; k--) {
                        if (pool[k].key == NULL) continue;
                        // 8.1
                        de = dictFind(dict, pool[k].key);

                        /* 8.2 Remove the entry from the pool. */
                        sdsfree(pool[k].key);
                        /* Shift all elements on its right to left. */
                        memmove(pool + k, pool + k + 1,
                                sizeof(pool[0]) * (REDIS_EVICTION_POOL_SIZE - k - 1));
                        /* Clear the element on the right which is empty
                         * since we shifted one position to the left.  */
                        pool[REDIS_EVICTION_POOL_SIZE - 1].key = NULL;
                        pool[REDIS_EVICTION_POOL_SIZE - 1].idle = 0;

                        /* If the key exists, is our pick. Otherwise it is
                         * a ghost and we need to try the next element. */
                        // 8.3
                        if (de) {
                            bestkey = dictGetKey(de);
                            break;
                        } else {
                            /* Ghost... */
                            continue;
                        }
                    }
                }
            }

                /* volatile-ttl */
                // 策略為 volatile-ttl ,從一集 sample 鍵中選出過期時間距離當前時間最接近的鍵
            else if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_TTL) {
                ...
            }

            /* Finally remove the selected key. */
            // 8.4 刪除被選中的鍵
            if (bestkey) {
                long long delta;

                robj *keyobj = createStringObject(bestkey, sdslen(bestkey));
                propagateExpire(db, keyobj);
                /* We compute the amount of memory freed by dbDelete() alone.
                 * It is possible that actually the memory needed to propagate
                 * the DEL in AOF and replication link is greater than the one
                 * we are freeing removing the key, but we can't account for
                 * that otherwise we would never exit the loop.
                 *
                 * AOF and Output buffer memory will be freed eventually so
                 * we only care about memory used by the key space. */
                // 計算刪除鍵所釋放的內存數量
                delta = (long long) zmalloc_used_memory();
                dbDelete(db, keyobj);
                delta -= (long long) zmalloc_used_memory();
                mem_freed += delta;

                // 對淘汰鍵的計數器增一
                server.stat_evictedkeys++;

                notifyKeyspaceEvent(REDIS_NOTIFY_EVICTED, "evicted",
                                    keyobj, db->id);
                decrRefCount(keyobj);
                keys_freed++;
				...
            }
        }

        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
    }

    return REDIS_OK;
}
  • 1處,如果目前使用的內存大小比設置的 maxmemory 要小,那麼無須執行進一步操作

  • 2處,如果佔用內存比 maxmemory 要大,但是 maxmemory 策略為不淘汰,那麼直接返回

  • 3處,計算需要釋放多少字節的內存

  • 4處,遍歷字典,釋放內存並記錄被釋放內存的字節數

  • 5處,如果策略是 allkeys-lru 或者 allkeys-random 那麼淘汰的目標為所有數據庫鍵

  • 6處,如果策略是 volatile-lru 、 volatile-random 或者 volatile-ttl ,那麼淘汰的目標為帶過期時間的數據庫鍵

  • 7處,如果使用的是 LRU 策略, 那麼從 sample 鍵中選出 IDLE 時間最長的那個鍵

  • 8處,調用evictionPoolPopulate,該函數在下面講解,該函數的功能是,傳入一個鏈表,即這裏的db->eviction_pool,然後在函數內部,隨機找出n個key,放入傳入的鏈表中,並按照空閑時間排序,空閑最久的,放到最後。

    當該函數,返回后,db->eviction_pool這個鏈表裡就存放了我們要淘汰的key。

  • 8.1處,找到這個key,這個key,在後邊會被刪除

  • 8.2處,下面這一段,從db->eviction_pool將這個已經處理了的key刪掉

  • 8.3處,如果這個key,是存在的,則跳出循環,在後面8.4處,會被刪除

  • 8.4處,刪除這個key

選擇哪些key作為被淘汰的key

前面我們看到,在7處,如果為lru策略,則會進入8處的函數:

evictionPoolPopulate。

該函數的名稱為:填充(populate)驅逐(eviction)對象池(pool)。驅逐的意思,就是現在達到了maxmemory,沒辦法,只能開始刪除掉一部分元素,來騰空間了,不然新的put類型的命令,根本沒辦法執行。

該方法的大概思路是,使用lru的時候,隨機找n個key,類似於抽樣,然後放到一個鏈表,根據空閑時間排序。

具體看看該方法的實現:

void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {

其中,傳入的第三個參數,是要被填充的對象,在c語言中,習慣傳入一個入參,然後在函數內部填充或者修改入參對象的屬性。

該屬性,就是前面說的那個鏈表,用來存放收集的隨機的元素,該鏈表中節點的結構如下:

struct evictionPoolEntry {
    unsigned long long idle;    /* Object idle time. */
    sds key;                    /* Key name. */
};

該結構共2個字段,一個存儲key,一個存儲空閑時間。

該鏈表中,共maxmemory-samples個元素,會按照idle時間長短排序,idle時間長的在鏈表尾部,(假設頭在左,尾在右)。

void evictionPoolPopulate(dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    dictEntry *_samples[EVICTION_SAMPLES_ARRAY_SIZE];
    dictEntry **samples;

    /* Try to use a static buffer: this function is a big hit...
     * Note: it was actually measured that this helps. */
    if (server.maxmemory_samples <= EVICTION_SAMPLES_ARRAY_SIZE) {
        samples = _samples;
    } else {
        samples = zmalloc(sizeof(samples[0]) * server.maxmemory_samples);
    }

    /* 1 Use bulk get by default. */
    count = dictGetRandomKeys(sampledict, samples, server.maxmemory_samples);

	// 2
    for (j = 0; j < count; j++) {
        unsigned long long idle;
        sds key;
        robj *o;
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);
        /* If the dictionary we are sampling from is not the main
         * dictionary (but the expires one) we need to lookup the key
         * again in the key dictionary to obtain the value object. */
        if (sampledict != keydict) de = dictFind(keydict, key);
        // 3
        o = dictGetVal(de);
        // 4
        idle = estimateObjectIdleTime(o);

        /* 5  Insert the element inside the pool.
         * First, find the first empty bucket or the first populated
         * bucket that has an idle time smaller than our idle time. */
        k = 0;
        while (k < REDIS_EVICTION_POOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle)
            k++;
        
		...
            
        // 6
        pool[k].key = sdsdup(key);
        pool[k].idle = idle;
    }
    if (samples != _samples) zfree(samples);
}
  • 1處,獲取 server.maxmemory_samples個key,這裡是隨機獲取的,(dictGetRandomKeys),這個值,默認值為5,放到samples中

  • 2處,遍歷返回來的samples

  • 3處,調用如下宏,獲取val

    he的類型為dictEntry:

    /*
     * 哈希表節點
     */
    typedef struct dictEntry {
        
        // 鍵
        void *key;
    
        // 值
        union {
            // 1
            void *val;
            uint64_t u64;
            int64_t s64;
        } v;
    
        // 指向下個哈希表節點,形成鏈表
        struct dictEntry *next;
    
    } dictEntry;
    

    所以,這裏去

    robj *o; 
    
    o = dictGetVal(de);
    

    實際就是獲取其v屬性中的val,(1處):

    #define dictGetVal(he) ((he)->v.val)
    
  • 4處,準備計算該val的空閑時間

    我們上面3處,看到,獲取的o的類型為robj。我們現在看看怎麼計算對象的空閑時長:

    /* Given an object returns the min number of milliseconds the object was never
     * requested, using an approximated LRU algorithm. */
    unsigned long long estimateObjectIdleTime(robj *o) {
        //4.1 獲取系統的當前時間
        unsigned long long lruclock = LRU_CLOCK();
        // 4.2
        if (lruclock >= o->lru) {
            // 4.3
            return (lruclock - o->lru) * REDIS_LRU_CLOCK_RESOLUTION;
        } else {
            return (lruclock + (REDIS_LRU_CLOCK_MAX - o->lru)) *
                        REDIS_LRU_CLOCK_RESOLUTION;
        }
    }
    

    這裏,4.1處,獲取系統的當前時間;

    4.2處,如果系統時間,大於對象的lru時間

    4.3處,則用系統時間減去對象的lru時間,再乘以單位,換算為毫秒,最終返回的單位,為毫秒(可以看註釋。)

    #define REDIS_LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
    
  • 5處,這裏拿當前元素,和pool中已經放進去的元素,從第0個開始比較,如果當前元素的idle時長,大於pool中指針0指向的元素,則和pool中索引1的元素比較;直到條件不滿足為止。

    這句話意思就是,類似於冒泡,把當前元素一直往後冒,直到idle時長小於被比較的元素為止。

  • 6處,把當前元素放進pool中。

經過上面的處理后,鏈表中存放了全部的抽樣元素,且ide時間最長的,在最右邊。

對象還有字段存儲空閑時間?

前面4處,說到,用系統的當前時間,減去對象的lru時間。

大家看看對象的結構體

typedef struct redisObject {

    // 類型
    unsigned type:4;

    // 編碼
    unsigned encoding:4;

    //1 對象最後一次被訪問的時間
    unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */

    // 引用計數
    int refcount;

    // 指向實際值的指針
    void *ptr;

} robj;

上面1處,lru屬性,就是用來存儲這個。

創建對象時,直接使用當前系統時間創建

robj *createObject(int type, void *ptr) {

    robj *o = zmalloc(sizeof(*o));

    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /*1 Set the LRU to the current lruclock (minutes resolution). */
    o->lru = LRU_CLOCK();
    return o;
}

1處即是。

robj *createEmbeddedStringObject(char *ptr, size_t len) {
    robj *o = zmalloc(sizeof(robj)+sizeof(struct sdshdr)+len+1);
    struct sdshdr *sh = (void*)(o+1);

    o->type = REDIS_STRING;
    o->encoding = REDIS_ENCODING_EMBSTR;
    o->ptr = sh+1;
    o->refcount = 1;
    // 1
    o->lru = LRU_CLOCK();

    sh->len = len;
    sh->free = 0;
    if (ptr) {
        memcpy(sh->buf,ptr,len);
        sh->buf[len] = '\0';
    } else {
        memset(sh->buf,0,len+1);
    }
    return o;
}

1處即是。

每次查找該key時,刷新時間

robj *lookupKey(redisDb *db, robj *key) {

    // 查找鍵空間
    dictEntry *de = dictFind(db->dict,key->ptr);

    // 節點存在
    if (de) {
        

        // 取出值
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        // 更新時間信息(只在不存在子進程時執行,防止破壞 copy-on-write 機制)
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            // 1
            val->lru = LRU_CLOCK();

        // 返回值
        return val;
    } else {

        // 節點不存在

        return NULL;
    }
}

1處即是,包括get、set等各種操作,都會刷新該時間。

仔細看下面的堆棧,set的,get同理:

總結

大家有沒有更清楚一些呢?

總的來說,就是,設置了max-memory后,達到該內存限制后,會在處理命令時,檢查是否要進行內存淘汰;如果要淘汰,則根據maxmemory-policy的策略來。

隨機選擇maxmemory-sample個元素,按照空閑時間排序,拉鏈表;挨個挨個清除。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?