阿里雲ECS服務器部署HADOOP集群(一):Hadoop完全分佈式集群環境搭建,阿里雲ECS服務器部署HADOOP集群(二):HBase完全分佈式集群搭建(使用外置ZooKeeper),阿里雲ECS服務器部署HADOOP集群(三):ZooKeeper 完全分佈式集群搭建,阿里雲ECS服務器部署HADOOP集群(四):Hive本地模式的安裝,阿里雲ECS服務器部署HADOOP集群(六):Flume 安裝,阿里雲ECS服務器部署HADOOP集群(七):Sqoop 安裝

準備:

兩台配置CentOS 7.3的阿里雲ECS服務器;

hadoop-2.7.3.tar.gz安裝包;

jdk-8u77-linux-x64.tar.gz安裝包;

hostname及IP的配置:

更改主機名:

由於系統為CentOS 7,可以直接使用‘hostnamectl set-hostname 主機名’來修改,修改完畢后重新shell登錄或者重啟服務器即可。

1 hostnamectl set-hostname master
2 exit
3 ssh root@master
1 hostnamectl set-hostname slave1
2 exit 3 ssh root@slave1

設置本地域名:

設置本地域名這一步非常關鍵,ip的本地域名信息配置不好,即有可能造成Hadoop啟動出現問題,又有可能造成在使用Hadoop的MapReduce進行計算時報錯。在ECS上搭建Hadoop集群環境需參考以下兩篇文章:

總結一下那就是,在“/etc/hosts”文件中進行域名配置時要遵從2個原則:

  •  新加域名在前面: 將新添加的Master、Slave服務器ip域名(例如“test7972”),放置在ECS服務器原有本地域名(例如“iZuf67wb***************”)的前面。但是注意ECS服務器原有本地      域名(例如“iZuf67wb***************”)不能被刪除,因為操作系統別的地方還會使用到。
  •  IP本機內網,其它外網: 在本機上的操作,都要設置成內網ip;其它機器上的操作,要設置成外網ip。

master

slave1

此處摘自 

配置好后需要在各個節點上執行如下命令,測試是否相互 ping 得通,如果 ping 不通,後面就無法順利配置成功:

1 ping master -c 3
2 ping slave1 -c 3

例如我在 master 節點上 ping slave1 ,ping 通的話會显示 time,显示的結果如下圖所示:

各節點角色分配

master: NameNode  ResourceManager

slave1: DataNode NodeManager

免密碼登錄配置

分別在 master 和 slave1 上做如下操作

1 ssh-keygen -t rsa
2 ssh-copy-id master 3 ssh-copy-id slave1

驗證

ssh master date;ssh slave1 date

配置JDK

解壓JDK安裝包到/usr/local/下

tar -zxvf jdk-8u77-linux-x64.tar.gz -C /usr/local/

將解壓目錄改為 jdk1.8

mv jdk1.8.0_77/ jdk1.8/

設置JAVA_HOME到系統環境變量

vim /etc/profile

在最後加入以下兩行代碼

export JAVA_HOME=/usr/local/jdk1.8
export PATH=$PATH:$JAVA_HOME/bin

重新加載環境

source /etc/profile

這樣 master 的jdk就配置好了,可以用命令 java -version 測試下。

java -version

下面只需將 master 上配置好的文件分發到 slave1 上即可。

將/usr/local/jdk1.8分發到 slave1 的/usr/local/下(建議壓縮后再分發)

scp -r /usr/local/jdk1.8/ slave1:/usr/local/

將/etc/profile分發到 slave1 的/etc/下

scp /etc/profile slave1:/etc/

  然後重新加載 slave1 環境便完成了 slave1 的jdk配置

source /etc/profile

hadoop集群配置

1 cd ~
2 tar -zxvf hadoop-2.7.3.tar.gz -C /usr/local # 解壓到/usr/local中
3 cd /usr/local/
4 mv ./hadoop-2.7.3/ ./hadoop            # 將文件夾名改為hadoop

輸入如下命令來檢查 Hadoop 是否可用,成功則會显示 Hadoop 版本信息:

1 cd /usr/local/hadoop
2 ./bin/hadoop version

添加 HADOOP_HOME 到系統環境變量

vim /etc/profile

在後面添加如下兩行

1 export HADOOP_HOME=/usr/local/hadoop
2 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

重新加載環境,並輸出變量 HADOOP_HOME 驗證

進入/user/local/hadoop/etc/hadoop/可以看到如下配置文件

集群/分佈式模式需要修改 /usr/local/hadoop/etc/hadoop 中的6個配置文件,更多設置項可點擊查看官方說明,這裏僅設置了我完成課堂作業所必須的設置項:hadoop-env.sh, slaves,  ,  ,  ,   。

1.首先來配置 hadoop-env.sh ,只需要設置一下JAVA_HOME即可

注:之前在配置jdk中配置的是基於系統的JAVA_HOME變量,這裏需要配置基於Hadoop集群的JAVA_HOME變量。

hadoop-env.sh 是Hadoop的環境變量配置腳本。

所以應做以下修改 vim hadoop-env.sh

export JAVA_HOME=/usr/local/jdk1.8

2.配置 slave , 指定 slave 節點

sudo vi slaves

刪去原有的 localhost , 添加將作為 slave 節點的 slave1

3.配置 core-site.xml 

 1 <configuration>
 2 
 3     <property>
 4         <name>fs.defaultFS</name>
 5         <value>hdfs://master:9000</value>
 6         <description>The name of the default file system.</description>
 7     </property> 
 8 # 設置訪問hdfs的默認名,9000是默認端口
 9 
10     <property>
11         <name>hadoop.tmp.dir</name>
12         <value>/usr/local/hadoop/tmp</value>
13         <description>Abase for other temporary directories.</description>
14     </property>
15 # 在hdfs格式化的時候會自動創建相應的目錄 'tmp/' 16 17 <property> 18 <name>fs.trash.interval</name> 19 <value>4320</value> 20 <description>Number of minutes after which the checkpoint gets deleted.</description> 21 </property> 22 # 設置回收站里的文件保留時間(單位:秒) 23 24 </configuration>

4.配置 hdfs-site.xml 

 1 <configuration>
 2 
 3     <property>
 4         <name>dfs.namenode.name.dir</name>
 5         <value>/usr/local/hadoop/tmp/dfs/name</value>
 6     </property>
 7 
 8     <property>
 9         <name>dfs.datanode.data.dir</name>
10         <value>/usr/local/hadoop/tmp/dfs/data</value>
11     </property>
12 
13     <property>
14         <name>dfs.replication</name>
15         <value>1</value>
16     </property>
17 # 副本,因為有一個 slave 節點這裏設置為1(一般偽分佈模式設1個,三個或三個以上節點設3個)
18 
19     <property>
20         <name>dfs.permissions.enabled</name>
21         <value>false</value>
22         <description>If "true", enable permission checking in HDFS. If "false", permission checking is turned off, but all other behavior is unchanged. Switching from one parameter value to the other does not change the mode, owner or group of files or directories.</description>
23     </property>
24 
25 </configuration>    

5.配置 mapred-site.xml (這個文件沒有直接提供,而是提供了模版文件,需將模版文件轉換為配置文件) 

1 sudo mv mapred-site.xml.template mapred-site.xml
2 sudo vi mapred-site.xml
 1 <configuration>
 2 
 3     <property>
 4         <name>mapreduce.framework.name</name>
 5         <value>yarn</value>
 6         <description>The runtime framework for executing MapReduce jobs.Can be one of local, classic or yarn.</description>
 7     </property>
 8     <property>
 9         <name>mapreduce.jobtracker.http.address</name>
10         <value>master:50030</value>
11     </property>
12     <property>
13         <name>mapreduce.jobhisotry.address</name>
14         <value>master:10020</value>
15     </property>
16     <property>
17         <name>mapreduce.jobhistory.webapp.address</name>
18         <value>master:19888</value>
19     </property>
20     <property>
21         <name>mapreduce.jobhistory.done-dir</name>
22         <value>/jobhistory/done</value>
23     </property>
24     <property>
25         <name>mapreduce.jobhistory.intermediate-done-dir</name>
26         <value>/jobhisotry/done_intermediate</value>
27     </property>
28     <property>
29         <name>mapreduce.job.ubertask.enable</name>
30         <value>true</value>
31         <description>Whether to enable the small-jobs "ubertask" optimization,which runs "sufficiently small" jobs sequentially within a single JVM."Small" is defined by the following maxmaps, maxreduces, and maxbytes settings. Note that configurations for application masters also affect the "Small" definition - yarn.app.mapreduce.am.resource.mb must be larger than both mapreduce.map.memory.mb and mapreduce.reduce.memory.mb, and yarn.app.mapreduce.am.resource.cpu-vcores must be larger than both mapreduce.map.cpu.vcores and mapreduce.reduce.cpu.vcores to enable ubertask. Users may override this value.</description>
32     </property>
33 
34 </configuration>

6.配置 yarn-site.xml

 1 <configuration>
 2 
 3     <property>
 4         <name>yarn.resourcemanager.hostname</name>
 5         <value>master</value>
 6     </property>
 7     <property>
 8         <name>yarn.nodemanager.aux-services</name>
 9         <value>mapreduce_shuffle</value>
10         <description>A comma separated list of services where service name should only contain a-zA-Z0-9_ and can not start with numbers</description>
11     </property>
12     <property>
13         <name>yarn.resourcemanager.address</name>
14         <value>master:18040</value>
15     </property>
16     <property>
17         <name>yarn.resourcemanager.scheduler.address</name>
18         <value>master:18030</value>
19     </property>
20     <property>
21         <name>yarn.resourcemanager.resource-tracker.address</name>
22         <value>master:18025</value>
23     </property>
24     <property>
25         <name>yarn.resourcemanager.admin.address</name>
26         <value>master:18141</value>
27     </property>
28     <property>
29         <name>yarn.resourcemanager.webapp.address</name>
30         <value>master:18088</value>
31     </property>
32     <property>
33         <name>yarn.log-aggregation-enable</name>
34         <value>true</value>
35     </property>
36     <property>
37         <name>yarn.log-aggregation.retain-seconds</name>
38         <value>86400</value>
39     </property>
40     <property>
41         <name>yarn.log-aggregation.retain-check-interval-seconds</name>
42         <value>86400</value>
43     </property>
44     <property>
45         <name>yarn.nodemanager.remote-app-log-dir</name>
46         <value>/tmp/logs</value>
47     </property>
48     <property>
49         <name>yarn.nodemanager.remote-app-log-dir-suffix</name>
50         <value>logs</value>
51     </property>
52 
53 </configuration>

 到這裏 master 就已經配置好了,下面將該服務器的配置分發到 slave1 上去(建議壓縮后再分發),在此使用壓縮後分發的方法

在 master 節點上執行

1 cd /usr/local
2 tar -zcvf ~/hadoop.master.tar.gz ./hadoop 3 cd ~ 4 scp ./hadoop.master.tar.gz slave1:/root/ 5 scp /etc/profile slave1:/etc/

在 slave1 節點上執行

tar -zxvf ~/hadoop.master.tar.gz -C /usr/local

在 slave1 上重新加載環境並檢查驗證

source /etc/profile
echo $HADOOP_HOME

HDFS NameNode 格式化(只要在 master 上執行即可)

$HADOOP_HOME/bin/hdfs namenode -format

看到下面的輸出,表明hdfs格式化成功

INFO common.Storage: Storage directory /usr/local/hadoop/tmp/dfs/name has been successfully formatted.

啟動前檢查防火牆狀態

systemctl status firewalld

我這裡是已經關閉的,若未關閉,可以參考下圖(來自)

阿里雲服務器還需要在服務器安全組裡配置防火牆,需將配置文件里的相關端口全部添加,否則會出現 web 頁面打不開,以及 DataNode 啟動但 Live datenode 為 0 等問題

啟動 Hadoop 集群

$HADOOP_HOME/sbin/start-all.sh

 

  • 第一次啟動 hadoop 時會出現 ssh 提示,提示是否要連入 0.0.0.0 節點,輸入 yes 即可
  • 若出現 hadoop 啟動時 datanode 沒有啟動,可以參考來解決

啟動 job history server

在 master 上執行

$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh start historyserver

成功后在兩個節點上驗證

在 master 上 執行 

jps

可以看到 ResourceManager、SecondaryNameNode、NameNode、JobHistoryServer 四個進程全部啟動

在 slave1 上執行

jps

可以看到 NodeManager、DataNode 兩個進程全部啟動

缺少任一進程都表示出錯。另外還需要在 Master 節點上通過命令 hdfs dfsadmin -report 查看 DataNode 是否正常啟動,如果 Live datanodes 不為 0 ,則說明集群啟動成功。例如我這邊一共有 1 個 Datanodes:

全部配置完成之後查看 web 頁面

hdfs: http://master:50070/

hdfs Datanode 節點信息

hdfs 的情況

hdfs 的文件情況

 yarn:http://master:18088/

 

阿里雲ECS服務器部署HADOOP集群系列:

 

 

 

 

 

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

Tesla計畫於上海建電動車廠,關稅考量為主因

電動車製造商特斯拉(Tesla)在Model X 的銷售和Model 3 的產能上都面臨著巨大挑戰,在全球最大的電動車市場──中國,Tesla 則看到了電動車需求持續成長帶來的機會,並希望能夠透過投資建廠、本土化製造等方式在中國電動車市場分一杯羹,據悉Tesla 將在上海建造海外第一座電動車製造工廠,未來可能會用於Model 3 電動車的生產。

據知情人士透露,特斯拉已經與上海市政府達成合作協議,將首次在中國生產製造電動車,此次合作將有助於特斯拉進一步提升在中國市場的銷售,目前中國是全球最大的電動車市場,政府對於電動車的銷售和生產有許多優惠政策和補貼。

特斯拉的製造工廠將建在上海臨港開發區,細節正在確認中,將在本月晚些時候對外公開,據悉上海市政府要求該製造工廠必須由特斯拉和上海的合作夥伴共同投資建造,但是否持有控股權還不得而知。市場諮詢公司Dunne Automotive 總裁Michael Dune 表示,特斯拉有機會佔據中國電動車市場的領先地位,許多有實力和知名度的品牌公司都會選擇在上海建造生產基地。

特斯拉選擇在上海生產電動車,有助於直接與中國汽車廠商的產品競爭,這比在美國生產再進口到中國市場銷售,至少能夠降低25% 的進口關稅,正是由於關稅的成本,Tesla Model S 和Model X 電動車在中國市場的價格比美國市場高一倍。

中國政府已經將電動車產業做為戰略性的新興產業,目標是在未來10 年內將混合式和全電動車的銷量提升10 倍,2016 年中國市場電動車的銷量約為28.3 萬台,佔比全球銷量的41%,Tesla 2016 年營收大約為70 億美元,其中15% 來自中國市場。目前大約有200 家公司宣布在中國製造電動車的計畫,其中北汽汽車和比亞迪公司的電動車佔總銷量的89%。

特斯拉CEO 伊隆·馬斯克(Elon Musk)早在3 年前就表示希望能夠在中國建設製造工廠,自2014 年以後每次到訪中國都會與大量政府官員見面。2017 年6 月初特斯拉宣布2017 年下半年在中國超級充電站建設計畫。目前特斯拉在中國大約有117 個超級充電站。

(合作媒體:。圖片出處:public domain CC0)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

ThreadLocal原理分析與代碼驗證

ThreadLocal提供了線程安全的數據存儲和訪問方式,利用不帶key的get和set方法,居然能做到線程之間隔離,非常神奇。

比如

ThreadLocal<String> threadLocal = new ThreadLocal<>();

in thread 1

//in thread1
treadLocal.set("value1");
.....
//value的值是value1
String value = threadLocal.get();

in thread 2

//in thread2
treadLocal.set("value2");
.....
//value的值是value2
String value = threadLocal.get();

不論thread1和thread2是不是同時執行,都不會有線程安全問題,我們來測試一下。

線程安全測試

開10個線程,每個線程內都對同一個ThreadLocal對象set不同的值,會發現ThreadLocal在每個線程內部get出來的值,只會是自己線程內set進去的值,不會被別的線程影響。

static void testUsage() throws InterruptedException {
    Utils.println("-------------testUsage-------------------");
    ThreadLocal<Long> threadLocal = new ThreadLocal<>();

    AtomicBoolean threadSafe = new AtomicBoolean(true);
    int count = 10;
    CountDownLatch countDownLatch = new CountDownLatch(count);
    Random random = new Random(736832);
    for (int i = 0; i < count; i ++){
        new Thread(() -> {
            try {
                //生成一個隨機數
                Long value = System.nanoTime() + random.nextInt();
                threadLocal.set(value);
                Thread.sleep(1000);

                Long value2 = threadLocal.get();
                if (!value.equals(value2)) {
                    //get和set的value不一致,說明被別的線程修改了,但這是不可能出現的
                    threadSafe.set(false);
                    Utils.println("thread unsafe, this could not be happen!");
                }
            } catch (InterruptedException e) {

            }finally {
                countDownLatch.countDown();
            }

        }).start();
    }

    countDownLatch.await();

    Utils.println("all thread done, and threadSafe is " + threadSafe.get());
    Utils.println("------------------------------------------");
}

輸出:

-------------testUsage------------------
all thread done, and threadSafe is true
-----------------------------------------

原理淺析

翻開ThreadLocal的源碼,會發現ThreadLocal只是一個空殼子,它並不存儲具體的value,而是利用當前線程(Thread.currentThread())的threadLocalMap來存儲value,key就是這個threadLocal對象本身。

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

Thread的threadLocals字段是ThreadLocalMap類型(你可以簡單理解為一個key value的Map),key是ThreadLocal對象,value是我們在外層設置的值

  • 當我們調用threadLocal.set(value)方法的時候,會找到當前線程的threadLocals這個map,然後以this作為key去set key value
  • 當我們調用threadLocal.get()方法的時候,會找到當前線程的threadLocals這個map,然後以this作為key去get value
  • 當我們調用threadLocal.remove()方法的時候,會找到當前線程的threadLocals這個map,然後以this作為key去remove

這就相當於:

Thread.currentThread().threadLocals.set(threadLocal1, "value1");
.....
//value的值是value1
String value = Thread.currentThread().threadLocals.get(threadLocal1);

因為每個Thread都是不同的對象,所以他們的threadLocals也是不同的map,threadLocal在不同的線程里工作時,實際上是從不同的map里get/set,這也就是線程安全的原因了,了解到這一點就差不多了。

再深入一些,ThreadLocalMap的結構

如果繼續翻ThreadLocalMap的源碼,會發現它有個字段table,是Entry類型的數組。

我們不妨寫段代碼,把ThreadLocalMap的結構輸出出來。

由於Thread.threadLocals和ThreadLocalMap類不是public的,我們只有通過反射來獲取它的值。反射的代碼如下(如果嫌長可以不看,直接看輸出):

static Object getThreadLocalMap(Thread thread) throws NoSuchFieldException, IllegalAccessException {        
    //get thread.threadLocals
    Field threadLocals = Thread.class.getDeclaredField("threadLocals");
    threadLocals.setAccessible(true);
    return threadLocals.get(thread);
}

static void printThreadLocalMap(Object threadLocalMap) throws NoSuchFieldException, IllegalAccessException {
    String threadName = Thread.currentThread().getName();
    
    if(threadLocalMap == null){
        Utils.println("threadMap is null, threadName:" + threadName);
        return;
    }

    Utils.println(threadName);

    //get threadLocalMap.table
    Field tableField = threadLocalMap.getClass().getDeclaredField("table");
    tableField.setAccessible(true);
    Object[] table = (Object[])tableField.get(threadLocalMap);
    Utils.println("----threadLocals (ThreadLocalMap), table.length = " + table.length);

    for (int i = 0; i < table.length; i ++){
        WeakReference<ThreadLocal<?>> entry = (WeakReference<ThreadLocal<?>>)table[i];
        printEntry(entry, i);
    }
}
static void printEntry(WeakReference<ThreadLocal<?>> entry, int i) throws NoSuchFieldException, IllegalAccessException {
    if(entry == null){
        Utils.println("--------table[" + i + "] -> null");
        return;
    }
    ThreadLocal key = entry.get();
    //get entry.value
    Field valueField = entry.getClass().getDeclaredField("value");
    valueField.setAccessible(true);
    Object value = valueField.get(entry);

    Utils.println("--------table[" + i + "] -> entry key = " + key + ", value = " + value);
}

測試代碼:

static void testStructure() throws InterruptedException {
    Utils.println("-------------testStructure----------------");
    ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
    ThreadLocal<String> threadLocal2 = new ThreadLocal<>();

    Thread thread1 = new Thread(() -> {
        threadLocal1.set("threadLocal1-value");
        threadLocal2.set("threadLocal2-value");

        try {
            Object threadLocalMap = getThreadLocalMap(Thread.currentThread());
            printThreadLocalMap(threadLocalMap);

        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

    }, "thread1");

    thread1.start();

    //wait thread1 done
    thread1.join();

    Thread thread2 = new Thread(() -> {
        threadLocal1.set("threadLocal1-value");
        try {
            Object threadLocalMap = getThreadLocalMap(Thread.currentThread());
            printThreadLocalMap(threadLocalMap);

        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

    }, "thread2");

    thread2.start();
    thread2.join();
    Utils.println("------------------------------------------");
}

我們在創建了兩個ThreadLocal的對象threadLocal1和threadLocal2,在線程1里為這兩個對象設置值,在線程2里只為threadLocal1設置值。然後分別打印出這兩個線程的threadLocalMap。

輸出結果為:

-------------testStructure----------------
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> entry key = java.lang.ThreadLocal@33baa315, value = threadLocal2-value
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> null
--------table[9] -> null
--------table[10] -> entry key = java.lang.ThreadLocal@4d42db5c, value = threadLocal1-value
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> null
thread2
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> null
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> null
--------table[9] -> null
--------table[10] -> entry key = java.lang.ThreadLocal@4d42db5c, value = threadLocal1-value
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> null
------------------------------------------

從結果上可以看出:

  • 線程1和線程2的threadLocalMap對象的table字段,是個數組,長度都是16
  • 由於線程1里給兩個threadLocal對象設置了值,所以線程1的ThreadLocalMap里有兩個entry,數組下標分別是1和10,其餘的是null(如果你自己寫代碼驗證,下標不一定是1和10,不需要糾結這個問題,只要前後對的上就行)
  • 由於線程2里只給一個threadLocal對象設置了值,所以線程1的ThreadLocalMap里只有一個entry,數組下標是10,其餘的是null
  • threadLocal1這個對象在兩個線程里都設置了值,所以當它作為key加入二者的threadLocalMap時,key是一樣的,都是java.lang.ThreadLocal@4d42db5c;下標也是一樣的,都是10。

為什麼是WeakReference

查看Entry的源碼,會發現Entry繼承自WeakReference:

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

構造函數里把key傳給了super,也就是說,ThreadLocalMap中對key的引用,是WeakReference的。

Weak reference objects, which do not prevent their referents from being
made finalizable, finalized, and then reclaimed. Weak references are most
often used to implement canonicalizing mappings.

通俗點解釋:

當一個對象僅僅被weak reference(弱引用), 而沒有任何其他strong reference(強引用)的時候, 不論當前的內存空間是否足夠,當GC運行的時候, 這個對象就會被回收。

看不明白沒關係,還是寫代碼測試一下什麼是WeakReference吧…

static void testWeakReference(){
    Object obj1 = new Object();
    Object obj2 = new Object();
    WeakReference<Object> obj1WeakRef = new WeakReference<>(obj1);
    WeakReference<Object> obj2WeakRf = new WeakReference<>(obj2);
    //obj32StrongRef是強引用
    Object obj2StrongRef = obj2;
    Utils.println("before gc: obj1WeakRef = " + obj1WeakRef.get() + ", obj2WeakRef = " + obj2WeakRf.get() + ", obj2StrongRef = " + obj2StrongRef);

    //把obj1和obj2設為null
    obj1 = null;
    obj2 = null;
    //強制gc
    forceGC();

    Utils.println("after gc: obj1WeakRef = " + obj1WeakRef.get() + ", obj2WeakRef = " + obj2WeakRf.get() + ", obj2StrongRef = " + obj2StrongRef);
}

結果輸出:

before gc: obj1WeakRef = java.lang.Object@4554617c, obj2WeakRef = java.lang.Object@74a14482, obj2StrongRef = java.lang.Object@74a14482
after gc: obj1WeakRef = null, obj2WeakRef = java.lang.Object@74a14482, obj2StrongRef = java.lang.Object@74a14482

從結果上可以看出:

  • 我們先new了兩個對象(為避免混淆,稱他們為Object1和Object2),分別用變量obj1和obj2指向它們,同時定義了一個obj2StrongRef,也指向Object2,最後把obj1和obj2均指向null
  • 由於Object1沒有變量強引用它了,所以在gc后,Object1被回收了,obj1WeakRef.get()返回了null
  • 由於Object2還有obj2StrongRef在引用它,所以gc后,Object2依然存在,沒有被回收。

那麼,ThreadLocalMap中對key的引用,為什麼是WeakReference的呢?

因為大部分情況下,線程不死

大部分情況下,線程不會頻繁的創建和銷毀,一般都會用線程池。所以線程對象一般不會被清除,線程的threadLocalMap就一直存在。
如果key對ThreadLocal是強引用,那麼key永遠不會被回收,即使我們程序里再也不用它了。

但是key是弱引用的話,情況就會得到改善:只要沒有指向threadLocal的強引用了,這個ThreadLocal對象就會被清理。

我們還是寫代碼測試一下吧。

/**
 * 測試ThreadLocal對象什麼時候被回收
 * @throws InterruptedException
 */
static void testGC() throws InterruptedException {
    Utils.println("-----------------testGC-------------------");
    Thread thread1 = new Thread(() -> {
        ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
        ThreadLocal<String> threadLocal2 = new ThreadLocal<>();

        threadLocal1.set("threadLocal1-value");
        threadLocal2.set("threadLocal2-value");

        try {
            Object threadLocalMap = getThreadLocalMap(Thread.currentThread());
            Utils.println("print threadLocalMap before gc");
            printThreadLocalMap(threadLocalMap);

            //set threadLocal1 unreachable
            threadLocal1 = null;

            forceGC();

            Utils.println("print threadLocalMap after gc");
            printThreadLocalMap(threadLocalMap);


        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

    }, "thread1");

    thread1.start();
    thread1.join();
    Utils.println("------------------------------------------");
}

我們在一個線程里為兩個ThreadLocal對象賦值,最後把其中一個對象的強引用移除,gc后打印當前線程的threadLocalMap。
輸出結果如下:

-----------------testGC-------------------
print threadLocalMap before gc
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> entry key = java.lang.ThreadLocal@7bf9cebf, value = threadLocal2-value
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> null
--------table[9] -> null
--------table[10] -> entry key = java.lang.ThreadLocal@56342d38, value = threadLocal1-value
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> null
print threadLocalMap after gc
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> entry key = java.lang.ThreadLocal@7bf9cebf, value = threadLocal2-value
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> null
--------table[9] -> null
--------table[10] -> entry key = null, value = threadLocal1-value
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> null
------------------------------------------

從輸出結果可以看到,當我們把threadLocal1的強引用移除並gc之後,table[10]的key變成了null,說明threadLocal1這個對象被回收了;threadLocal2的強引用還在,所以table[1]的key不是null,沒有被回收。

但是你發現沒有,table[10]的key雖然是null了,但value還活着! table[10]這個entry對象,也活着!

是的,因為只有key是WeakReference….

無用的entry什麼時候被回收?

通過查看ThreadLocal的源碼,發現在ThreadLocal對象的get/set/remove方法執行時,都有機會清除掉map中已經無用的entry。

最容易驗證清除無用entry的場景分別是:

  • remove:這個不用說了,這哥們本來就是做這個的
  • get:當一個新的threadLocal對象(沒有set過value)發生get調用時,也會作為新的entry加入map,在加入的過程中,有機會清除掉無用的entry,邏輯和下面的set相同。
  • set: 當一個新的threadLocal對象(沒有set過value)發生set調用時,會在map中加入新的entry,此時有機會清除掉無用的entry,清除的邏輯是:
    • 清除掉table數組中的那些無用entry中的一部分,記住是一部分,這個一部分可能全部,也可能是0,具體算法請看ThreadLocalMap.cleanSomeSlots,這裏不解釋了。
    • 如果上一步的”一部分”是0(即清除了0個),並且map的size(是真實size,不是table.length)大於等於threshold(table.length的2/3),會執行一次rehash,在rehash的過程中,清理掉所有無用的entry,並減小size,清理后的size如果還大於等於threshold – threshold/4,則把table擴容為原來的兩倍大小。

還有其他場景,但不好驗證,這裏就不提了。

ThreadLocal源碼就不貼了,貼了也講不明白,相關邏輯在setInitialValue、cleanSomeSlots、expungeStaleEntries、rehash、resize等方法里。

在我們寫代碼驗證entry回收邏輯之前,還需要簡單的提一下ThreadLocalMap的hash算法。

entry數組的下標如何確定?

每個ThreadLocal對象,都有一個threadLocalHashCode變量,在加入ThreadLocalMap的時候,根據這個threadLocalHashCode的值,對entry數組的長度取余(hash & (len – 1)),餘數作為下標。

那麼threadLocalHashCode是怎麼計算的呢?看源碼:

public class ThreadLocal<T>{
    private final int threadLocalHashCode = nextHashCode();
    private static AtomicInteger nextHashCode = new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    ...
}

ThreadLocal類維護了一個全局靜態字段nextHashCode,每new一個ThreadLocal對象,nextHashCode都會遞增0x61c88647,作為下一個ThreadLocal對象的threadLocalHashCode。

這個0x61c88647,是個神奇的数字,只要以它為遞增值,那麼和2的N次方取余時,在有限的次數內不會發生重複。
比如和16取余,那麼在16次遞增內,不會發生重複。還是寫代碼驗證一下吧。

int hashCode = 0;
int HASH_INCREMENT = 0x61c88647;
int length = 16;

for(int i = 0; i < length ; i ++){
    int h = hashCode & (length - 1);
    hashCode += HASH_INCREMENT;
    System.out.println("h = " + h + ", i = " + i);
}

輸出結果為:

h = 0, i = 0
h = 7, i = 1
h = 14, i = 2
h = 5, i = 3
h = 12, i = 4
h = 3, i = 5
h = 10, i = 6
h = 1, i = 7
h = 8, i = 8
h = 15, i = 9
h = 6, i = 10
h = 13, i = 11
h = 4, i = 12
h = 11, i = 13
h = 2, i = 14
h = 9, i = 15

你看,h的值在16次遞增內,沒有發生重複。 但是要記住,2的N次方作為長度才會有這個效果,這也解釋了為什麼ThreadLocalMap的entry數組初始長度是16,每次都是2倍的擴容。

驗證新threadLocal的get和set時回收部分無效的entry

為了驗證出結果,我們需要先給ThreadLocal的nextHashCode重置一個初始值,這樣在測試的時候,每個threadLocal的數組下標才會按照我們設計的思路走。

static void resetNextHashCode() throws NoSuchFieldException, IllegalAccessException {
    Field nextHashCodeField = ThreadLocal.class.getDeclaredField("nextHashCode");
    nextHashCodeField.setAccessible(true);
    nextHashCodeField.set(null, new AtomicInteger(1253254570));
}

然後在測試代碼里,我們先調用resetNextHashCode方法,然後加兩個ThreadLocal對象並set值,gc前把強引用去除,gc后再new兩個新的theadLocal對象,分別調用他們的get和set方法。
在每個關鍵點打印出threadLocalMap做比較。

static void testExpungeSomeEntriesWhenGetOrSet() throws InterruptedException {
    Utils.println("----------testExpungeStaleEntries----------");
    Thread thread1 = new Thread(() -> {
        try {
            resetNextHashCode();

            //注意,這裏必須有兩個ThreadLocal,才能驗證出threadLocal1被清理
            ThreadLocal<String> threadLocal1 = new ThreadLocal<>();
            ThreadLocal<String> threadLocal2 = new ThreadLocal<>();

            threadLocal1.set("threadLocal1-value");
            threadLocal2.set("threadLocal2-value");


            Object threadLocalMap = getThreadLocalMap(Thread.currentThread());
            //set threadLocal1 unreachable
            threadLocal1 = null;
            threadLocal2 = null;
            forceGC();

            Utils.println("print threadLocalMap after gc");
            printThreadLocalMap(threadLocalMap);

            ThreadLocal<String> newThreadLocal1 = new ThreadLocal<>();
            newThreadLocal1.get();
            Utils.println("print threadLocalMap after call a new newThreadLocal1.get");
            printThreadLocalMap(threadLocalMap);

            ThreadLocal<String> newThreadLocal2 = new ThreadLocal<>();
            newThreadLocal2.set("newThreadLocal2-value");
            Utils.println("print threadLocalMap after call a new newThreadLocal2.set");
            printThreadLocalMap(threadLocalMap);


        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

    }, "thread1");

    thread1.start();
    thread1.join();
    Utils.println("------------------------------------------");
}

程序輸出結果為:

----------testExpungeStaleEntries----------
print threadLocalMap after gc
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> entry key = null, value = threadLocal2-value
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> null
--------table[9] -> null
--------table[10] -> entry key = null, value = threadLocal1-value
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> null
print threadLocalMap after call a new newThreadLocal1.get
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> entry key = null, value = threadLocal2-value
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> entry key = java.lang.ThreadLocal@2b63dc81, value = null
--------table[9] -> null
--------table[10] -> null
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> null
print threadLocalMap after call a new newThreadLocal2.set
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> null
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> null
--------table[7] -> null
--------table[8] -> entry key = java.lang.ThreadLocal@2b63dc81, value = null
--------table[9] -> null
--------table[10] -> null
--------table[11] -> null
--------table[12] -> null
--------table[13] -> null
--------table[14] -> null
--------table[15] -> entry key = java.lang.ThreadLocal@2e93c547, value = newThreadLocal2-value
------------------------------------------

從結果上來看,

  • gc后table[1]和table[10]的key變成了null
  • new newThreadLocal1.get后,新增了table[8],table[10]被清理了,但table[1]還在(這就是cleanSomeSlots中some的意思)
  • new newThreadLocal2.set后,新增了table[15],table[1]被清理了。

驗證map的size大於等於table.length的2/3時回收所有無效的entry

    static void testExpungeAllEntries() throws InterruptedException {
        Utils.println("----------testExpungeStaleEntries----------");
        Thread thread1 = new Thread(() -> {
            try {
                resetNextHashCode();

                int threshold = 16 * 2 / 3;
                ThreadLocal[] threadLocals = new ThreadLocal[threshold - 1];
                for(int i = 0; i < threshold - 1; i ++){
                    threadLocals[i] = new ThreadLocal<String>();
                    threadLocals[i].set("threadLocal" + i + "-value");
                }

                Object threadLocalMap = getThreadLocalMap(Thread.currentThread());

                threadLocals[1] = null;
                threadLocals[8] = null;
                //threadLocals[6] = null;
                //threadLocals[4] = null;
                //threadLocals[2] = null;
                forceGC();

                Utils.println("print threadLocalMap after gc");
                printThreadLocalMap(threadLocalMap);

                ThreadLocal<String> newThreadLocal1 = new ThreadLocal<>();
                newThreadLocal1.set("newThreadLocal1-value");
                Utils.println("print threadLocalMap after call a new newThreadLocal1.get");
                printThreadLocalMap(threadLocalMap);

            } catch (NoSuchFieldException | IllegalAccessException e) {
                e.printStackTrace();
            }

        }, "thread1");

        thread1.start();
        thread1.join();
        Utils.println("------------------------------------------");
    }

我們先創建了9個threadLocal對象並設置了值,然後去掉了其中2個的強引用(注意這2個可不是隨意挑選的)。
gc后再添加一個新的threadLocal,最後打印出最新的map。輸出為:

----------testExpungeStaleEntries----------
print threadLocalMap after gc
thread1
----threadLocals (ThreadLocalMap), table.length = 16
--------table[0] -> null
--------table[1] -> entry key = null, value = threadLocal1-value
--------table[2] -> entry key = null, value = threadLocal8-value
--------table[3] -> null
--------table[4] -> entry key = java.lang.ThreadLocal@60523912, value = threadLocal6-value
--------table[5] -> null
--------table[6] -> entry key = java.lang.ThreadLocal@48fccd7a, value = threadLocal4-value
--------table[7] -> null
--------table[8] -> entry key = java.lang.ThreadLocal@188bbe72, value = threadLocal2-value
--------table[9] -> null
--------table[10] -> entry key = java.lang.ThreadLocal@19e0ebe8, value = threadLocal0-value
--------table[11] -> entry key = java.lang.ThreadLocal@688bcb6f, value = threadLocal7-value
--------table[12] -> null
--------table[13] -> entry key = java.lang.ThreadLocal@46324c19, value = threadLocal5-value
--------table[14] -> null
--------table[15] -> entry key = java.lang.ThreadLocal@38f1283, value = threadLocal3-value
print threadLocalMap after call a new newThreadLocal1.get
thread1
----threadLocals (ThreadLocalMap), table.length = 32
--------table[0] -> null
--------table[1] -> null
--------table[2] -> null
--------table[3] -> null
--------table[4] -> null
--------table[5] -> null
--------table[6] -> entry key = java.lang.ThreadLocal@48fccd7a, value = threadLocal4-value
--------table[7] -> null
--------table[8] -> null
--------table[9] -> entry key = java.lang.ThreadLocal@1dae16b1, value = newThreadLocal1-value
--------table[10] -> entry key = java.lang.ThreadLocal@19e0ebe8, value = threadLocal0-value
--------table[11] -> null
--------table[12] -> null
--------table[13] -> entry key = java.lang.ThreadLocal@46324c19, value = threadLocal5-value
--------table[14] -> null
--------table[15] -> null
--------table[16] -> null
--------table[17] -> null
--------table[18] -> null
--------table[19] -> null
--------table[20] -> entry key = java.lang.ThreadLocal@60523912, value = threadLocal6-value
--------table[21] -> null
--------table[22] -> null
--------table[23] -> null
--------table[24] -> entry key = java.lang.ThreadLocal@188bbe72, value = threadLocal2-value
--------table[25] -> null
--------table[26] -> null
--------table[27] -> entry key = java.lang.ThreadLocal@688bcb6f, value = threadLocal7-value
--------table[28] -> null
--------table[29] -> null
--------table[30] -> null
--------table[31] -> entry key = java.lang.ThreadLocal@38f1283, value = threadLocal3-value
------------------------------------------

從結果上看:

  • gc后table[1]和table[2](即threadLocal1和threadLocal8)的key變成了null
  • 加入新的threadLocal后,table的長度從16變成了32(因為此時的size是8,正好等於10 – 10/4,所以擴容),並且threadLocal1和threadLocal8這兩個entry不見了。

如果在gc前,我們把threadLocals[1、8、6、4、2]都去掉強引用,加入新threadLocal後會發現1、8、6、4、2被清除了,但沒有擴容,因為此時size是5,小於10-10/4。這個邏輯就不貼測試結果了,你可以取消註釋上面代碼中相關的邏輯試試。

大部分場景下,ThreadLocal對象的生命周期是和app一致的,弱引用形同虛設

回到現實中。

我們用ThreadLocal的目的,無非是在跨方法調用時更方便的線程安全地存儲和使用變量。這就意味着ThreadLocal的生命周期很長,甚至和app是一起存活的,強引用一直在。

既然強引用一直存在,那麼弱引用就形同虛設了。

所以在確定不再需要ThreadLocal中的值的情況下,還是老老實實的調用remove方法吧!

代碼地址

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

全球Q2電動汽車發展指數 中國首度躍居整體排名第一

中證網報導,羅蘭貝格與德國汽車研究機構亞琛汽車工程技術公司共同發佈《2017年第二季全球電動汽車發展指數》。報告中顯示,中國大陸首次在電動汽車發展指數的整體排名中躍居全球第一,並直指儘管政府新能源政策收緊,補貼力道減弱,中國電動汽車和電池製造市場份額仍將保持強有力的增長,進一步擴大領先優勢。

該報告對中國、德國、法國、義大利、美國、日本和韓國電動汽車的發展現狀進行比較。整體而言,中國首次躍居指數整體排名第一,美國與德國分居第二、三位,而在上一季指數排名中位列第一的日本則失去領先地位。報告預測,在可預見的未來,中國將統領電動汽車的行業與市場。

在技術層面,法國超越德國,位居首位,主要由於有更多的德國整車廠大批量生產續航能力和最高電動時速都較低的插電式混合動力汽車,導致其電動汽車技術能力略有下降;日本排名第三,其整車廠的電動汽車技術水準較高且價格更加實惠;中國整車廠則仍主要定位於技術含量較低的領域。

在行業總量層面,中國正在逐步擴大其領先優勢;在電池製造領域,中國的優勢也更加明顯;反觀日本在電動汽車產量和全球電池產量份額上都處於不利地位,排名維持在第三;美國行業成績有所提升,位居第二。至於在市場規模層面,中國的需求進一步急劇增長,但電動汽車所占市場份額仍略低於法國,排在第二,美國名列第三。

資料顯示,2016年中國生產了超過35萬輛插電式混合動力和純電動乘用車,銷售額保持兩位數增長,市佔率從0.8%上升至1.3%;同年,德國、法國與美國電動汽車的註冊數量均實現了兩位數的增長。但整體而言,2016年僅有法國與中國兩個國家的純電動和插電式混合動力汽車市場份額超過1%。

報告認為,中國電動汽車銷量的快速增長主要得益於政府大幅度補貼和主要城市對汽油車的限牌政策,但政府對於汽車廠商的政策正在收緊。對此,羅蘭貝格合夥人鄭贇表示,雖然大陸政府的激勵政策在初期對行業發展起到了重要的推動作用,但難以長久維持,政府需要控制成本,也有意讓本土廠商培育自身能力,電動汽車產業的發展將由政府推動向市場推動轉變,其最新版的新能源汽車雙積分管理意見徵求稿就明確地傳達了此訊號。

根據羅蘭貝格的估算,要達到新能源汽車積分比例2020年12%的目標,該年電動汽車的總銷量需達到約160萬輛。鄭贇指出,汽車設計、配件整合以及供應商管理能力將成為大陸本土廠商所面臨的重大挑戰,想要在政府退補的情況下實現增長、完成積分目標,成本控制是關鍵;只有成本控制能力和價格競爭力的提升才能幫助其本土廠商在國際電動汽車市場上保持長期的競爭優勢。

(本文內容由授權使用。圖片出處:pixabay CC0)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

馬斯克看好電動車市場,Tesla 將在美國建 3 座超級工廠

電動車製造商特斯拉(Tesla)和伊隆·馬斯克(Elon Musk)在過去幾個月中都多次提及了投資建造超級工廠的計畫,新製造工廠的選址將在年底公開,據悉 Tesla 將建造 5 座超級工廠,其中美國將有 2 到 3 座全新的超級工廠。   特斯拉將在海外建造超級工廠的消息已經傳了幾個月的時間了,在美國本土的超級工廠建造計畫則一直沒有進展。該公司 CEO 馬斯克確認將有 2 到 3 座超級工廠選址在美國本土。   2017 年 6 月特斯拉公司在股東大會上確認有 3 座超級工廠選址已經啟動,這些工廠包括了電動車和電池生產線。   據之前媒體曝光的消息顯示,特斯拉至少將在海外市場建造兩座超級工廠,分別位於歐洲和中國,特斯拉已經與中國上海市政府簽訂合作協議,共同建造電動車製造工廠。   在美國州長協議的會議上,馬斯克公開表示,將會有 2 到 3 座超級工廠選址在美國本土,他面對所有州長做出這一表態,也是希望政府部門能夠提供工廠建造和電動車生產方便的優惠政策,顯然許多州長都對 Tesla 超級工廠非常感興趣。   特斯拉在內華達州的超級工廠給該州帶來了超過 50 億美元的投資,創造了一萬個工作職缺,馬斯克表示,吸引 Tesla 把超級工廠建造內華達州的因素很多,包括稅收方面的優惠。   馬斯克希望政府部門能夠在立法上做出更多進步,讓新的技術能夠更快地商業化,之前他曾多次公開表示內華達州政府具有前瞻性,在超級工廠的建造過程中展示了前所未有的高效。   Tesla 未來的超級工廠將把電動車製造和電池的生產放在同一座工廠,有效地提升電動車的產能,而不是像現在這樣電池和電動車分開製造,再運往組裝工廠。   (合作媒體:。圖片出處:Tesla)  

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

電動車增長 高盛估全球石油需求最快2024年觸頂

路透社7月25日報導,高盛(Goldman Sachs)最新報告表示,受到汽車燃油效率提高、電動車產業快速發展、經濟增長低迷以及高油價等因素的影響,全球石油需求最快可能在2024年就會到達高峰。報告預估,全球的電動車市場將從2016年的200萬輛,爆發增長至2030年的8,300萬輛。全球石油需求的年均增長率將從2011到2016年期間的1.6%,降至2017到2022年的1.2%,至2025年降至0.7%,2030年降至0.4%。

高盛表示,從現在起到2030年,運輸部門對石油需求增長的貢獻將會逐步下滑,石化產業的需求將取而代之並躍居主流。報告也預估未來五年油品的供應將會出現過剩,因煉油產能持續增加但需求增長放緩的影響,這會使得全球煉油廠的產能利用率下滑,並壓縮煉油廠的毛利。此外,由於越來越多的石化原料來自煉油體系之外(如天然氣凝析油等),煉油廠石油需求的佔比也將會下滑。

亞洲三大石油消費國中國大陸、印度以及日本的需求增長疲弱,將令油市重新恢復平衡的時間拉長。大陸、印度以及日本合計佔全球石油需求的20%比重,但各自面臨不同的困難,使得石油需求的增長疲弱。其中,日本受困於人口老化以及汽車燃油效率的持續提高,印度因去年底去貨幣化的政策衝擊需求,而中國大陸正積極去化過剩煉油產能,將會影響到原油的需求。

英國石油公司(BP PLC)發布的《世界能源統計年鑑》表示,2016年,全球能源需求增長1%,連續第三年呈現疲弱的增長態勢(2014與2015年分別年增1%與0.9%),相比過去十年的平均增長率為1.8%。主要的增長來自於中國大陸與印度,其中印度2016年能源需求年增5.4%,增速與過去幾年相符。大陸去年能源需求年增1.3%,與2015年的1.2%增幅相近,但只有過去十年平均增速的四分之一,並寫下1997-98年亞洲金融風暴以來的連續兩年最低增速。

BP年鑑指出,2016年,石油消費佔全球能源消費的三分之一比重,全球石油需求年增1.6%或每日160萬桶,此高於過去十年的平均增速(1.2%)。其中,大陸石油需求年增每日40萬桶,印度以及歐洲的石油需求均年增每日30萬桶。2016年,全球石油日產量僅年增40萬桶,則是創下2013年以來的最低增長;其中,中東的石油日產量年增170萬桶,但中東以外的石油日產量則是年減130萬桶。

(本文內容由授權使用。圖片出處:public domain CC0)

 

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

動手造輪子:實現簡單的 EventQueue

動手造輪子:實現簡單的 EventQueue

Intro

最近項目里有遇到一些併發的問題,想實現一個隊列來將併發的請求一個一個串行處理,可以理解為使用消息隊列處理併發問題,之前實現過一個簡單的 EventBus,於是想在 EventBus 的基礎上改造一下,加一個隊列,改造成類似消息隊列的處理模式。消息的處理(Consumer)直接使用 .netcore 里的 IHostedService 來實現了一個簡單的後台任務處理。

初步設計

  • Event 抽象的事件
  • EventHandler 處理 Event 的方法
  • EventStore 保存訂閱 Event 的 EventHandler
  • EventQueue 保存 Event 的隊列
  • EventPublisher 發布 Event
  • EventConsumer 處理 Event 隊列里的 Event
  • EventSubscriptionManager 管理訂閱 Event 的 EventHandler

實現代碼

EventBase 定義了基本事件信息,事件發生時間以及事件的id:

public abstract class EventBase
{
    [JsonProperty]
    public DateTimeOffset EventAt { get; private set; }

    [JsonProperty]
    public string EventId { get; private set; }

    protected EventBase()
    {
      this.EventId = GuidIdGenerator.Instance.NewId();
      this.EventAt = DateTimeOffset.UtcNow;
    }

    [JsonConstructor]
    public EventBase(string eventId, DateTimeOffset eventAt)
    {
      this.EventId = eventId;
      this.EventAt = eventAt;
    }
}

EventHandler 定義:

public interface IEventHandler
{
    Task Handle(IEventBase @event);
}

public interface IEventHandler<in TEvent> : IEventHandler where TEvent : IEventBase
{
    Task Handle(TEvent @event);
}

public class EventHandlerBase<TEvent> : IEventHandler<TEvent> where TEvent : EventBase
{
    public virtual Task Handle(TEvent @event)
    {
        return Task.CompletedTask;
    }

    public Task Handle(IEventBase @event)
    {
        return Handle(@event as TEvent);
    }
}

EventStore:

public class EventStore
{
    private readonly Dictionary<Type, Type> _eventHandlers = new Dictionary<Type, Type>();

    public void Add<TEvent, TEventHandler>() where TEventHandler : IEventHandler<TEvent> where TEvent : EventBase
    {
        _eventHandlers.Add(typeof(TEvent), typeof(TEventHandler));
    }

    public object GetEventHandler(Type eventType, IServiceProvider serviceProvider)
    {
        if (eventType == null || !_eventHandlers.TryGetValue(eventType, out var handlerType) || handlerType == null)
        {
            return null;
        }
        return serviceProvider.GetService(handlerType);
    }

    public object GetEventHandler(EventBase eventBase, IServiceProvider serviceProvider) =>
        GetEventHandler(eventBase.GetType(), serviceProvider);

    public object GetEventHandler<TEvent>(IServiceProvider serviceProvider) where TEvent : EventBase =>
        GetEventHandler(typeof(TEvent), serviceProvider);
}

EventQueue 定義:

public class EventQueue
{
    private readonly ConcurrentDictionary<string, ConcurrentQueue<EventBase>> _eventQueues =
        new ConcurrentDictionary<string, ConcurrentQueue<EventBase>>();

    public ICollection<string> Queues => _eventQueues.Keys;

    public void Enqueue<TEvent>(string queueName, TEvent @event) where TEvent : EventBase
    {
        var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
        queue.Enqueue(@event);
    }

    public bool TryDequeue(string queueName, out EventBase @event)
    {
        var queue = _eventQueues.GetOrAdd(queueName, q => new ConcurrentQueue<EventBase>());
        return queue.TryDequeue(out @event);
    }

    public bool TryRemoveQueue(string queueName)
    {
        return _eventQueues.TryRemove(queueName, out _);
    }

    public bool ContainsQueue(string queueName) => _eventQueues.ContainsKey(queueName);

    public ConcurrentQueue<EventBase> this[string queueName] => _eventQueues[queueName];
}

EventPublisher:

public interface IEventPublisher
{
    Task Publish<TEvent>(string queueName, TEvent @event)
        where TEvent : EventBase;
}
public class EventPublisher : IEventPublisher
{
    private readonly EventQueue _eventQueue;

    public EventPublisher(EventQueue eventQueue)
    {
        _eventQueue = eventQueue;
    }

    public Task Publish<TEvent>(string queueName, TEvent @event)
        where TEvent : EventBase
    {
        _eventQueue.Enqueue(queueName, @event);
        return Task.CompletedTask;
    }
}

EventSubscriptionManager:

public interface IEventSubscriptionManager
{
    void Subscribe<TEvent, TEventHandler>()
        where TEvent : EventBase
        where TEventHandler : IEventHandler<TEvent>;
}

public class EventSubscriptionManager : IEventSubscriptionManager
{
    private readonly EventStore _eventStore;

    public EventSubscriptionManager(EventStore eventStore)
    {
        _eventStore = eventStore;
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : EventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        _eventStore.Add<TEvent, TEventHandler>();
    }
}

EventConsumer:

public class EventConsumer : BackgroundService
{
    private readonly EventQueue _eventQueue;
    private readonly EventStore _eventStore;
    private readonly int maxSemaphoreCount = 256;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger _logger;

    public EventConsumer(EventQueue eventQueue, EventStore eventStore, IConfiguration configuration, ILogger<EventConsumer> logger, IServiceProvider serviceProvider)
    {
        _eventQueue = eventQueue;
        _eventStore = eventStore;
        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using (var semaphore = new SemaphoreSlim(Environment.ProcessorCount, maxSemaphoreCount))
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var queues = _eventQueue.Queues;
                if (queues.Count > 0)
                {
                    await Task.WhenAll(
                    queues
                        .Select(async queueName =>
                        {
                            if (!_eventQueue.ContainsQueue(queueName))
                            {
                                return;
                            }
                            try
                            {
                                await semaphore.WaitAsync(stoppingToken);
                                //
                                if (_eventQueue.TryDequeue(queueName, out var @event))
                                {
                                    var eventHandler = _eventStore.GetEventHandler(@event, _serviceProvider);
                                    if (eventHandler is IEventHandler handler)
                                    {
                                        _logger.LogInformation(
                                            "handler {handlerType} begin to handle event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
                                            eventHandler.GetType().FullName, @event.GetType().FullName,
                                            @event.EventId, JsonConvert.SerializeObject(@event));

                                        try
                                        {
                                            await handler.Handle(@event);
                                        }
                                        catch (Exception e)
                                        {
                                            _logger.LogError(e, "event  {eventId}  handled exception", @event.EventId);
                                        }
                                        finally
                                        {
                                            _logger.LogInformation("event {eventId} handled", @event.EventId);
                                        }
                                    }
                                    else
                                    {
                                        _logger.LogWarning(
                                            "no event handler registered for event {eventType}, eventId: {eventId}, eventInfo: {eventInfo}",
                                            @event.GetType().FullName, @event.EventId,
                                            JsonConvert.SerializeObject(@event));
                                    }
                                }
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError(ex, "error running EventConsumer");
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        })
                );
                }

                await Task.Delay(50, stoppingToken);
            }
        }
    }
}

為了方便使用定義了一個 Event 擴展方法:

public static IServiceCollection AddEvent(this IServiceCollection services)
{
    services.TryAddSingleton<EventStore>();
    services.TryAddSingleton<EventQueue>();
    services.TryAddSingleton<IEventPublisher, EventPublisher>();
    services.TryAddSingleton<IEventSubscriptionManager, EventSubscriptionManager>();

    services.AddSingleton<IHostedService, EventConsumer>();
    return services;
}

使用示例

定義 PageViewEvent 記錄請求信息:

public class PageViewEvent : EventBase
{
    public string Path { get; set; }
}

這裏作為示例只記錄了請求的Path信息,實際使用可以增加更多需要記錄的信息

定義 PageViewEventHandler,處理 PageViewEvent

public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
{
    private readonly ILogger _logger;

    public PageViewEventHandler(ILogger<PageViewEventHandler> logger)
    {
        _logger = logger;
    }

    public override Task Handle(PageViewEvent @event)
    {
        _logger.LogInformation($"handle pageViewEvent: {JsonConvert.SerializeObject(@event)}");
        return Task.CompletedTask;
    }
}

這個 handler 里什麼都沒做只是輸出一個日誌

這個示例項目定義了一個記錄請求路徑的事件以及一個發布請求記錄事件的中間件

// 發布 Event 的中間件
app.Use(async (context, next) =>
{
    var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
    await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
    await next();
});

Startup 配置:

public void ConfigureServices(IServiceCollection services)
{
    // ...
    services.AddEvent();
    services.AddSingleton<PageViewEventHandler>();// 註冊 Handler
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IEventSubscriptionManager eventSubscriptionManager)
{
    eventSubscriptionManager.Subscribe<PageViewEvent, PageViewEventHandler>();
    app.Use(async (context, next) =>
    {
        var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
        await eventPublisher.Publish("pageView", new PageViewEvent() { Path = context.Request.Path.Value });
        await next();
    });
    // ...
}

使用效果:

More

注:只是一個初步設計,基本可以實現功能,還是有些不足,實際應用的話還有一些要考慮的事情

  1. Consumer 消息邏輯,現在的實現有些問題,我們的應用場景目前比較簡單還可以滿足,如果事件比較多就會而且每個事件可能處理需要的時間長短不一樣,會導致在一個批次中執行的 Event 中已經完成的事件要等待其他還沒完成的事件完成之後才能繼續取下一個事件,理想的消費模式應該是各個隊列相互獨立,在同一個隊列中保持順序消費即可
  2. 上面示例的 EventStore 的實現只是簡單的實現了一個事件一個 Handler 的處理情況,實際業務場景中很可能會有一個事件需要多個 Handler 的情況
  3. 這個實現是基於內存的,如果要在分佈式場景下使用就不適用了,需要自己實現一下基於redis或者數據庫的以滿足分佈式的需求
  4. and more…

上面所有的代碼可以在 Github 上獲取,示例項目 Github 地址:

Reference

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

.NET進階篇06-async異步、thread多線程3

知識需要不斷積累、總結和沉澱,思考和寫作是成長的催化劑

梯子

一、任務Task

System.Threading.Tasks在.NET4引入,前麵線程的API太多了,控制不方便,而ThreadPool控制能力又太弱,比如做線程的延續、阻塞、取消、超時等功能不太方便,所以Task就抽象了線程功能,在後台使用ThreadPool

1、啟動任務

可以使用TaskFactory類或Task類的構造函數和Start()方法,委託可以提供帶有一個Object類型的輸入參數,所以可以給任務傳遞任意數據,還漏了一個常用的Task.Run

TaskFactory taskFactory = new TaskFactory();
taskFactory.StartNew(() => 
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
Task.Factory.StartNew(() =>
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
Task task = new Task(() =>
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task.Start();

只有Task類實例方式需要Start()去啟動任務,當然可以RunSynchronously()來同步執行任務,主線程會等待,就是用主線程來執行這個task任務

Task task = new Task(() =>
{
    Thread.Sleep(10000);
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task.RunSynchronously();

2、阻塞延續

在Thread中我們使用join來阻塞等待,在多個Thread時進行控制就不太方便。Task中我們使用實例方法Wait阻塞單個任務或靜態方法WaitAll和WaitAny阻塞多個任務

var task = new Task(() =>
{
    Thread.Sleep(5*1000);
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
var task2 = new Task(() =>
{
    Thread.Sleep(10 * 1000);
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task.Start();
task2.Start();
//task.Wait();//單任務等待
//Task.WaitAny(task, task2);//任何一個任務完成就繼續
Task.WaitAll(task, task2);//任務都完成才繼續

如果不希望阻塞主線程,實現當一個任務或幾個任務完成后執行別的任務,可以使用Task靜態方法WhenAll和WhenAny,他們將返回一個Task,但這個Task不允許你控制,將會在滿足WhenAll和WhenAny里任務完成時自動完成,然後調用Task的ContinueWith方法,就可以在一個任務完成后緊跟開始另一個任務

Task.WhenAll(task, task2).ContinueWith((t) =>
{
    Console.WriteLine($"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});

Task.Factory工廠中也存在類似ContinueWhenAll和ContinueWhenAny

3、任務層次結構

不僅可以在一個任務結束后執行另一個任務,也可以在一個任務內啟動一個任務,這就啟動了一個父子層次結構

var parentTask = new Task(()=> 
{
    Console.WriteLine($"parentId={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    Thread.Sleep(5*1000);
    var childTask = new Task(() =>
    {
        Thread.Sleep(10 * 1000);
        Console.WriteLine($"childId={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}")
    });
    childTask.Start();
});
parentTask.Start();

如果父任務在子任務之前結束,父任務的狀態為WaitingForChildrenToComplete,當子任務也完成時,父任務的狀態就變為RanToCompletion,當然,在創建任務時指定TaskCreationOptions枚舉參數,可以控制任務的創建和執行的可選行為

4、枚舉參數

簡單介紹下創建任務中的TaskCreationOptions枚舉參數,創建任務時我們可以提供TaskCreationOptions枚舉參數,用於控制任務的創建和執行的可選行為的標誌

  1. AttachedToParent:指定將任務附加到任務層次結構中的某個父級,意思就是建立父子關係,父任務必須等待子任務完成才可以繼續執行。和WaitAll效果一樣。上面例子如果在創建子任務時指定TaskCreationOptions.AttachedToParent,那麼父任務wait時也會等子任務的結束
  2. DenyChildAttach:不讓子任務附加到父任務上
  3. LongRunning:指定是長時間運行任務,如果事先知道這個任務會耗時比較長,建議設置此項。這樣,Task調度器會創建Thread線程,而不使用ThreadPool線程。因為你長時間佔用ThreadPool線程不還,那它可能必要時會在線程池中開啟新的線程,造成調度壓力
  4. PreferFairness:盡可能公平的安排任務,這意味着較早安排的任務將更可能較早運行,而較晚安排運行的任務將更可能較晚運行。實際通過把任務放到線程池的全局隊列中,讓工作線程去爭搶,默認是在本地隊列中。

另一個枚舉參數是ContinueWith方法中的TaskContinuationOptions枚舉參數,它除了擁有幾個和上面同樣功能的枚舉值外,還擁有控制任務的取消延續等功能

  1. LazyCancellation:在延續取消的情況下,防止延續的完成直到完成先前的任務。什麼意思呢?
CancellationTokenSource source = new CancellationTokenSource();
source.Cancel();
var task1 = new Task(() => 
{
    Console.WriteLine($"task1 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
var task2 = task1.ContinueWith(t =>
{
    Console.WriteLine($"task2 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
},source.Token);
var task3 = task2.ContinueWith(t =>
{
    Console.WriteLine($"task3 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
});
task1.Start();

上面例子我們企圖task1->task2->task3順序執行,然後通過CancellationToken來取消task2的執行。結果會是怎樣呢?結果task1和task3會并行執行(task3也是會執行的,而且是和task1并行,等於原來的一條鏈變成了兩條鏈),然後我們嘗試使用LazyCancellation,

var task2 = task1.ContinueWith(t =>
{
    Console.WriteLine($"task2 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
},source.Token,TaskContinuationOptions.LazyCancellation,TaskScheduler.Current);

這樣,將會在task1執行完成后,task2才去判斷source.Token,為Cancel就不執行,接下來執行task3就保證了原來的順序

  1. ExecuteSynchronously:指定應同步執行延續任務,比如上例中,在延續任務task2中指定此參數,則task2會使用執行task1的線程來執行,這樣防止線程切換,可以做些共有資源的訪問。不指定的話就隨機,但也能也用到task1的線程
  2. NotOnRanToCompletion:延續任務必須在前面任務非完成狀態下執行
  3. OnlyOnRanToCompletion:延續任務必須在前面任務完成狀態才能執行
  4. NotOnFaulted,OnlyOnCanceled,OnlyOnFaulted等等

5、任務取消

在上篇使用Thread時,我們使用一個變量isStop標記是否取消任務,這種訪問共享變量的方式難免會出問題。task中提出CancellationTokenSource類專門處理任務取消,常見用法看下面代碼註釋

CancellationTokenSource source = new CancellationTokenSource();//構造函數中也可指定延遲取消
//註冊一個取消時調用的委託
source.Token.Register(() =>
{
    Console.WriteLine("當前source已經取消,可以在這裏做一些其他事情(比如資源清理)...");
});
var task1 = new Task(() => 
{
    while (!source.IsCancellationRequested)
    {
        Console.WriteLine($"task1 id={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}");
    }
},source.Token);
task1.Start();
//source.Cancel();//取消
source.CancelAfter(1000);//延時取消

6、任務結果

讓子線程返回結果,可以將信息寫入到線程安全的共享變量中去,或則使用可以返回結果的任務。使用Task的泛型版本Task<TResult>,就可以定義返回結果的任務。Task是繼承自Task的,Result獲取結果時是要阻塞等待直到任務完成返回結果的,內部判斷沒有完成則wait。通過TaskStatus屬性可獲得此任務的狀態是啟動、運行、異常還是取消等

var task = new Task<string>(() =>
{
     return "hello ketty";
});
task.Start();
string result = task.Result;

7、異常

可以使用AggregateException來接受任務中的異常信息,這是一個聚合異常繼承自Exception,可以遍歷獲取包含的所有異常,以及進行異常處理,決定是否繼續往上拋異常等

var task = Task.Factory.StartNew(() =>
{
    var childTask1 = Task.Factory.StartNew(() =>
    {
        throw new Exception("childTask1異常...");
    },TaskCreationOptions.AttachedToParent);
    var childTask12= Task.Factory.StartNew(() =>
    {
        throw new Exception("childTask2異常...");
    }, TaskCreationOptions.AttachedToParent);
});
try
{
    try
    {
        task.Wait();
    }
    catch (AggregateException ex)
    {
        foreach (var item in ex.InnerExceptions)
        {
            Console.WriteLine($"message{item.InnerException.Message}");
        }
        ex.Handle(x =>
        {
            if (x.InnerException.Message == "childTask1異常...")
            {
                return true;//異常被處理,不繼續往上拋了
            }
            return false;
        });
    }
}
catch (Exception ex)
{
    throw;
}

二、并行Parallel

1、Parallel.For()、Parallel.ForEach()

在.NET4中,另一個新增的抽象的線程時Parallel類。這個類定義了并行的for和foreach的靜態方法。Parallel.For()和Parallel.ForEach()方法多次調用一個方法,而Parallel.Invoke()方法允許同時調用不同的方法。首先Parallel是會阻塞主線程的,它將讓主線程也參与到任務中
Parallel.For()類似於for允許語句,并行迭代同一個方法,迭代順序沒有保證的

ParallelLoopResult result = Parallel.For(010, i =>
{
    Console.WriteLine($"{i} task:{Task.CurrentId} thread:{Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine(result.IsCompleted);

也可以提前中斷Parallel.For()方法。For()方法的一個重載版本接受Action<int,parallelloopstate style=”font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;”>類型參數。一般不使用,像下面這樣,本想大於5就停止,但實際也可能有大於5的任務已經在跑了。可以通過ParallelOptions傳入允許最大線程數以及取消Token等

ParallelLoopResult result = Parallel.For(010new ParallelOptions() { MaxDegreeOfParallelism = 8 },(i,loop) =>
{
    Console.WriteLine($"{i} task:{Task.CurrentId} thread:{Thread.CurrentThread.ManagedThreadId}");
    if (i > 5)
    {
        loop.Break();
    }
});

2、Parallel.For<TLocal>

For還有一個高級泛型版本,相當於并行的聚合計算

ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopStateTLocalTLocal> body, Action<TLocal> localFinally);

像下面這樣我們求0…100的和,第三個參數更定一個種子初始值,第四個參數迭代累計,最後聚合

int totalNum = 0;
Parallel.For<int>(0100() => { return 0; }, (current, loop, total) =>
{
    total += current;
    return total;
}, (total) =>
{
    Interlocked.Add(ref totalNum, total);
});

上面For用來處理數組數據,ForEach()方法用來處理非數組的數據任務,比如字典數據繼承自IEnumerable的集合等

3、Parallel.Invoke()

Parallel.Invoke()則可以并行調用不同的方法,參數傳遞一個Action的委託數組

Parallel.Invoke(() => { Console.WriteLine($"方法1 thread:{Thread.CurrentThread.ManagedThreadId}"); }
    , () => { Console.WriteLine($"方法2 thread:{Thread.CurrentThread.ManagedThreadId}"); }
    , () => { Console.WriteLine($"方法3 thread:{Thread.CurrentThread.ManagedThreadId}"); });

4、PLinq

Plinq,為了能夠達到最大的靈活度,linq有了并行版本。使用也很簡單,只需要將原始集合AsParallel就轉換為支持并行化的查詢。也可以AsOrdered來順序執行,取消Token,強制并行等

var nums = Enumerable.Range(0100);
var query = from n in nums.AsParallel()
            select new
            {
                thread=$"tid={Thread.CurrentThread.ManagedThreadId},datetime={DateTime.Now}"
            };

三、異步等待AsyncAwait

異步編程模型,可能還需要大篇幅來學習,這裏先介紹下基本用法,內在本質需要用ILSpy反編譯來看,以後可能要分專題總結。文末先給幾個參考資料,有興趣自己闊以先琢磨琢磨鴨

1、簡單使用

這是.NET4.5開始提供的一對語法糖,使得可以較簡便的使用異步編程。async用在方法定義前面,await只能寫在帶有async標記的方法中,任何方法都可以增加async,一般成對出現,只有async沒有意義,只有await會報錯,請先看下面的示例

private static async void AsyncTest()
{
    //主線程執行
    Console.WriteLine($"before await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    TaskFactory taskFactory = new TaskFactory();
    Task task = taskFactory.StartNew(() =>
    {
        Thread.Sleep(3000);
        Console.WriteLine($"task ThreadId={Thread.CurrentThread.ManagedThreadId}");
    });
    await task;//主線程到這裏就返回了,執行主線程任務
    //子線程執行,其實是封裝成委託,在task之後成為回調(編譯器功能  狀態機實現) 後面相當於task.ContinueWith()
    //這個回調的線程是不確定的:可能是主線程  可能是子線程  也可能是其他線程,在winform中是主線程
    Console.WriteLine($"after await ThreadId={Thread.CurrentThread.ManagedThreadId}");
}

一般使用async都會讓方法返回一個Task的,像下面這樣複雜一點的

private static async Task<stringAsyncTest2()
{
    Console.WriteLine($"before await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    TaskFactory taskFactory = new TaskFactory();
    string x = await taskFactory.StartNew(() =>
      {
          Thread.Sleep(3000);
          Console.WriteLine($"task ThreadId={Thread.CurrentThread.ManagedThreadId}");
          return "task over";
      });

    Console.WriteLine($"after await ThreadId={Thread.CurrentThread.ManagedThreadId}");
    return x;
}

通過var reslult = AsyncTest2().Result;調用即可。但注意如果調用Wait或Result的代碼位於UI線程,Task的實際執行在其他線程,其需要返回UI線程則會造成死鎖,所以應該Async all the way

2、優雅

從上面簡單示例中可以看出異步編程的執行邏輯:主線程A邏輯->異步任務線程B邏輯->主線程C邏輯
異步方法的返回類型只能是void、Task、Task。示例中異步方法的返回值類型是Task,通常void也不推薦使用,沒有返回值直接用Task就是

上一篇也大概了解到如果我們要在任務中更新UI,需要調用Invoke通知UI線程來更新,代碼看起來像下面這樣,在一個任務後去更新UI

private void button1_Click(object sender, EventArgs e)
{
    var ResultTask = Task.Run(() => {
        Thread.Sleep(5000);
        return "任務完成";
    });
    ResultTask.ContinueWith((r)=> 
    {
        textBox1.Invoke(() => {
            textBox1.Text = r.Result;
        });
    });
}

如果使用async/await會看起來像這樣,是不是優雅了許多。以看似同步編程的方式實現異步

private async void button1_Click(object sender, EventArgs e)
{
    var t = Task.Run(() => {
        Thread.Sleep(5000);
        return "任務完成";
    });
    textBox1.Text = await t;
}

3、最後

在.NET 4.5中引入的Async和Await兩個新的關鍵字后,用戶能以一種簡潔直觀的方式實現異步編程。甚至都不需要改變代碼的邏輯結構,就能將原來的同步函數改造為異步函數。
在內部實現上,Async和Await這兩個關鍵字由編譯器轉換為狀態機,通過System.Threading.Tasks中的并行類實現代碼的異步執行。

字數有點多了,我的能力也就高考作文800字能寫的出奇好。看了很多異步編程,腦袋有點炸,等消化后再輸出一次,技藝不足,只能用輸出倒逼輸入了,下一篇會是線程安全集合、鎖問題、同步問題,基於事件的異步模式等

Search the fucking web
Read the fucking maunal

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

Kafka冪等性原理及實現剖析

1.概述

最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那麼,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現。

2.內容

2.1 Kafka為啥需要冪等性?

Producer在生產發送消息時,難免會重複發送消息。Producer進行retry時會產生重試機制,發生消息重複發送。而引入冪等性后,重複發送只會生成一條有效的消息。Kafka作為分佈式消息系統,它的使用場景常見與分佈式系統中,比如消息推送系統、業務平台系統(如物流平台、銀行結算平台等)。以銀行結算平台來說,業務方作為上游把數據上報到銀行結算平台,如果一份數據被計算、處理多次,那麼產生的影響會很嚴重。

2.2 影響Kafka冪等性的因素有哪些?

在使用Kafka時,需要確保Exactly-Once語義。分佈式系統中,一些不可控因素有很多,比如網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重複發送。可能出現的情況如下:

 

 

2.3 Kafka的冪等性是如何實現的?

Kafka為了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什麼呢?

  • ProducerID:在每個新的Producer初始化時,會被分配一個唯一的ProducerID,這個ProducerID對客戶端使用者是不可見的。
  • SequenceNumber:對於每個ProducerID,Producer發送數據的每個Topic和Partition都對應一個從0開始單調遞增的SequenceNumber值。

2.3.1 冪等性引入之前的問題?

Kafka在引入冪等性之前,Producer向Broker發送消息,然後Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:

 

上圖的實現流程是一種理想狀態下的消息發送情況,但是實際情況中,會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:

 

上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然後成功返回Ack信號給Producer。這樣下來,消息流中就被重複追加了兩條相同的(x2,y2)的消息。

2.3.2 冪等性引入之後解決了什麼問題?

面對這樣的問題,Kafka引入了冪等性。那麼冪等性是如何解決這類重複發送消息的問題的呢?下面我們可以先來看看流程圖:

 

 同樣,這是一種理想狀態下的發送流程。實際情況下,會有很多不確定的因素,比如Broker在發送Ack信號給Producer時出現網絡異常,導致發送失敗。異常情況如下圖所示:

 

 當Producer發送消息(x2,y2)給Broker時,Broker接收到消息並將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對於Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由於引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那麼在消息流中的消息就只有一條(x2,y2),不會出現重複發送的情況。

2.3.3 ProducerID是如何生成的?

客戶端在生成Producer時,會實例化如下代碼:

// 實例化一個Producer對象
Producer<String, String> producer = new KafkaProducer<>(props);

在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼如下:

 private void maybeWaitForPid() {
        if (transactionState == null)
            return;

        while (!transactionState.hasPid()) {
            try {
                Node node = awaitLeastLoadedNodeReady(requestTimeout);
                if (node != null) {
                    ClientResponse response = sendAndAwaitInitPidRequest(node);
                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                    } else {
                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                "We will back off and try again.", node);
                    }
                } else {
                    log.debug("Could not find an available broker to send InitPidRequest to. " +
                            "We will back off and try again.");
                }
            } catch (Exception e) {
                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
            }
            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
            time.sleep(retryBackoffMs);
            metadata.requestUpdate();
        }
    }

3.事務

與冪等性有關的另外一個特性就是事務。Kafka中的事務與數據庫的事務類似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操作在一個事務中,即原子性操作。對應的結果是同時成功或者同時失敗。

這裏需要與數據庫中事務進行區別,操作數據庫中的事務指一系列的增刪查改,對Kafka來說,操作事務是指一系列的生產和消費等原子性操作。

3.1 Kafka引入事務的用途?

在事務屬性引入之前,先引入Producer的冪等性,它的作用為:

  • Producer多次發送消息可以封裝成一個原子性操作,即同時成功,或者同時失敗;
  • 消費者&生產者模式下,因為Consumer在Commit Offsets出現問題時,導致重複消費消息時,Producer重複生產消息。需要將這個模式下Consumer的Commit Offsets操作和Producer一系列生產消息的操作封裝成一個原子性操作。

產生的場景有:

比如,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets為100(假設最近一次Commit的Offsets為50),那麼執行觸發Balance時,其他Consumer就會重複消費消息(消費的Offsets介於50~100之間的消息)。

3.2 事務提供了哪些可使用的API?

Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口如下:

// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();

// 開啟事務
void beginTransaction() throws ProducerFencedException;

// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 提交事務
void commitTransaction() throws ProducerFencedException;

// 放棄事務,類似於回滾事務的操作
void abortTransaction() throws ProducerFencedException;

3.3 事務的實際應用場景有哪些?

在Kafka事務中,一個原子性操作,根據操作類型可以分為3種情況。情況如下:

  • 只有Producer生產消息,這種場景需要事務的介入;
  • 消費消息和生產消息並存,比如Consumer&Producer模式,這種場景是一般Kafka項目中比較常見的模式,需要事務介入;
  • 只有Consumer消費消息,這種操作在實際項目中意義不大,和手動Commit Offsets的結果一樣,而且這種場景不是事務的引入目的。

4.總結

Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重複的問題上非常重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性類似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。

5.結束語

這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麼問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《》和《》,喜歡的朋友或同學, 可以在公告欄那裡點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。關注下面公眾號,根據提示,可免費獲取書籍的教學視頻。 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

【原創】(十一)Linux內存管理slub分配器

背景

  • Read the fucking source code! –By 魯迅
  • A picture is worth a thousand words. –By 高爾基

說明:

  1. Kernel版本:4.14
  2. ARM64處理器,Contex-A53,雙核
  3. 使用工具:Source Insight 3.5, Visio

1. 概述

之前的文章分析的都是基於頁面的內存分配,而小塊內存的分配和管理是通過塊分配器來實現的。目前內核中,有三種方式來實現小塊內存分配:slab, slub, slob,最先有slab分配器,slub/slob分配器是改進版,slob分配器適用於小內存嵌入式設備,而slub分配器目前已逐漸成為主流塊分配器。接下來的文章,就是以slub分配器為目標,進一步深入。

先來一個初印象:

2. 數據結構

有四個關鍵的數據結構:

  • struct kmem_cache:用於管理SLAB緩存,包括該緩存中對象的信息描述,per-CPU/Node管理slab頁面等;
    關鍵字段如下:
/*
 * Slab cache management.
 */
struct kmem_cache {
    struct kmem_cache_cpu __percpu *cpu_slab;       //每個CPU slab頁面
    /* Used for retriving partial slabs etc */
    unsigned long flags;
    unsigned long min_partial;
    int size;       /* The size of an object including meta data */
    int object_size;    /* The size of an object without meta data */
    int offset;     /* Free pointer offset. */
#ifdef CONFIG_SLUB_CPU_PARTIAL
    /* Number of per cpu partial objects to keep around */
    unsigned int cpu_partial;
#endif
    struct kmem_cache_order_objects oo;     //該結構體會描述申請頁面的order值,以及object的個數

    /* Allocation and freeing of slabs */
    struct kmem_cache_order_objects max;
    struct kmem_cache_order_objects min;
    gfp_t allocflags;   /* gfp flags to use on each alloc */
    int refcount;       /* Refcount for slab cache destroy */
    void (*ctor)(void *);           // 對象構造函數
    int inuse;      /* Offset to metadata */
    int align;      /* Alignment */
    int reserved;       /* Reserved bytes at the end of slabs */
    int red_left_pad;   /* Left redzone padding size */
    const char *name;   /* Name (only for display!) */
    struct list_head list;  /* List of slab caches */       //kmem_cache最終會鏈接在一個全局鏈表中
    struct kmem_cache_node *node[MAX_NUMNODES];     //Node管理slab頁面
};
  • struct kmem_cache_cpu:用於管理每個CPU的slab頁面,可以使用無鎖訪問,提高緩存對象分配速度;
struct kmem_cache_cpu {
    void **freelist;    /* Pointer to next available object */                  //指向空閑對象的指針
    unsigned long tid;  /* Globally unique transaction id */                
    struct page *page;  /* The slab from which we are allocating */     //slab緩存頁面
#ifdef CONFIG_SLUB_CPU_PARTIAL
    struct page *partial;   /* Partially allocated frozen slabs */
#endif
#ifdef CONFIG_SLUB_STATS
    unsigned stat[NR_SLUB_STAT_ITEMS];
#endif
};
  • struct kmem_cache_node:用於管理每個Node的slab頁面,由於每個Node的訪問速度不一致,slab頁面由Node來管理;
/*
 * The slab lists for all objects.
 */
struct kmem_cache_node {
    spinlock_t list_lock;

#ifdef CONFIG_SLUB
    unsigned long nr_partial;    //slab頁表數量
    struct list_head partial;       //slab頁面鏈表
#ifdef CONFIG_SLUB_DEBUG
    atomic_long_t nr_slabs;
    atomic_long_t total_objects;
    struct list_head full;
#endif
#endif
};
  • struct page:用於描述slab頁面struct page結構體中很多字段都是通過union聯合體進行復用的。
    struct page結構中,用於slub的成員如下:
struct page {
    union {
       ...
        void *s_mem;            /* slab first object */
       ...
    };
    
    /* Second double word */
    union {
       ...
        void *freelist;     /* sl[aou]b first free object */
       ...
    };
    
    union {
       ...
        struct {
            union {
              ...
                struct {            /* SLUB */
                    unsigned inuse:16;
                    unsigned objects:15;
                    unsigned frozen:1;
                };
                ...
            };
       ...
        };       
    };   
    
    /*
     * Third double word block
     */
    union {
       ...
        struct {        /* slub per cpu partial pages */
            struct page *next;  /* Next partial slab */
#ifdef CONFIG_64BIT
            int pages;  /* Nr of partial slabs left */
            int pobjects;   /* Approximate # of objects */
#else
            short int pages;
            short int pobjects;
#endif
        };

        struct rcu_head rcu_head;   /* Used by SLAB
                         * when destroying via RCU
                         */
    };
    ...
        struct kmem_cache *slab_cache;  /* SL[AU]B: Pointer to slab */    
    ...
}

圖來了:

3. 流程分析

針對Slub的使用,可以從三個維度來分析:

  1. slub緩存創建
  2. slub對象分配
  3. slub對象釋放

下邊將進一步分析。

3.1 kmem_cache_create

在內核中通過kmem_cache_create接口來創建一個slab緩存

先看一下這個接口的函數調用關係圖:

  1. kmem_cache_create完成的功能比較簡單,就是創建一個用於管理slab緩存kmem_cache結構,並對該結構體進行初始化,最終添加到全局鏈表中。kmem_cache結構體初始化,包括了上文中分析到的kmem_cache_cpukmem_cache_node兩個字段結構。

  2. 在創建的過程中,當發現已有的slab緩存中,有存在對象大小相近,且具有兼容標誌的slab緩存,那就只需要進行merge操作並返回,而無需進一步創建新的slab緩存

  3. calculate_sizes函數會根據指定的force_order或根據對象大小去計算kmem_cache結構體中的size/min/oo等值,其中kmem_cache_order_objects結構體,是由頁面分配order值和對象數量兩者通過位域拼接起來的。

  4. 在創建slab緩存的時候,有一個先雞后蛋的問題:kmem_cache結構體來管理一個slab緩存,而創建kmem_cache結構體又是從slab緩存中分配出來的對象,那麼這個問題是怎麼解決的呢?可以看一下kmem_cache_init函數,內核中定義了兩個靜態的全局變量kmem_cachekmem_cache_node,在kmem_cache_init函數中完成了這兩個結構體的初始化之後,相當於就是創建了兩個slab緩存,一個用於分配kmem_cache結構體對象的緩存池,一個用於分配kmem_cache_node結構體對象的緩存池。由於kmem_cache_cpu結構體是通過__alloc_percpu來分配的,因此不需要創建一個相關的slab緩存

3.2 kmem_cache_alloc

kmem_cache_alloc接口用於從slab緩存池中分配對象。

看一下大體的調用流程圖:

從上圖中可以看出,分配slab對象與Buddy System中分配頁面類似,存在快速路徑和慢速路徑兩種,所謂的快速路徑就是per-CPU緩存,可以無鎖訪問,因而效率更高。

整體的分配流程大體是這樣的:優先從per-CPU緩存中進行分配,如果per-CPU緩存中已經全部分配完畢,則從Node管理的slab頁面中遷移slab頁per-CPU緩存中,再重新分配。當Node管理的slab頁面也不足的情況下,則從Buddy System中分配新的頁面,添加到per-CPU緩存中。

還是用圖來說明更清晰,分為以下幾步來分配:

  1. fastpath
    快速路徑下,以原子的方式檢索per-CPU緩存的freelist列表中的第一個對象,如果freelist為空並且沒有要檢索的對象,則跳入慢速路徑操作,最後再返回到快速路徑中重試操作。

  2. slowpath-1
    將per-CPU緩存中page指向的slab頁中的空閑對象遷移到freelist中,如果有空閑對象,則freeze該頁面,沒有空閑對象則跳轉到slowpath-2

  3. slowpath-2
    將per-CPU緩存中partial鏈表中的第一個slab頁遷移到page指針中,如果partial鏈表為空,則跳轉到slowpath-3

  4. slowpath-3
    將Node管理的partial鏈表中的slab頁遷移到per-CPU緩存中的page中,並重複第二個slab頁將其添加到per-CPU緩存中的partial鏈表中。如果遷移的slab中空閑對象超過了kmem_cache.cpu_partial的一半,則僅遷移slab頁,並且不再重複。
    如果每個Node的partial鏈表都為空,跳轉到slowpath-4

  5. slowpath-4
    Buddy System中獲取頁面,並將其添加到per-CPU的page中。

3.2 kmem_cache_free

kmem_cache_free的操作,可以看成是kmem_cache_alloc的逆過程,因此也分為快速路徑和慢速路徑兩種方式,同時,慢速路徑中又分為了好幾種情況,可以參考kmem_cache_alloc的過程。

調用流程圖如下:

效果如下:

  1. 快速路徑釋放
    快速路徑下,直接將對象返回到freelist中即可。

  2. put_cpu_partial
    put_cpu_partial函數主要是將一個剛freeze的slab頁,放入到partial鏈表中。
    put_cpu_partial函數中調用unfreeze_partials函數,這時候會將per-CPU管理的partial鏈表中的slab頁面添加到Node管理的partial鏈表的尾部。如果超出了Node的partial鏈表,溢出的slab頁面中沒有分配對象的slab頁面將會返回到夥伴系統。

  3. add_partial
    添加slab頁到Node的partial鏈表中。

  4. remove_partial
    從Node的partial鏈表移除slab頁。

具體釋放的流程走哪個分支,跟對象的使用情況,partial鏈表的個數nr_partial/min_partial等相關,細節就不再深入分析了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整