你值得關注的幾種常見的js設計模式

前言

潜水了一段時間,今天空閑時間復盤下之前的知識點,聊聊 js 幾種常見的設計模式。

掌握 JavaScript 中常見的一些設計模式,對我們書寫規範性代碼,可維護性代碼有很大的幫助。

ps:最近在一些好友的鼓勵下,pubdreamcc 準備着手經營一個公眾號了,具體信息會在接下來的两天時間內發布,新手上路,歡迎大夥提供一些寶貴的建議,cc 在這裏先謝了~

內容主體

單例模式

所謂單例模式即為:保證一個類僅有一個實例,並提供一個訪問它的全局訪問點。

這裏其實利用的是 js閉包 來實現這樣的功能。

假如現在我們有這樣的需求,設置一個管理員,無論創建多次都只是設置一次。


function SetManager(name) {
  this.manager = name;
}

SetManager.prototype.getName = function() {
  console.log(this.manager);
};

var SingletonSetManager = (function() {
  var manager = null;

  return function(name) {
    if (!manager) {
        manager = new SetManager(name);
    }

    return manager;
  }
})();

SingletonSetManager('a').getName(); // a
SingletonSetManager('b').getName(); // a
SingletonSetManager('c').getName(); // a

這種方法有一個缺點就是:如果我們需要再次創建一個 HR,則需要將代碼再複製一遍,所以我們可以提取通用的單例。

function getSingleton(fn) {
  var instance = null;

  return function() {
    if (!instance) {
        instance = fn.apply(this, arguments);
    }

    return instance;
  }
}


// 設置管理員
var managerSingleton = getSingleton(function(name){
  var manager = new SetManager(name);
  return manager;
})

managerSingleton('a').getName(); // a
managerSingleton('b').getName(); // a
managerSingleton('c').getName(); // a

// 設置 HR

function SetHr(name) {
  this.hr = name;
}

SetHr.prototype.getName = function() {
  console.log(this.hr);
};

var hrSingleton = getSingleton(function(name) {
  var hr = new SetHr(name);
  return hr;
});

hrSingleton('aa').getName(); // aa
hrSingleton('bb').getName(); // aa
hrSingleton('cc').getName(); // aa

這樣我們的代碼可通用性就會變得更好,省去了一些重複性的代碼。

代理模式

所謂代理模式就是:我們不方便直接訪問某個對象時,可以為對象創建一個佔位符(代理),以便控制對它的訪問,我們實際上訪問的是代理對象。

這裏我們以一個過濾敏感字符來說明這種模式

// 主體,發送消息
function sendMsg(msg) {
  console.log(msg);
}

// 代理,對消息進行過濾
function proxySendMsg(msg) {
  // 無消息則直接返回
  if (typeof msg === 'undefined') {
    console.log(null);
    return;
  }
  
  // 有消息則進行過濾
  msg = ('' + msg).replace(/泥\s*煤/g, '');

  sendMsg(msg);
}


sendMsg('泥煤呀泥 煤呀'); // 泥煤呀泥 煤呀
proxySendMsg('泥煤呀泥 煤'); // 呀
proxySendMsg(); // null

這樣操作的意圖很明顯,當沒有消息的時候,控制對主體對象的訪問,代理直接返回一個 null ,有消息,則會過濾掉敏感字符,實現虛擬代理。

策略模式

策略模式就是內部封裝一些算法,它們之間可以互相替換,但是它們不隨客戶端變化而變化。

策略模式我們外部看不到算法的具體實現,我們也只關心算法實現的結果,不關注過程。

這裏以一個商品促銷的例子來說明下:在聖誕節,某些商品需要八折出售,有些商品需要九折出售,到了元旦節,普通客戶滿100減30,vip客戶滿100減50。可以看到商品出售的價格需要根據不同的條件來規定,分別採取不同的算法實現,所以我們採用策略模式。

// 價格策略對象
class PriceStrategy {
  constructor() {
      // 內部算法對象
    this.stragtegy = {
        // 100返30
      return30(price) {
          return +price + parseInt( price / 100) * 30;
      },
      // 100 返 50
      return50(price) {
          return +price + parseInt(price/ 100) * 50;
      },
      // 9 折
      percent90(price) {
          return price * 100 * 90 / 10000
      },
      percent80(price) {
          return price * 100 * 80 / 10000
      },
      percent50(price) {
          return price * 100 * 50 / 10000
      }
    }
  }
  // 策略算法調用接口
  getPrice(algorithm, price) {
    return this.stragtegy[algorithm] && this.stragtegy[algorithm](price);
  }
}
let priceStrategy = new PriceStrategy();
let price = priceStrategy.getPrice('return50', 314.67);
console.log(price);

這樣,我們可以採取不同的策略算法得到商品的不同價格。

觀察者模式

觀察者模式又稱為 “發布-訂閱模式”,通過定義一種依賴關係,當一個對象狀態發生改變時,訂閱者會得到通知。

其實,我們傳統的 DOM 事件綁定就是一種發布-訂閱模式。

// 訂閱
document.body.addEventListener('click', function() {
  console.log('click1');
}, false);

document.body.addEventListener('click', function() {
  console.log('click2');
}, false);

// 發布
document.body.click(); // click1  click2

裝飾者模式

裝飾者模式就是在不改變原對象基本功能的基礎上,通過增加功能使得原本對象滿足用戶的更為複雜的需求。

比如有這麼一個需求:

用戶點擊輸入框時,如果輸入框輸入的內容有限制,那麼在其後面显示用戶輸入內容的限制格式的提示文案

—————->>>>>>> 現在要改為:

多加一條需求,默認輸入框上邊显示一行文案,當用戶點擊輸入框的時候,文案消失。

這裡是以前的代碼:

// 輸入框元素
let telInput = document.getElementById('tel_input');
// 輸入框提示文案
let telWarnText = document.getElementById('tel_warn_text');
// 點擊輸入框显示輸入框輸入格式提示文案
input.onclick = function () {
  telWarnText.style.display = 'inline-block';
};

修改之後的代碼:

// 輸入框元素
let telInput = document.getElementById('tel_input');
// 輸入框輸入格式提示文案
let telWarnText = document.getElementById('tel_warn_text');
// 輸入框提示輸入文案
let telDemoText = document.getElementById('tel_demo_text');
// 點擊輸入框显示輸入框輸入格式提示文案
input.onclick = function () {
  telWarnText.style.display = 'inline-block';
  telDemoText.style.display = 'none';
};

但是緊接着悲劇就來了,修改了電話輸入框,還有姓名、地址輸入框等等;

裝飾已有的功能對象

原有的功能已經不滿足用戶的需求了,此時需要做的是對原有的功能添加,設置新的屬性和方法來滿足新的需求,但是有不影響原來已經有的部分。

let decorator = function (input, fn) {
  let getInput = document.getElementById(input);
  if(typeof getInput.onclick === 'function') {
    let oldClick = getInput.onclick;
    getInput.onclick = function() {
        // 原來的事件回調函數
        oldClick();
        // 新增的事件回調函數
        fn();
    }
  } else {
    getInput.onclick = fn;
  }
  // 其他事件
};

// 電話輸入框功能裝飾
decorator('tel_input', function() {
  document.getElementById('tel_demo_text').sytle.display = 'none'
});
// 姓名輸入框裝飾
decorator('name_input', function() {
  document.getElementById('name_demo_text').sytle.display = 'none'
});
// 地址輸入框裝飾
decorator('address_input', function() {
  document.getElementById('address_demo_text').sytle.display = 'none'
});

后語

本編文章出自於我的 github 倉庫 ,歡迎喜歡的夥伴 star ,謝謝 。

倉庫地址 前端學習

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

大型網站如何防止崩潰,解決高併發帶來的問題

大型網站,比如門戶網站,在面對大量用戶訪問、高併發請求方面帶來的問題
1大併發:在同一個時間點,有大量的客戶來訪問我們的網站,如果訪問量過大,就可能造成網站癱瘓。
2大流量:當網站大后,有大量的圖片,視頻, 這樣就會對流量要求高,需要更多更大的帶寬。
3大存儲:你的數據量會成海量的數據,如果我們的數據放入一張表,是無法應對的。可能對數據保存和查詢出現問題。

基本的解決方案集中在這樣幾個環節:使用高性能的服務器、高性能的數據庫、高效率的編程語言、還有高性能的Web容器,(對架構分層+負載均衡+集群)這幾個解決思路在一定程度上意味着更大的投入。

解決方案:

   一、提高硬件能力、增加系統服務器。(當服務器增加到某個程度的時候系統所能提供的併發訪問量幾乎不變,所以不能根本解決問題)

   二、使用緩存(本地緩存:本地可以使用JDK自帶的 Map、Guava Cache.分佈式緩存:Redis、Memcache.本地緩存不適用於提高系統併發量,一般是用處用在程序中。比如Spring是如何實現單例的呢?大家如果看過源碼的話,應該知道,Spiring把已經初始過的變量放在一個Map中,下次再要使用這個變量的時候,先判斷Map中有沒有,這也就是系統中常見的單例模式的實現。)

 分佈式緩存利器Redis集群,Redis集群的搭建至少需要三主三從。

1. 所有的redis節點彼此互聯(PING-PONG機制),內部使用二進制協議優化傳輸速度和帶寬。

2. 節點的fail是通過集群中超過半數的節點檢測失效時才生效(所以一個集群中至少要有三個節點)。

3. 客戶端與redis節點直連,不需要中間proxy層.客戶端不需要連接集群所有節點,連接集群中任何一個可用節點即可。

4. 集群中每一個節點都存放不同的內容,每一個節點都應有備份機。

5. redis-cluster把所有的物理節點映射到[0-16383]slot上,cluster 負責維護node<->slot<->value

 

 

 

 

Redis 集群中內置了16384 個哈希槽,當需要在Redis 集群中放置一個key-value 時,redis先對 key 使用 crc16 算法算出一個結果,然後把結果對16384 求餘數,這樣每個key 都會對應一個編號在0-16383 之間的哈希槽,redis會根據節點數量大致均等的將哈希槽映射到不同的節點

   三 、消息隊列 (解耦+削峰+異步)通過異步處理提高系統性能,降低系統耦合性

在不使用消息隊列服務器的時候,用戶的請求數據直接寫入數據庫,在高併發的情況下數據庫壓力劇增,使得響應速度變慢。但是在使用消息隊列之後,用戶的請求數據發送給消息隊列之後立即 返回,再由消息隊列的消費者進程從消息隊列中獲取數據,異步寫入數據庫。由於消息隊列服務器處理速度快於數據庫(消息隊列也比數據庫有更好的伸縮性),因此響應速度得到大幅改善。

 

通過使用消息中間件對Dubbo服務間的調用進行解耦, 消息中間件可利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分佈式系統的集成。通過提供消息傳遞和消息排隊模型,可以在分佈式環境下擴展進程間的通信。通過消息中間件,應用程序或組件之間可以進行可靠的異步通訊,從而降低系統之間的耦合度,提高系統的可擴展性和可用性。

   四 、採用分佈式開發 (不同的服務部署在不同的機器節點上,並且一個服務也可以部署在多台機器上,然後利用 Nginx 負載均衡訪問。這樣就解決了單點部署(All In)的缺點,大大提高的系統併發量)

   五 、數據庫分庫(讀寫分離)、分表(水平分表、垂直分表)

PXC高可用集群與Replication集群結合方案

這種的集群在遇到單表數據量超過2000萬的時候,mysql性能會受損,所以一個集群還不夠,我們需要把數據分到另一個集群,這個稱為“切片”,就是把大量的數據拆分到不同的集群中,每個集群的數據都是不一樣的,通過MyCat這個阿里巴巴的開源中間件,可以把sql分到不同的集群裏面去。

PXC集群方案與Replication區別

PXC集群方案所有節點都是可讀可寫的,Replication從節點不能寫入,因為主從同步是單向的,無法從slave節點向master點同步。

PXC同步機制是同步進行的,這也是它能保證數據強一致性的根本原因,Replication同步機制是異步進行的,它如果從節點停止同步,依然可以向主節點插入數據,正確返回,造成數據主從數據的不一致性。

PXC是用犧牲性能保證數據的一致性,Replication在性能上是高於PXC的。所以兩者用途也不一致。PXC是用於重要信息的存儲,例如:訂單、用戶信息等。Replication用於一般信息的存儲,能夠容忍數據丟失,例如:購物車,用戶行為日誌等

   六、 採用集群 (多台機器提供相同的服務)系統架構方案

   七、CDN 加速 (將一些靜態資源比如圖片、視頻等等緩存到離用戶最近的網絡節點)

   八、瀏覽器緩存 頁面靜態化(使用php自己的ob緩存技術實現, 主流的mvc框架(tp,yii,laravel)模板引擎一般都自帶頁面靜態化 )      

   九、使用合適的連接池(數據庫連接池、線程池等等)

   十、適當使用多線程進行開發。

   十一、使用鏡像

鏡像是大型網站常採用的提高性能和數據安全性的方式,鏡像的技術可以解決不同網絡接入商和地域帶來的用戶訪問速度差異,比如ChinaNet和EduNet之間的差異就促使了很多網站在教育網內搭建鏡像站點,數據進行定時更新或者實時更新。有很多專業的現成的解決架構和產品可選。也有廉價的通過軟件實現的思路,比如Linux上的rsync等工具。

   十二、圖片服務器分離

大家知道,對於Web服務器來說,不管是Apache、IIS還是其他容器,圖片是最消耗資源的,於是我們有必要將圖片與頁面進行分離,這是基本上大型網站都會採用的策略,他們都有獨立的、甚至很多台的圖片服務器。這樣的架構可以降低提供頁面訪問請求的服務器系統壓力,並且可以保證系統不會因為圖片問題而崩潰。

在應用服務器和圖片服務器上,可以進行不同的配置優化,比如apache在配置ContentType的時候可以盡量少支持、盡可能少的LoadModule,保證更高的系統消耗和執行效率。

 

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

WebApp 安全風險與防護課堂開課了!

本文由葡萄城技術團隊於原創並首發

轉載請註明出處:葡萄城官網,葡萄城為開發者提供專業的開發工具、解決方案和服務,賦能開發者。

 

2018 網絡安全事故頻發,從數據泄露、信息竊取,到 DDOS 攻擊、勒索病毒,不僅威脅的總數在增加,威脅態勢也變得更加多樣化,攻擊者在不斷開發新的攻擊途徑的同時,也儘力在攻擊過程中掩蓋其蹤跡,使網絡安全防護變得越發棘手。未來是萬物互聯的時代,唯有把握住網絡信息安全,才能避免被降維打擊。

為了回饋社區,我們特邀葡萄城高級架構師、安全專家Carl作為分享嘉賓,於葡萄城技術公開課上,以 WebApp 安全防護為出發點,帶你了解更多意想不到的安全防護措施與黑客攻擊手段,助你提高網絡安全意識,最終學會如何規避風險隱患,避免遭受網絡安全攻擊。

本次課程共分三節,計劃講的內容如下:

第一節:開闊眼界 – 提升安全意識

提升網絡安全意識對項目團隊中的每一個角色、每一個流程都至關重要。同時,也只有具備了網絡安全意識,才願意為數據安全投入更多的時間和精力。下面,我將為您展示部分2018年發生的網絡安全事故,這些事故造成的損失,也許遠遠超出你的想象。

2018 網絡安全事故回顧

Facebook數據泄露事件:2018年9月,Facebook因安全系統漏洞而遭受黑客攻擊,導致約5000萬用戶信息泄露。

上市公司數據堂,涉嫌侵犯數百億條公民個人信息:大數據行業知名企業數據堂在短短8個月的時間內,日均泄露公民個人信息1.3億餘條,累計傳輸數據壓縮后約為4000GB。

圓通10億快遞信息泄露: 10億條用戶數據遭公開售賣,這些數據包括寄(收)件人姓名、電話、地址等隱私信息。

萬豪酒店5億用戶開房信息泄露:萬豪酒店客房預訂數據庫遭黑客入侵,約5億名客戶的信息可能被泄露。

更多數據泄露事件

  1. 國泰航空數據泄露,940萬乘客受影響
  2. MongoDB 數據庫被入侵, 1100 萬份郵件記錄遭泄露
  3. SHEIN 數據泄露影響 642 萬用戶
  4. GovPayNet憑證系統存在漏洞,1400萬交易記錄被曝光
  5. 小米有品平台泄露個人隱私 約2000萬用戶數據遭泄露
  6. 美國亞特蘭大市政府受到勒索軟件攻擊
  7. 美國巴爾的摩市遭遇勒索軟件攻擊,導致911緊急調度服務的計算機輔助調度(CAD)功能掉線
  8. 台積電勒索病毒事件,約造成17.6 億元的營收損失,股票市值下跌78億
  9. 很多個人電腦和中小網站都曾遭受攻擊
  10. 平昌冬奧會開幕式服務器遭到身份不明的黑客入侵
  11. GitHub遭1.35T級流量攻擊
  12. CPU數據緩存機制漏洞
  13. iOS 平台WebView組件漏洞(UIWebView/ WKWebView)跨域訪問漏洞(CNNVD-201801-515)
  14. Oracle WebLogic Server WLS核心組件遠程代碼執行漏洞
  15. 微信支付SDKXXE漏洞
  16. Apache Struts2 S2-057安全漏洞

勒索病毒事件

DDoS 攻擊

年度重大漏洞盤點

第二節:知己知彼 – 黑客如何攻擊系統

一名黑客攻擊網站的典型步驟,主要分為以下5步:

  1. 信息收集和漏洞掃描
  2. 漏洞利用
  3. 上傳木馬
  4. 獲取服務器的控制權
  5. 清理痕迹

總結:

黑客不是手動測試系統漏洞的,而是有很多強大的工具可以自動化完成

黑客不是利用系統中的一個漏洞,而是要利用一系列,不同層次的漏洞

黑客經常批量攻擊一系列網站,選取其中漏洞較多,較好利用的重點突破

第三節:十大安全風險(OWASP Top 10)

不安全的軟件正在破壞着我們的金融、醫療、國防、能源和其他重要的基礎設施。隨着我們的軟件變得愈加龐大、複雜且相互關聯,實現應用程序安全的難度也呈指數級增長。而現代軟件開發過程的飛速發展,使得快速、準確地識別軟件安全風險變得愈發的重要,OWASP 組織也因此誕生。

OWASP,即開放式Web應用程序安全項目(Open Web Application Security Project),作為一個開源的、非盈利的全球性安全組織,它提供了有關計算機和互聯網應用程序的公正、實際、有成本效益的信息,其目的是協助個人、企業和機構來發現並使用可信賴的軟件。

OWASP Top 10是由OWASP組織公布,最具權威性的“10項最嚴重的Web應用程序安全風險預警”,其就安全問題從威脅性和脆弱性兩方面進行可能性分析,並結合技術和商業影響的分析結果,輸出公認的、最嚴重的十類Web應用安全風險排名。OWASP Top 10旨在針對上述風險,提出解決方案,幫助IT公司和開發團隊規範應用程序開發流程和測試流程,提高Web產品的安全性。

OWASP敦促所有公司在其組織內採用OWASP Top 10文檔,並確保其Web應用程序最大限度地降低這些風險,採用OWASP Top 10可能是將企業內的軟件開發文化轉變為生成安全代碼文化最行之有效的一步。

OWASP Top 10包括:

  1. 注入
  2. 失效的身份認證
  3. 敏感信息泄露
  4. XML外部實體(XXE)
  5. 失效的訪問控制
  6. 安全配置錯誤
  7. 跨站腳本(XSS)
  8. 不安全的反序列化
  9. 使用含有已知漏洞的組件

10. 不足的日誌記錄和監控

講師介紹:

Carl(陳慶),葡萄城高級架構師、安全專家、葡萄城技術公開課講師。擁有15年項目開發經驗,專註於產品架構、編程技術等領域,對網絡安全有着獨到見解,曾擔任微軟TechEd講師,樂於研究各種前沿技術並分享。

請點擊該地址報名觀看直播:http://live.vhall.com/137416596

錯過本場直播?沒關係,所有直播內容我們會存放在葡萄城公開課頁面,便於您隨時觀看、學習。後續我們也會將Carl老師講的內容整理成文章,發布在社區,敬請關注。

“賦能開發者”葡萄城除了為所有開發人員提供免費的開發技巧分享、項目實戰經驗外,還提供了眾多高水準、高品質的開發工具和開發者解決方案,可有效幫助開發人員提高效率,縮短項目周期,使開發人員能更專註於業務邏輯,順利完成高質量的項目交付,歡迎您深入了解。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

kafka消費者客戶端

Kafka消費者

1.1 消費者與消費者組

消費者與消費者組之間的關係

​ 每一個消費者都隸屬於某一個消費者組,一個消費者組可以包含一個或多個消費者,每一條消息只會被消費者組中的某一個消費者所消費。不同消費者組之間消息的消費是互不干擾的。

為什麼會有消費者組的概念

​ 消費者組出現主要是出於兩個目的:

​ (1) 使整體的消費能力具備橫向的伸縮性。可以適當增加消費者組中消費者的數量,來提高整體的消費能力。但是每一個分區至多被消費者組的中一個消費者所消費,因此當消費者組中消費者數量超過分區數時,多出的消費者不會分配到任何一個分區。當然這是默認的分區分配策略,可通過partition.assignment.strategy進行配置。

​ (2) 實現消息消費的隔離。不同消費者組之間消息消費互不干擾,從而實現發布訂閱這種消息投遞模式。

注意:

​ 消費者隸屬的消費者組可以通過group.id進行配置。消費者組是一個邏輯上的概念,但消費者並不是一個邏輯上的概念,它可以是一個線程,也可以是一個進程。同一個消費者組內的消費者可以部署在同一台機器上,也可以部署在不同的機器上。

1.2 消費者客戶端開發

​ 一個正常的消費邏輯需要具備以下幾個步驟:

  • 配置消費者客戶端參數及創建相應的消費者實例。

  • 訂閱主題

  • 拉取消息並消費

  • 提交消費位移

  • 關閉消費者實例

  public class KafkaConsumerAnalysis {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";
  public static final AtomicBoolean isRunning = new AtomicBoolean(true);

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      return prop;
  }


  public static void main(String[] args) {
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
       
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("topic = " + record.topic() + ", partition =" +                                               record.partition() + ", offset = " + record.offset());
          System.out.println("key = " + record.key() + ", value = " + record.value());
              }
          }
      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          consumer.close();
      }
  }
}

1.2.1 訂閱主題和分區

​ 先來說一下消費者訂閱消息的粒度:一個消費者可以訂閱一個主題、多個主題、或者多個主題的特定分區。主要通過subsribe和assign兩個方法實現訂閱。

(1)訂閱一個主題:

​ public void subscribe(Collection<String> topics),當集合中有一個主題時。

(2)訂閱多個主題:

​ public void subscribe(Collection<String> topics),當集合中有多個主題時。

​ public void subscribe(Pattern pattern),通過正則表達式實現消費者主題的匹配。通過這種方式,如果在消息消費的過程中,又添加了新的能夠匹配到正則的主題,那麼消費者就可以消費到新添加的主題。 consumer.subscribe(Pattern.compile(“topic-.*”));

(3)多個主題的特定分區

​ public void assign(Collection<TopicPartition> partitions),可以實現訂閱某些特定的主題分區。TopicPartition包括兩個屬性:topic(String)和partition(int)。

​ 如果事先不知道有多少分區該如何處理,KafkaConsumer中的partitionFor方法可以獲得指定主題分區的元數據信息:

​ public List<PartitionInfo> partitionsFor(String topic)

​ PartitionInfo的屬性如下:

  
public class PartitionInfo {
  private final String topic;//主題
  private final int partition;//分區
  private final Node leader;//分區leader
  private final Node[] replicas;//分區的AR
  private final Node[] inSyncReplicas;//分區的ISR
  private final Node[] offlineReplicas;//分區的OSR
}

​ 因此也可以通過這個方法實現某個主題的全部訂閱。

​ 需要指出的是,subscribe(Collection)、subscirbe(Pattern)、assign(Collection)方法分別代表了三種不同的訂閱狀態:AUTO_TOPICS、AUTO_PATTREN和USER_ASSIGN,這三種方式是互斥的,消費者只能使用其中一種,否則會報出IllegalStateException。

​ subscirbe方法可以實現消費者自動再平衡的功能。多個消費者的情況下,可以根據分區分配策略自動分配消費者和分區的關係,當消費者增加或減少時,也能實現負載均衡和故障轉移。

​ 如何實現取消訂閱:

​ consumer.unsubscribe()

1.2.2 反序列化

​ KafkaProducer端生產消息進行序列化,同樣消費者就要進行相應的反序列化。相當於根據定義的序列化格式的一個逆序提取數據的過程。

  
import com.gdy.kafka.producer.Company;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public Company deserialize(String topic, byte[] data) {
      if(data == null) {
          return null;
      }

      if(data.length < 8) {
          throw new SerializationException("size of data received by Deserializer is shorter than expected");
      }

      ByteBuffer buffer = ByteBuffer.wrap(data);
      int nameLength = buffer.getInt();
      byte[] nameBytes = new byte[nameLength];
      buffer.get(nameBytes);
      int addressLen = buffer.getInt();
      byte[] addressBytes = new byte[addressLen];
      buffer.get(addressBytes);
      String name,address;
      try {
          name = new String(nameBytes,"UTF-8");
          address = new String(addressBytes,"UTF-8");
      }catch (UnsupportedEncodingException e) {
          throw new SerializationException("Error accur when deserializing");
      }

      return new Company(name, address);
  }

  @Override
  public void close() {

  }
}

​ 實際生產中需要自定義序列化器和反序列化器時,推薦使用Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具來包裝。

1.2.3 消息消費

​ Kafka中消息的消費是基於拉模式的,kafka消息的消費是一個不斷輪旋的過程,消費者需要做的就是重複的調用poll方法。

  
public ConsumerRecords<K, V> poll(final Duration timeout)

​ 這個方法需要注意的是,如果消費者的緩衝區中有可用的數據,則會立即返回,否則會阻塞至timeout。如果在阻塞時間內緩衝區仍沒有數據,則返回一個空的消息集。timeout的設置取決於應用程序對效應速度的要求。如果應用線程的位移工作是從Kafka中拉取數據並進行消費可以將這個參數設置為Long.MAX_VALUE。

​ 每次poll都會返回一個ConsumerRecords對象,它是ConsumerRecord的集合。對於ConsumerRecord相比於ProducerRecord多了一些屬性:

  
private final String topic;//主題
  private final int partition;//分區
  private final long offset;//偏移量
  private final long timestamp;//時間戳
  private final TimestampType timestampType;//時間戳類型
  private final int serializedKeySize;//序列化key的大小
  private final int serializedValueSize;//序列化value的大小
  private final Headers headers;//headers
  private final K key;//key
  private final V value;//value
  private volatile Long checksum;//CRC32校驗和

​ 另外我們可以按照分區維度對消息進行消費,通過ConsumerRecords.records(TopicPartiton)方法實現。

  
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    Set<TopicPartition> partitions = records.partitions();
    for (TopicPartition tp : partitions) {
        for (ConsumerRecord<String, String> record : records.records(tp)) {
              System.out.println(record.partition() + " ," + record.value());
        }
    }

​ 另外還可以按照主題維度對消息進行消費,通過ConsumerRecords.records(Topic)實現。

  
for (String topic : topicList) {
      for (ConsumerRecord<String, String> record : records.records(topic)) {
              System.out.println(record.partition() + " ," + record.value());
      }
}

1.2.4 消費者位移提交

​ 首先要 明白一點,消費者位移是要做持久化處理的,否則當發生消費者崩潰或者消費者重平衡時,消費者消費位移無法獲得。舊消費者客戶端是將位移提交到zookeeper上,新消費者客戶端將位移存儲在Kafka內部主題_consumer_offsets中。

​ KafkaConsumer提供了兩個方法position(TopicPatition)和commited(TopicPartition)。

​ public long position(TopicPartition partition)—–獲得下一次拉取數據的偏移量

​ public OffsetAndMetadata committed(TopicPartition partition)—–給定分區的最後一次提交的偏移量。

還有一個概念稱之為lastConsumedOffset,這個指的是最後一次消費的偏移量。

​ 在kafka提交方式有兩種:自動提交和手動提交。

(1)自動位移提交

​ kafka默認情況下採用自動提交,enable.auto.commit的默認值為true。當然自動提交並不是沒消費一次消息就進行提交,而是定期提交,這個定期的周期時間由auto.commit.intervals.ms參數進行配置,默認值為5s,當然這個參數生效的前提就是開啟自動提交。

​ 自動提交會造成重複消費和消息丟失的情況。重複消費很容易理解,因為自動提交實際是延遲提交,因此很容易造成重複消費,然後消息丟失是怎麼產生的?

(2)手動位移提交

​ 開始手動提交的需要配置enable.auto.commit=false。手動提交消費者偏移量,又可分為同步提交和異步提交。

​ 同步提交:

​ 同步提交很簡單,調用commitSync() 方法:

  
while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            //consume message
            consumer.commitSync();
        }
}

​ 這樣,每消費一條消息,提交一個偏移量。當然可用過緩存消息的方式,實現批量處理+批量提交:

  
while (isRunning.get()) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
      }
      if (buffer.size() >= minBaches) {
          for (ConsumerRecord<String, String> record : records) {
              //consume message
          }
          consumer.commitSync();
          buffer.clear();
      }
}

​ 還可以通過public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)這個方法實現按照分區粒度進行同步提交。

  
while (isRunning.get()) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
      for (ConsumerRecord record : partitionRecords) {
          //consume message
      }
      long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      consumer.commitSync(Collections.singletonMap(tp,new                                                               OffsetAndMetadata(lastConsumerOffset+1)));
  }
}

​ 異步提交:

​ commitAsync異步提交的時候消費者線程不會被阻塞,即可能在提交偏移量的結果還未返回之前,就開始了新一次的拉取數據操作。異步提交可以提升消費者的性能。commitAsync有三個重載:

​ public void commitAsync()

​ public void commitAsync(OffsetCommitCallback callback)

​ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback )

​ 對照同步提交的方法參數,多了一個Callback回調參數,它提供了一個異步提交的回調方法,當消費者位移提交完成后回調OffsetCommitCallback的onComplement方法。以第二個方法為例:

  
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> record : records) {
      //consume message
  }
  consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
          if (e == null) {
              System.out.println(offsets);
          }else {
                e.printStackTrace();
          }
      }
});

1.2.5 控制和關閉消費

​ kafkaConsumer提供了pause()和resume() 方法分別實現暫停某些分區在拉取操作時返回數據給客戶端和恢復某些分區向客戶端返回數據的操作:

​ public void pause(Collection<TopicPartition> partitions)

​ public void resume(Collection<TopicPartition> partitions)

​ 優雅停止KafkaConsumer退出消費者循環的方式:

​ (1)不要使用while(true),而是使用while(isRunning.get()),isRunning是一個AtomicBoolean類型,可以在其他地方調用isRunning.set(false)方法退出循環。

​ (2)調用consumer.wakup()方法,wakeup方法是KafkaConsumer中唯一一個可以從其他線程里安全調用的方法,會拋出WakeupException,我們不需要處理這個異常。

​ 跳出循環后一定要显示的執行關閉動作和釋放資源。

1.2.6 指定位移消費

KafkaConsumer可通過兩種方式實現實現不同粒度的指定位移消費。第一種是通過auto.offset.reset參數,另一種通過一個重要的方法seek。

(1)auto.offset.reset

auto.offset.reset這個參數總共有三種可配置的值:latest、earliest、none。如果配置不在這三個值當中,就會拋出ConfigException。

latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,消費新產生的該分區下的數據

earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,從頭開始消費

none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset或位移越界,則拋出NoOffsetForPartitionException異常

消息的消費是通過poll方法進行的,poll方法對於開發者來說就是一個黑盒,無法精確的掌控消費的起始位置。即使通過auto.offsets.reset參數也只能在找不到位移或者位移越界的情況下粗粒度的從頭開始或者從末尾開始。因此,Kafka提供了另一種更細粒度的消費掌控:seek。

(2)seek

seek可以實現追前消費和回溯消費:

  
public void seek(TopicPartition partition, long offset)

可以通過seek方法實現指定分區的消費位移的控制。需要注意的一點是,seek方法只能重置消費者分配到的分區的偏移量,而分區的分配是在poll方法中實現的。因此在執行seek方法之前需要先執行一次poll方法獲取消費者分配到的分區,但是並不是每次poll方法都能獲得數據,所以可以採用如下的方法。

  
consumer.subscribe(topicList);
  Set<TopicPartition> assignment = new HashSet<>();
  while(assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(100));
      assignment = consumer.assignment();//獲取消費者分配到的分區,沒有獲取返回一個空集合
  }

  for (TopicPartition tp : assignment) {
      consumer.seek(tp, 10); //重置指定分區的位移
  }
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      //consume record
    }

如果對未分配到的分區執行了seek方法,那麼會報出IllegalStateException異常。

在前面我們已經提到,使用auto.offsets.reset參數時,只有當消費者分配到的分區沒有提交的位移或者位移越界時,才能從earliest消費或者從latest消費。seek方法可以彌補這一中情況,實現任意情況的從頭或從尾部消費。

   Set<TopicPartition> assignment = new HashSet<>();
  while(assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(100));
      assignment = consumer.assignment();
  }
  Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);//獲取指定分區的末尾位置
  for (TopicPartition tp : assignment) {
      consumer.seek;
  }

與endOffset對應的方法是beginningOffset方法,可以獲取指定分區的起始位置。其實kafka已經提供了一個從頭和從尾消費的方法。

  
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

還有一種場景是這樣的,我們並不知道特定的消費位置,卻知道一個相關的時間點。為解決這種場景遇到的問題,kafka提供了一個offsetsForTimes()方法,通過時間戳來查詢分區消費的位移。

      Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
  for (TopicPartition tp : assignment) {
      timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
  }
//獲得指定分區指定時間點的消費位移
  Map<TopicPartition, OffsetAndTimestamp> offsets =                                                                                   consumer.offsetsForTimes(timestampToSearch);
  for (TopicPartition tp : assignment) {
      OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
      if (offsetAndTimestamp != null) {
              consumer.seek(tp, offsetAndTimestamp.offset());
      }
  }

由於seek方法的存在,使得消費者的消費位移可以存儲在任意的存儲介質中,包括DB、文件系統等。

1.2.7 消費者的再均衡

再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為,它為消費者組具備高可用伸縮性提高保障。不過需要注意的地方有兩點,第一是消費者發生再均衡期間,消費者組中的消費者是無法讀取消息的。第二點就是消費者發生再均衡可能會引起重複消費問題,所以一般情況下要盡量避免不必要的再均衡。

KafkaConsumer的subscribe方法中有一個參數為ConsumerRebalanceListener,我們稱之為再均衡監聽器,它可以用來在設置發生再均衡動作前後的一些準備和收尾動作。

  public interface ConsumerRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions);
  void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

onPartitionsRevoked方法會在再均衡之前和消費者停止讀取消息之後被調用。可以通過這個回調函數來處理消費位移的提交,以避免重複消費。參數partitions表示再均衡前分配到的分區。

onPartitionsAssigned方法會在再均衡之後和消費者消費之間進行調用。參數partitons表示再均衡之後所分配到的分區。

  consumer.subscribe(topicList);
  Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  consumer.subscribe(topicList, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          consumer.commitSync(currentOffsets);//提交偏移量
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          //do something
      }
  });

  try {
      while (isRunning.get()) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
              //process records
              //記錄當前的偏移量
              currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new                               OffsetAndMetadata( record.offset() + 1));
          }
          consumer.commitAsync(currentOffsets, null);
      }

      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          consumer.close();
      }

1.2.8 消費者攔截器

消費者攔截器主要是在消費到消息或者提交消費位移時進行一些定製化的操作。消費者攔截器需要自定義實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口。

  public interface ConsumerInterceptor<K, V> extends Configurable {    
  public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  public void close();
}

onConsume方法是在poll()方法返回之前被調用,比如修改消息的內容、過濾消息等。如果onConsume方法發生異常,異常會被捕獲並記錄到日誌中,但是不會向上傳遞。

Kafka會在提交位移之後調用攔截器的onCommit方法,可以使用這個方法來記錄和跟蹤消費的位移信息。

  
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {
  private static final long EXPIRE_INTERVAL = 10 * 1000; //10秒過期
  @Override
  public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
      long now = System.currentTimeMillis();
      Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();

      for (TopicPartition tp : records.partitions()) {
          List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
          List<ConsumerRecord<String, String>> newTpRecords = records.records(tp);
          for (ConsumerRecord<String, String> record : tpRecords) {
              if (now - record.timestamp() < EXPIRE_INTERVAL) {//判斷是否超時
                  newTpRecords.add(record);
              }
          }
          if (!newRecords.isEmpty()) {
              newRecords.put(tp, newTpRecords);
          }


      }
      return new ConsumerRecords<>(newRecords);
  }

  @Override
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
      offsets.forEach((tp,offset) -> {
          System.out.println(tp + ":" + offset.offset());
      });
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}

使用這種TTL需要注意的是如果採用帶參數的位移提交方式,有可能提交了錯誤的位移,可能poll拉取的最大位移已經被攔截器過濾掉。

1.2.9 消費者的多線程實現

KafkaProducer是線程安全的,然而KafkaConsumer是非線程安全的。KafkaConsumer中的acquire方法用於檢測當前是否只有一個線程在操作,如果有就會拋出ConcurrentModifiedException。acuqire方法和我們通常所說的鎖是不同的,它不會阻塞線程,我們可以把它看做是一個輕量級的鎖,它通過線程操作計數標記的方式來檢測是否發生了併發操作。acquire方法和release方法成對出現,分表表示加鎖和解鎖。

  //標記當前正在操作consumer的線程
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
//refcount is used to allow reentrant access by the thread who has acquired currentThread,
//大概可以理解我加鎖的次數
private final AtomicInteger refcount = new AtomicInteger(0);
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get()&&!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
      throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
      refcount.incrementAndGet();
}

private void release() {
  if (refcount.decrementAndGet() == 0)
      currentThread.set(NO_CURRENT_THREAD);
}

kafkaConsumer中的每個共有方法在調用之前都會執行aquire方法,只有wakeup方法是個意外。

KafkaConsumer的非線程安全並不意味着消費消息的時候只能以單線程的方式執行。可以通過多種方式實現多線程消費。

(1)Kafka多線程消費第一種實現方式——–線程封鎖

所謂線程封鎖,就是為每個線程實例化一個KafkaConsumer對象。這種方式一個線程對應一個KafkaConsumer,一個線程(可就是一個consumer)可以消費一個或多個分區的消息。這種消費方式的併發度受限於分區的實際數量。當線程數量超過分分區數量時,就會出現線程限制額的情況。

  import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class FirstMutiConsumerDemo {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      return prop;
  }

  public static void main(String[] args) {
      Properties prop = initConfig();
      int consumerThreadNum = 4;
      for (int i = 0; i < 4; i++) {
          new KafkaCoosumerThread(prop, topic).run();
      }
  }

  public static class KafkaCoosumerThread extends Thread {
  //每個消費者線程包含一個KakfaConsumer對象。
      private KafkaConsumer<String, String> kafkaConsumer;
      public KafkaCoosumerThread(Properties prop, String topic) {
          this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
          this.kafkaConsumer.subscribe(Arrays.asList(topic));
      }

      @Override
      public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecord<String, String> record : records) {
                      //處理消息模塊
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              kafkaConsumer.close();
          }
      }
  }
}

這種實現方式和開啟多個消費進程的方式沒有本質的區別,優點是每個線程可以按照順序消費消費各個分區的消息。缺點是每個消費線程都要維護一個獨立的TCP連接,如果分區數和線程數都很多,那麼會造成不小的系統開銷。

(2)Kafka多線程消費第二種實現方式——–多個消費線程同時消費同一分區

多個線程同時消費同一分區,通過assign方法和seek方法實現。這樣就可以打破原有消費線程個數不能超過分區數的限制,進一步提高了消費的能力,但是這種方式對於位移提交和順序控制的處理就會變得非常複雜。實際生產中很少使用。

(3)第三種實現方式——-創建一個消費者,records的處理使用多線程實現

一般而言,消費者通過poll拉取數據的速度相當快,而整體消費能力的瓶頸也正式在消息處理這一塊。基於此

考慮第三種實現方式。

  import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThirdMutiConsumerThreadDemo {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      return prop;
  }

  public static void main(String[] args) {
      Properties prop = initConfig();
      KafkaConsumerThread consumerThread = new KafkaConsumerThread(prop, topic, Runtime.getRuntime().availableProcessors());
      consumerThread.start();
  }


  public static class KafkaConsumerThread extends Thread {
      private KafkaConsumer<String, String> kafkaConsumer;
      private ExecutorService executorService;
      private int threadNum;

      public KafkaConsumerThread(Properties prop, String topic, int threadNum) {
          this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
          kafkaConsumer.subscribe(Arrays.asList(topic));
          this.threadNum = threadNum;
          executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
      }

      @Override
      public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  if (!records.isEmpty()) {
                      executorService.submit(new RecordHandler(records));
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              kafkaConsumer.close();
          }
      }
  }

  public static class RecordHandler implements Runnable {
      public final ConsumerRecords<String,String> records;
      public RecordHandler(ConsumerRecords<String, String> records) {
          this.records = records;
      }
       
      @Override
      public void run() {
          //處理records
      }
  }
}

KafkaConsumerThread類對應一個消費者線程,裏面通過線程池的方式調用RecordHandler處理一批批的消息。其中線程池採用的拒絕策略為CallerRunsPolicy,當阻塞隊列填滿時,由調用線程處理該任務,以防止總體的消費能力跟不上poll拉取的速度。這種方式還可以進行橫向擴展,通過創建多個KafkaConsumerThread實例來進一步提升整體的消費能力。

這種方式還可以減少TCP連接的數量,但是對於消息的順序處理就變得困難了。這種方式需要引入一個共享變量Map<TopicPartition,OffsetAndMetadata> offsets參與消費者的偏移量提交。每一個RecordHandler類在處理完消息后都將對應的消費位移保存到共享變量offsets中,KafkaConsumerThread在每一次poll()方法之後都要進讀取offsets中的內容並對其進行提交。對於offsets的讀寫要採用加鎖處理,防止出現併發問題。並且在寫入offsets的時候需要注意位移覆蓋的問題。針對這個問題,可以將RecordHandler的run方法做如下改變:

  public void run() {
          for (TopicPartition tp : records.partitions()) {
              List<ConsumerRecord<String, String>> tpRecords = this.records.records(tp);
              //處理tpRecords
              long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
              synchronized (offsets) {
                  if (offsets.containsKey(tp)) {
                      offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                  }else {
                      long positioin = offsets.get(tp).offset();
                      if(positioin < lastConsumedOffset + 1) {
                      offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                      }
                  }
              }
          }
      }

對應的位移提交代碼也應該在KafkaConsumerThread的run方法中進行體現

  public void run() {
  try {
      while (true) {
          ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
          if (!records.isEmpty()) {
              executorService.submit(new RecordHandler(records));
              synchronized (offsets) {
                  if (!offsets.isEmpty()) {
                      kafkaConsumer.commitSync(offsets);
                      offsets.clear();
                    }
              }
          }
      }
  } catch (Exception e) {
      e.printStackTrace();
    }finally {
        kafkaConsumer.close();
      }
    }
}

其實這種方式並不完美,可能造成數據丟失。可以通過更為複雜的滑動窗口的方式進行改進。

1.2.10 消費者重要參數

  • fetch.min.bytes

    kafkaConsumer一次拉拉取請求的最小數據量。適當增加,會提高吞吐量,但會造成額外延遲。

  • fetch.max.bytes

    kafkaConsumer一次拉拉取請求的最大數據量,如果kafka一條消息的大小超過這個值,仍然是可以拉取的。

  • fetch.max.wait.ms

    一次拉取的最長等待時間,配合fetch.min.bytes使用

  • max.partiton.fetch.bytes

    每個分區里返回consumer的最大數據量。

  • max.poll.records

    一次拉取的最大消息數

  • connection.max.idle.ms

    多久之後關閉限制的連接

  • exclude.internal.topics

    這個參數用於設置kafka中的兩個內部主題能否被公開:consumer_offsets和transaction_state。如果設為true,可以使用Pattren訂閱內部主題,如果是false,則沒有這種限制。

  • receive.buffer.bytes

    socket接收緩衝區的大小

  • send.buffer.bytes

    socket發送緩衝區的大小

  • request.timeout.ms

    consumer等待請求響應的最長時間。

  • reconnect.backoff.ms

    重試連接指定主機的等待時間。

  • max.poll.interval.ms

    配置消費者等待拉取時間的最大值,如果超過這個期限,消費者組將剔除該消費者,進行再平衡。

  • auto.offset.reset

    自動偏移量重置

  • enable.auto.commit

    是否允許偏移量的自動提交

  • auto.commit.interval.ms

    自動偏移量提交的時間間隔

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

Android-Handler消息機制實現原理

一、消息機制流程簡介

在應用啟動的時候,會執行程序的入口函數main(),main()裏面會創建一個Looper對象,然後通過這個Looper對象開啟一個死循環,這個循環的工作是,不斷的從消息隊列MessageQueue裏面取出消息即Message對象,並處理。然後看下面兩個問題:
循環拿到一個消息之後,如何處理?
是通過在Looper的循環里調用Handler的dispatchMessage()方法去處理的,而dispatchMessage()方法裏面會調用handleMessage()方法,handleMessage()就是平時使用Handler時重寫的方法,所以最終如何處理消息由使用Handler的開發者決定。
MessageQueue里的消息從哪來?
使用Handler的開發者通過調用sendMessage()方法將消息加入到MessageQueue裏面。

上面就是Android中消息機制的一個整體流程,也是 “Android中Handler,Looper,MessageQueue,Message有什麼關係?” 的答案。通過上面的流程可以發現Handler在消息機制中的地位,是作為輔助類或者工具類存在的,用來供開發者使用。

對於這個流程有兩個疑問:

  • Looper中是如何能調用到Handler的方法的?
  • Handler是如何能往MessageQueue中插入消息的?

這兩個問題會在後面給出答案,下面先來通過源碼,分析一下這個過程的具體細節:

二、消息機制的源碼分析

首先main()方法位於ActivityThread.java類裏面,這是一個隱藏類,源碼位置:frameworks/base/core/java/android/app/ActivityThread.java

public static void main(String[] args) {
    ......
    Looper.prepareMainLooper();

    ActivityThread thread = new ActivityThread();
    thread.attach(false);

    if (sMainThreadHandler == null) {
        sMainThreadHandler = thread.getHandler();
    }

    Looper.loop();

    throw new RuntimeException("Main thread loop unexpectedly exited");
}

Looper的創建可以通過Looper.prepare()來完成,上面的代碼中prepareMainLooper()是給主線程創建Looper使用的,本質也是調用的prepare()方法。創建Looper以後就可以調用Looper.loop()開啟循環了。main方法很簡單,不多說了,下面看看Looper被創建的時候做了什麼,下面是Looper的prepare()方法和變量sThreadLocal:

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

很簡單,new了一個Looper,並把new出來的Looper保存到ThreadLocal裏面。ThreadLocal是什麼?它是一個用來存儲數據的類,類似HashMap、ArrayList等集合類。它的特點是可以在指定的線程中存儲數據,然後取數據只能取到當前線程的數據,比如下面的代碼:

ThreadLocal<Integer> mThreadLocal = new ThreadLocal<>();
private void testMethod() {

    mThreadLocal.set(0);
    Log.d(TAG, "main  mThreadLocal=" + mThreadLocal.get());

    new Thread("Thread1") {
        @Override
        public void run() {
            mThreadLocal.set(1);
            Log.d(TAG, "Thread1  mThreadLocal=" + mThreadLocal.get());
        }
    }.start();

    new Thread("Thread2") {
        @Override
        public void run() {
            mThreadLocal.set(2);
            Log.d(TAG, "Thread1  mThreadLocal=" + mThreadLocal.get());
        }
    }.start();

    Log.d(TAG, "main  mThreadLocal=" + mThreadLocal.get());
}

輸出的log是

main  mThreadLocal=0
Thread1  mThreadLocal=1
Thread2  mThreadLocal=2
main  mThreadLocal=0

通過上面的例子可以清晰的看到ThreadLocal存取數據的特點,只能取到當前所在線程存的數據,如果所在線程沒存數據,取出來的就是null。其實這個效果可以通過HashMap<Thread, Object>來實現,考慮線程安全的話使用ConcurrentMap<Thread, Object>,不過使用Map會有一些麻煩的事要處理,比如當一個線程結束的時候我們如何刪除這個線程的對象副本呢?如果使用ThreadLocal就不用有這個擔心了,ThreadLocal保證每個線程都保持對其線程局部變量副本的隱式引用,只要線程是活動的並且 ThreadLocal 實例是可訪問的;在線程消失之後,其線程局部實例的所有副本都會被垃圾回收(除非存在對這些副本的其他引用)。更多ThreadLocal的講解參考:Android線程管理之ThreadLocal理解及應用場景

好了回到正題,prepare()創建Looper的時候同時把創建的Looper存儲到了ThreadLocal中,通過對ThreadLocal的介紹,獲取Looper對象就很簡單了,sThreadLocal.get()即可,源碼提供了一個public的靜態方法可以在主線程的任何地方獲取這個主線程的Looper(注意一下方法名myLooper(),多個地方會用到):

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

Looper創建完了,接下來開啟循環,loop方法的關鍵代碼如下:

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;

    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        try {
            msg.target.dispatchMessage(msg);
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }

        msg.recycleUnchecked();
    }
}

上面的代碼,首先獲取主線程的Looper對象,然後取得Looper中的消息隊列final MessageQueue queue = me.mQueue;,然後下面是一個死循環,不斷的從消息隊列里取消息Message msg = queue.next();,可以看到取出的消息是一個Message對象,如果消息隊列里沒有消息,就會阻塞在這行代碼,等到有消息來的時候會被喚醒。取到消息以後,通過msg.target.dispatchMessage(msg);來處理消息,msg.target 是一個Handler對象,所以這個時候就調用到我們重寫的Hander的handleMessage()方法了。
msg.target 是在什麼時候被賦值的呢?要找到這個答案很容易,msg.target是被封裝在消息裏面的,肯定要從發送消息那裡開始找,看看Message是如何封裝的。那麼就從Handler的sendMessage(msg)方法開始,過程如下:

public final boolean sendMessage(Message msg) {
    return sendMessageDelayed(msg, 0);
}

public final boolean sendMessageDelayed(Message msg, long delayMillis) {
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

可以看到最後的enqueueMessage()方法中msg.target = this;,這裏就把發送消息的handler封裝到了消息中。同時可以看到,發送消息其實就是往MessageQueue裏面插入了一條消息,然後Looper裏面的循環就可以處理消息了。Handler裏面的消息隊列是怎麼來的呢?從上面的代碼可以看到enqueueMessage()裏面的queue是從sendMessageAtTime傳來的,也就是mQueue。然後看mQueue是在哪初始化的,看Handler的構造方法如下:

public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

mQueue的初始化很簡單,首先取得Handler所在線程的Looper,然後取出Looper中的mQueue。這也是Handler為什麼必須在有Looper的線程中才能使用的原因,拿到mQueue就可以很容易的往Looper的消息隊列里插入消息了(配合Looper的循環+阻塞就實現了發送接收消息的效果)。

以上就是主線程中消息機制的原理。

那麼,在任何線程下使用handler的如下做法的原因、原理、內部流程等就非常清晰了:

new Thread() {
    @Override
    public void run() {
        Looper.prepare();
        Handler handler = new Handler();
        Looper.loop();
    }
}.start();
  1. 首先Looper.prepare()創建Looper並初始化Looper持有的消息隊列MessageQueue,創建好后將Looper保存到ThreadLocal中方便Handler直接獲取。
  2. 然後Looper.loop()開啟循環,從MessageQueue裏面取消息並調用handler的 dispatchMessage(msg) 方法處理消息。如果MessageQueue里沒有消息,循環就會阻塞進入休眠狀態,等有消息的時候被喚醒處理消息。
  3. 再然後我們new Handler()的時候,Handler構造方法中獲取Looper並且拿到Looper的MessageQueue對象。然後Handler內部就可以直接往MessageQueue裏面插入消息了,插入消息即發送消息,這時候有消息了就會喚醒Looper循環去處理消息。處理消息就是調用dispatchMessage(msg) 方法,最終調用到我們重寫的Handler的handleMessage()方法。

三、通過一些問題的研究加強對消息機制的理解

源碼分析完了,下面看一下文章開頭的兩個問題:

  • Looper中是如何能調用到Handler的方法的?
  • Handler是如何能往MessageQueue中插入消息的?

這兩個問題源碼分析中已經給出答案,這裏做一下總結,首先搞清楚以下對象在消息機制中的關係:

Looper,MessageQueue,Message,ThreadLocal,Handler
  1. Looper對象有一個成員MessageQueue,MessageQueue是一個消息隊列,用來存儲消息Message
  2. Message消息中帶有一個handler對象,所以Looper取出消息后,可以很方便的調用到Handler的方法(問題1解決)
  3. Message是如何帶有handler對象的?是handler在發送消息的時候把自己封裝到消息里的。
  4. Handler是如何發送消息的?是通過獲取Looper對象從而取得Looper裏面的MessageQueue,然後Handler就可以直接往MessageQueue裏面插入消息了。(問題2解決)
  5. Handler是如何獲取Looper對象的?Looper在創建的時候同時把自己保存到ThreadLocal中,並提供一個public的靜態方法可以從ThreadLocal中取出Looper,所以Handler的構造方法里可以直接調用靜態方法取得Looper對象。

帶着上面的一系列問題看源碼就很清晰了,下面是知乎上的一個問答:

Android中為什麼主線程不會因為Looper.loop()里的死循環卡死?

原因很簡單,循環里有阻塞,所以死循環並不會一直執行,相反的,大部分時間是沒有消息的,所以主線程大多數時候都是處於休眠狀態,也就不會消耗太多的CPU資源導致卡死。

  1. 阻塞的原理是使用Linux的管道機制實現的
  2. 主線程沒有消息處理時阻塞在管道的讀端
  3. binder線程會往主線程消息隊列里添加消息,然後往管道寫端寫一個字節,這樣就能喚醒主線程從管道讀端返回,也就是說looper循環里queue.next()會調用返回…

這裏說到binder線程,具體的實現細節不必深究,考慮下面的問題:
主線程的死循環如何處理其它事務?
首先需要看懂這個問題,主線程進入Looper死循環后,如何處理其他事務,比如activity的各個生命周期的回調函數是如何被執行到的(注意這裡是在同一個線程下,代碼是按順序執行的,如果在死循環這阻塞了,那麼進入死循環后循環以外的代碼是如何執行的)。
首先再看main函數的源碼

Looper.prepareMainLooper();

ActivityThread thread = new ActivityThread();
thread.attach(false);

if (sMainThreadHandler == null) {
    sMainThreadHandler = thread.getHandler();
}

Looper.loop();

在Looper.prepare和Looper.loop之間new了一個ActivityThread並調用了它的attach方法,這個方法就是開啟binder線程的,另外new ActivityThread()的時候同時會初始化它的一個H類型的成員,H是一個繼承了Handler的類。此時的結果就是:在主線程開啟loop死循環之前,已經啟動binder線程,並且準備好了一個名為H的Handler,那麼接下來在主線程死循環之外做一些事務處理就很簡單了,只需要通過binder線程向H發送消息即可,比如發送 H.LAUNCH_ACTIVITY 消息就是通知主線程調用Activity.onCreate() ,當然不是直接調用,H收到消息後會進行一系列複雜的函數調用最終調用到Activity.onCreate()。
至於誰來控制binder線程來向H發消息就不深入研究了,下面是《Android開發藝術探索》裏面的一段話:

ActivityThread 通過 ApplicationThread 和 AMS 進行進程間通訊,AMS 以進程間通信的方式完成 ActivityThread 的請求後會回調 ApplicationThread 中的 Binder 方法,然後 ApplicationThread 會向 H 發送消息,H 收到消息後會將 ApplicationThread 中的邏輯切換到 ActivityThread 中去執行,即切換到主線程中去執行,這個過程就是主線程的消息循環模型。

這個問題就到這裏,更多內容看知乎原文

最後

和其他系統相同,Android應用程序也是依靠消息驅動來工作的。網上的這句話還是很有道理的。

文章參考:

《Android開發藝術探索》
Android中為什麼主線程不會因為Looper.loop()里的死循環卡死?
Android線程管理之ThreadLocal理解及應用場景
Android 消息機制——你真的了解Handler
Android Handler到底是什麼

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

分享工作中一次優化程序的過程

 

程序應用場景:

年初從總公司交接了一個評分系統,系統大概情況是80w考生,每個考生105條作答數據,作答數據主要是客觀題(單選題,多選題,判斷題),評分時間大概40分鐘左右。

需求:優化代碼,提升評分效率,優化之後評分完成在20分鐘左右。

已有代碼優化邏輯:

1.程序方面:

多線程,通過計算評分的人數,得到需要的線程數量,開啟線程分別進行評分

2.查詢數據:

成績表建立自增長Id,查詢匹配需要評分的數據用where id> =Start and id< End

3.插入數據:

採用DataTable數據導入到Sql Server,提示評分完成時保存數據效率

詳細說一下第二點的邏輯

假設數據庫有100w條需要評分的數據,這時通過計算大概得到需要開啟25個線程去同時進行評分,也就是說第一個線程去評分數據庫Id在  1~40000考生的數據,也就是說去查詢數據時,大概查詢語句select * from A where Id>=1 and Id<40000這樣能夠保證查詢出來得效率是最高的;

但是這裡會有缺陷,如果數據存在刪除過在插入肯定數據就不是連續的,也就是

where Id>=1 and Id<40000不能保證查詢得到40000條數據,數據庫表中存在的Id不一定是從1開始,所以目前這種查詢方式是很理想化的。後續這裏還需要在這快研究和學習下。

本次優化細節

代碼片段1

優化前:

 

優化后:

 

 

總結:提取線程內查詢數據存儲到內存中,從而只會查詢一次。

代碼片段2

優化前:

 

優化后:

 

總結:修改List集合取一條數據的方式,Where修改成Find,如果還需要在這裏提升查詢效率可以修改成for循環,但會導致代碼可讀性會變差。

代碼片段3

優化前

 

優化后

 

 

總結:修改DataTable表Select查詢方式,先存儲到Dictionary中,在通過Key去取對應的數據。

最後通過測試全部完成評分時間大概20分鐘左右,也算成功的完成了這個任務,可能還需要在研究研究代碼,看能否有其他地方可以改善的。

存在的疑惑:

線程運算佔用的電腦Cpu的具體的值?

是否存在最佳線程數量?

電腦Cpu處理能力越強是否也能夠提升程序的評分效率?

總結:

1.線程里反覆查詢而且不變的基礎數據放到線程外查詢存儲到變量中

2.List集合的Where查詢修改成Find查詢,極高的提升查詢效率

3.DataTable的Select改用Dictionary<string,DataRow[]>,同樣極高的提升查詢效率

這次能夠站在前人的肩膀上完成這一次代碼的優化還有學習到了很多,瑾以寫在這片文章分享個人在平時工作中的解決的一些問題,希望這次能夠通過分享此篇文章讓自己更多的去記錄和分享工作中遇到以及解決的問題,提升自己的競爭力。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

C#中多線程中變量研究

今天在知乎上看到一個問題【為什麼在同一進程中創建不同線程,但線程各自的變量無法在線程間互相訪問?】。在多線程中,每個線程都是獨立運行的,不同的線程有可能是同一段代碼,但不會是同一作用域,所以不會共享。而共享內存,並沒有作用域之分,同一進程內,不管什麼線程都可以通過同一虛擬內存地址來訪問,不同進程也可以通過ipc等方式共享內存數據。全局變量:任何線程都可以訪問;局部變量(棧變量):任何線程執行到該函數時均可訪問,函數外不可訪問;線程變量:每個線程只能訪問自己的那個拷貝,其他線程不可見。今天就用C#來實現同一段代碼的不同線程,全局變量、局部變量、線程變量。

了解進程與線程

什麼是多任務,簡單來說就是操作系統同時可以運行多個任務。例如:一遍聽歌,一遍寫文檔等。多核CPU可以執行多任務,但是單核CPU也可以執行多任務,CPU是順序執行的,操作系統讓任務輪流執行,例如:聽歌執行一次,停頓0.01s,寫文檔執行一次,停頓0.01s等等。由於CPU的執行速度很快,我們感覺就像所有的任務都是同時執行。對操作系統來說,一個任務就是一個進程,一個進程至少有一個線程。進程是資源分配的最小單位,線程是CPU調度的最小單位。

普通的程序寫法

private static List<int> data = Enumerable.Range(1, 1000).ToList();

public static void SimpleTest()
{
    for (int i = 0; i < 10; i++)
    {
        List<int> tempData = new List<int>();
        foreach (var d in data)
        {
            tempData.Add(d);
        }
        Console.WriteLine($"i:{i},合計:{data.Sum()},是否相等:{data.Sum() == tempData.Sum()}");
    }

    Console.WriteLine("單線程運行結束");
}

多線程寫法

private static List<int> data = Enumerable.Range(1, 1000).ToList();

public static async Task MoreTaskTestAsync()
{
    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        var tempi = i;
        var t = Task.Run(() =>
        {
            List<int> tempData = new List<int>();
            foreach (var d in data)
            {
                tempData.Add(d);
            }
            Console.WriteLine($"i:{tempi},合計:{data.Sum()},是否相等:{data.Sum() == tempData.Sum()}");
        });
        tasks.Add(t);
    }

    await Task.WhenAll(tasks); //或者Task.WaitAll(tasks.ToArray());
    Console.WriteLine("多線程運行結束");
}

不同的線程同一段代碼,但不會是同一作用域,所以tempData數據沒有互相影響。

全局變量:data,多個線程都可以訪問,list只讀的時候是線性安全
局部變量:i就是局部變量,訪問的線程可以訪問,去掉【var tempi = i;】,運行結果打印出來,值都是一樣的,增加的都是每個線程都訪問單獨的tempi變量

i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True

線程變量:tempData,每個線程只訪問自己的,互不影響,運行結果

i:3,合計:500500,是否相等:True
i:6,合計:500500,是否相等:True
i:0,合計:500500,是否相等:True
i:1,合計:500500,是否相等:True
i:4,合計:500500,是否相等:True
i:2,合計:500500,是否相等:True
i:7,合計:500500,是否相等:True
i:5,合計:500500,是否相等:True
i:8,合計:500500,是否相等:True
i:9,合計:500500,是否相等:True

寫多線程的時候需要注意,變量的作用域,否則程序運行出來的結果將不會是想要的結果,注意,注意變量作用域。

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

MySQL數據庫詳解(三)MySQL的事務隔離剖析

提到事務,你肯定不陌生,和數據庫打交道的時候,我們總是會用到事務。最經典的例子就是轉賬,你要給朋友小王轉 100 塊錢,而此時你的銀行卡只有 100 塊錢。

轉賬過程具體到程序里會有一系列的操作,比如查詢餘額、做加減法、更新餘額等,這些操作必須保證是一體的,不然等程序查完之後,還沒做減法之前,你這 100 塊錢,完全可以藉著這個時間差再查一次,然後再給另外一個朋友轉賬,如果銀行這麼整,不就亂了么?這時就要用到“事務”這個概念了。

簡單來說,事務就是要保證一組數據庫操作,要麼全部成功,要麼全部失敗。在 MySQL 中,事務支持是在引擎層實現的。你現在知道,MySQL 是一個支持多引擎的系統,但並不是所有的引擎都支持事務。比如 MySQL 原生的 MyISAM 引擎就不支持事務,這也是 MyISAM 被InnoDB 取代的重要原因之一。

今天的文章里,我將會以 InnoDB 為例,剖析 MySQL 在事務支持方面的特定實現,並基於原理給出相應的實踐建議,希望這些案例能加深你對 MySQL 事務原理的理解。

隔離性與隔離級別

提到事務,你肯定會想到 ACID(Atomicity、Consistency、Isolation、Durability,即原子性、一致性、隔離性、持久性),今天我們就來說說其中 I,也就是“隔離性”。

當數據庫上有多個事務同時執行的時候,就可能出現臟讀(dirty read)、不可重複讀(non reapeatable read)、幻讀(phantom read)的問題,為了解決這些問題,就有了“隔離級別”的概念。

在談隔離級別之前,你首先要知道,你隔離得越嚴實,效率就會越低。因此很多時候,我們都要在二者之間尋找一個平衡點。SQL 標準的事務隔離級別包括:讀未提交(read uncommitted)、讀提交(read committed)、可重複讀(repeatable read)和串行化serializable )。

下面我逐一為你解釋:

  • 讀未提交是指,一個事務還沒提交時,它做的變更就能被別的事務看到。
  • 讀提交是指,一個事務提交之後,它做的變更才會被其他事務看到。
  • 可重複讀是指,一個事務執行過程中看到的數據,總是跟這個事務在啟動時看到的數據是一致的。當然在可重複讀隔離級別下,未提交變更對其他事務也是不可見的。
  • 串行化,顧名思義是對於同一行記錄,“寫”會加“寫鎖”,“讀”會加“讀鎖”。當出現讀寫鎖衝突的時候,后訪問的事務必須等前一個事務執行完成,才能繼續執行。
  • 其中“讀提交”和“可重複讀”比較難理解,所以我用一個例子說明這幾種隔離級別。假設數據表 T 中只有一列,其中一行的值為 1,下面是按照時間順序執行兩個事務的行為。
mysql> create table T(c int) engine=InnoDB;
insert into T(c) values(1);

我們來看看在不同的隔離級別下,事務 A 會有哪些不同的返回結果,也就是圖裡面 V1、V2、V3 的返回值分別是什麼。

若隔離級別是“讀未提交”, 則 V1 的值就是 2。這時候事務 B 雖然還沒有提交,但是結果已經被 A 看到了。因此,V2、V3 也都是 2。

若隔離級別是“讀提交”,則 V1 是 1,V2 的值是 2。事務 B 的更新在提交后才能被 A 看到。所以, V3 的值也是 2。

若隔離級別是“可重複讀”,則 V1、V2 是 1,V3 是 2。之所以 V2 還是 1,遵循的就是這個要求:事務在執行期間看到的數據前後必須是一致的。

若隔離級別是“串行化”,則在事務 B 執行“將 1 改成 2”的時候,會被鎖住。直到事務 A提交后,事務 B 才可以繼續執行。所以從 A 的角度看, V1、V2 值是 1,V3 的值是 2。

在實現上,數據庫裏面會創建一個視圖,訪問的時候以視圖的邏輯結果為準。在“可重複讀”隔離級別下,這個視圖是在事務啟動時創建的,整個事務存在期間都用這個視圖。在“讀提交”隔離級別下,這個視圖是在每個 SQL 語句開始執行的時候創建的。這裏需要注意的是,“讀未提交”隔離級別下直接返回記錄上的最新值,沒有視圖概念;而“串行化”隔離級別下直接用加鎖的方式來避免并行訪問。

我們可以看到在不同的隔離級別下,數據庫行為是有所不同的。Oracle 數據庫的默認隔離級別其實就是“讀提交”,因此對於一些從 Oracle 遷移到 MySQL 的應用,為保證數據庫隔離級別的一致,你一定要記得將 MySQL 的隔離級別設置為“讀提交”。

配置的方式是,將啟動參數 transaction-isolation 的值設置成 READ-COMMITTED。你可以用show variables 來查看當前的值。

mysql> show variables like 'transaction_isolation';
+-----------------------+----------------+
| Variable_name | Value |
+-----------------------+----------------+
| transaction_isolation | READ-COMMITTED |
+-----------------------+----------------+

總結來說,存在即合理,哪個隔離級別都有它自己的使用場景,你要根據自己的業務情況來定。我想你可能會問那什麼時候需要“可重複讀”的場景呢?我們來看一個數據校對邏輯的案例。

假設你在管理一個個人銀行賬戶表。一個表存了每個月月底的餘額,一個表存了賬單明細。這時候你要做數據校對,也就是判斷上個月的餘額和當前餘額的差額,是否與本月的賬單明細一致。

你一定希望在校對過程中,即使有用戶發生了一筆新的交易,也不影響你的校對結果。這時候使用“可重複讀”隔離級別就很方便。事務啟動時的視圖可以認為是靜態的,不受其他事務更新的影響。

事務隔離的實現

理解了事務的隔離級別,我們再來看看事務隔離具體是怎麼實現的。這裏我們展開說明“可重複讀”。

在 MySQL 中,實際上每條記錄在更新的時候都會同時記錄一條回滾操作。記錄上的最新值,通過回滾操作,都可以得到前一個狀態的值。

假設一個值從 1 被按順序改成了 2、3、4,在回滾日誌裏面就會有類似下面的記錄。

當前值是 4,但是在查詢這條記錄的時候,不同時刻啟動的事務會有不同的 read-view。如圖中看到的,在視圖 A、B、C 裏面,這一個記錄的值分別是 1、2、4,同一條記錄在系統中可以存在多個版本,就是數據庫的多版本併發控制(MVCC)。對於 read-view A,要得到 1,就必須將當前值依次執行圖中所有的回滾操作得到。

同時你會發現,即使現在有另外一個事務正在將 4 改成 5,這個事務跟 read-view A、B、C 對應的事務是不會衝突的。

你一定會問,回滾日誌總不能一直保留吧,什麼時候刪除呢?答案是,在不需要的時候才刪除。也就是說,系統會判斷,當沒有事務再需要用到這些回滾日誌時,回滾日誌會被刪除。

什麼時候才不需要了呢?就是當系統里沒有比這個回滾日誌更早的 read-view 的時候。基於上面的說明,我們來討論一下為什麼建議你盡量不要使用長事務。

長事務意味着系統裏面會存在很老的事務視圖。由於這些事務隨時可能訪問數據庫裏面的任何數據,所以這個事務提交之前,數據庫裏面它可能用到的回滾記錄都必須保留,這就會導致大量佔用存儲空間。

在 MySQL 5.5 及以前的版本,回滾日誌是跟數據字典一起放在 ibdata 文件里的,即使長事務最終提交,回滾段被清理,文件也不會變小。我見過數據只有 20GB,而回滾段有 200GB 的庫。最終只好為了清理回滾段,重建整個庫。

除了對回滾段的影響,長事務還佔用鎖資源,也可能拖垮整個庫,這個我們會在後面講鎖的時候展開。

事務的啟動方式

如前面所述,長事務有這些潛在風險,我當然是建議你盡量避免。其實很多時候業務開發同學並不是有意使用長事務,通常是由於誤用所致。MySQL 的事務啟動方式有以下幾種:

  1. 顯式啟動事務語句, begin 或 start transaction。配套的提交語句是 commit,回滾語句
    是 rollback。
  2. set autocommit=0,這個命令會將這個線程的自動提交關掉。意味着如果你只執行一個select 語句,這個事務就啟動了,而且並不會自動提交。這個事務持續存在直到你主動執行commit 或 rollback 語句,或者斷開連接。

有些客戶端連接框架會默認連接成功后先執行一個 set autocommit=0 的命令。這就導致接下來的查詢都在事務中,如果是長連接,就導致了意外的長事務。

因此,我會建議你總是使用 set autocommit=1, 通過顯式語句的方式來啟動事務。

但是有的開發同學會糾結“多一次交互”的問題。對於一個需要頻繁使用事務的業務,第二種方式每個事務在開始時都不需要主動執行一次 “begin”,減少了語句的交互次數。如果你也有這個顧慮,我建議你使用 commit work and chain 語法。

在 autocommit 為 1 的情況下,用 begin 顯式啟動的事務,如果執行 commit 則提交事務。如果執行 commit work and chain,則是提交事務並自動啟動下一個事務,這樣也省去了再次執行 begin 語句的開銷。同時帶來的好處是從程序開發的角度明確地知道每個語句是否處於事務中。

你可以在 information_schema 庫的 innodb_trx 這個表中查詢長事務,比如下面這個語句,用於查找持續時間超過 60s 的事務。

select * from information_schema.innodb_trx where TIME_TO_SEC(timediff(now(),trx_started))>60

小結

這篇文章裏面,我介紹了 MySQL 的事務隔離級別的現象和實現,根據實現原理分析了長事務存在的風險,以及如何用正確的方式避免長事務。希望我舉的例子能夠幫助你理解事務,並更好地使用 MySQL 的事務特性。

我給你留一個問題吧。你現在知道了系統裏面應該避免長事務,如果你是業務開發負責人同時也是數據庫負責人,你會有什麼方案來避免出現或者處理這種情況呢?

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

監測元素resize

前言

近來有需求要做分頁,聽起來可能有點Low。 所以我要把Low的事情做得有點逼格。
分頁本身沒啥,但是數據量起來了,比如十萬。 要是不做點處理, 那你的頁面估計爽得很,機器也爽得很。 放心,我不會讓你這麼爽。

大量數據展現方案

比較流行的當然是虛擬滾動(無限滾動)。

  1. 始終展示的是有限的固定節點。
  2. 外層創建滾動層。

一句話,就是反覆利用固定節點展現數據。

其中還有兩個點

何時需要加載新的分頁數據

  1. scrollTop , clientHeight, scrollHeight
  2. IntersectionObserver (chrome 55+)

如何知道容器寬高變化
本文就圍繞着這個展開

因為使用的是react框架,使用了 react-window, react-window就是用來展現海量數據的react列表組件。
因為項目需要,還要不通尺寸一行展現不同數量的數據。 肯定有人就說,監聽window.resize。

沒錯,監聽resize一定程度,但是window.resize, 並不能讓我知道容器本身的尺寸,當

然可以通過getComputedStyle獲取。 要是window沒有resize的情況呢。

我就想實時的知道尺寸的變化。

尺寸變化監測方案

監測元素resize這裡有幾種方案的測試和源碼。

Cross-Browser, Event-based, Element Resize Detection

思路:

如果IE,直接註冊onresize(這個點贊啊)

否則: 創建 type為text/html的object

設置position為absolute, 高度100%, 寬度100% (這樣可以獲得父容器的寬高)

設置pointer-events:none,利用點擊穿透(讓object窗體變成幽靈)

object元素的高度變化后,通知訂閱者

resize事件節流

問題:

  1. 創建object
  2. 事件處理函數掛載了元素本身上

javascript-detect-element-resize

創建三個子元素,利用scroll事件來監測變化。

原理:

https://zhuanlan.zhihu.com/p/24887312
The scroll event is fired when the document view or an element has been scrolled.
當文檔視圖或者元素滾動的時候會觸發 scroll 事件。
也就是說元素滾動的時候會觸發這個事件,那麼什麼時候元素會滾動?當元素大於其父級元素,且父級元素允許其滾動的時候,該元素可以進行滾動。
換句話說,元素可以滾動意味着父子元素大小不一致,這是這個方法的核心。
那麼我們需要讓元素大小發生改變時,使得 scrollLeft 或者 scrollTop 發生改變,從而觸發 scroll 事件,進一步得知其大小發生了改變。

visibility: hidden; opacity: 0; position: absolute;讓自己變得虛無

addEventListener(“scroll”, scrollListener, true) 在捕捉階段攔截事件,使用false無效

div.expand-trigger 變大

div.expand-trigger 變小

animationstart來監聽显示,比如style.display = ‘none’然後style.display = ‘block’

問題:

  1. 額外創建四個元素節點以及一個style節點
  2. 事件都掛載了元素本身身上,

ResizeObserver

原生自帶的方案, 兼容性並不高, resize-observer-polyfill 基於resize和MutationOberver的polyfill實現了ResizeObserver。

 const resizeObserver = new ResizeObserver(entries => {
        for (let entry of entries) {
            
            console.log(entry.target.id, `height:${entry.contentRect.height}  width:${entry.contentRect.width}`);
        }
        });
        resizeObserver.observe(document.querySelector('#my_element'));
        resizeObserver.observe(document.querySelector('#my_element2'));

此外

當然,我覺得還

  1. 定時器 + getComputedStyle 也是很低成本的實現。
  2. resize + MutationOberver 也是很簡單的方案。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

.NET Core 微服務之Polly重試策略

接着上一篇說,正好也是最近項目里用到了,正好拿過來整理一下,園子里也有一些文章介紹比我詳細。

簡單介紹一下紹輕量的故障處理庫 Polly  Polly是一個.NET彈性和瞬態故障處理庫

允許我們以非常順暢和線程安全的方式來執行諸如重試、斷路器、超時、隔離、緩存、後退等策略, 能為我們在微服務架構提供更穩定的服務。當然,目前的 Service Mesh 顯得更高大上,而且更強大,它更偏向從運維層面解決以上問題,不過這還是的看具體項目中怎麼去使用和決定了。

 

在微服務架構下,我們可能會遇到類似以下問題:

  1. 某些接口異常,最終造成應用程序池奔潰;
  2. 某些接口不穩定、偶爾超時,數據獲取異常;
  3. 某些服務不穩定,調用方連接不上;
  4. 某些服務異常,最終主服務掛掉(雪崩效應);

 

 當然在實際情況下,我們可能只需要確保提供給用戶的服務是可用狀態,不出現 “Service Unavailable” 這樣的畫面就好。至於接口偶爾異常,可能對某些類型的項目來說並不太關鍵,用戶可能通過重新請求、刷新頁面就可以解決,當然我們還可以在代碼層面做兼容,滿滿的try/catch、for/while 循環解決重試來保證更高的可靠性。

 這個時候Polly就能很好的起來作用,Polly 的使用相對比較簡單,當然還是得看項目結構。我們的主項目在調用微服務接口時使用了AOP,類似這種情況下,所以調用微服務的接口都是統一入口,所以我們只需要在AOP內加上 Polly 的一些策略,其他代碼不用做任何修改,就可以解決一些問題了。

安裝

Install-Package Polly

使用步驟說明

  1. 定義策略
  2. 執行方法

可以看一下代碼,我們項目主要使用的是Grpc這個框架,其他的微服務框架,使用起來大致差不多
public void Intercept(IInvocation invocation)
{
    // some code 
    try
    {
        // 創建一個策略,如果 invocation.Proceed 的執行出現 Grpc.Core.RpcException 異常,並且 StatusCode == Grpc.Core.StatusCode.Unavailable,則重試一次
        var policy = Policy
        .Handle<Grpc.Core.RpcException>(t => t.Status.StatusCode == Grpc.Core.StatusCode.Unavailable)
        .Retry(); // 默認一次

        // 將策略應用到 invocation.Proceed 方法上
        policy.Execute(invocation.Proceed);
    }
    catch (Exception ex)
    {
        // some code 
        Console.WriteLine($"{ ex.Message},{ex.StackTrace}");
    }
}

 

 

策略條件定義

策略的執行需要依賴於條件,Polly 支持對異常與結果進行策略條件定義。

異常

// 指定某個異常
Policy
  .Handle<SomeExceptionType>();

// 指定某個異常條件
Policy
  .Handle<SomeExceptionType>(ex => ex.xxx == "xxx")

// 指定多個異常
Policy
  .Handle<SomeExceptionType1>()
  .Or<SomeExceptionType2>()

// 指定多個可能異常條件
Policy
  .Handle<SomeExceptionType1>(ex => ex.xxx1 == "xxx")
  .Or<SomeExceptionType2>(ex => ex.xxx2 == "xxx")

返回結果

// 指定某個結果
Policy
  .HandleResult<ResponseMessage>(r => r.xxx == "xxx")

// 指定多個可能的結果
Policy
  .HandleResult<ResponseMessage>(r => r.xxx1 == "xxx")
  .OrResult<ResponseMessage>(r => r.xxx2 == "xxx")

重試策略(Retry )

// 指定異常下重試一次
Policy
  .Handle<SomeExceptionType>()
  .Retry();

// 指定異常下重試3次
Policy
  .Handle<SomeExceptionType>()
  .Retry(3);

// 指定異常下無限重試
Policy
  .Handle<SomeExceptionType>()
  .RetryForever();

// 每次重試之間等待指定的時間間隔
Policy
  .Handle<SomeExceptionType>()
  .WaitAndRetry(new[]
  {
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(3),
    TimeSpan.FromSeconds(7)
  });

Retry 可以指定一個要執行的 Action。Action 參數:exception 當前異常信息,retryCount 當前執行第幾次,context 當前執行上下文信息。

測試一下:

private static int times = 0;

public static void TestPolicy()
{
    var policy = Policy
        .Handle<Exception>()
        .Retry(3, (exception, retryCount, context) => // 出異常會執行以下代碼
        {
            Console.WriteLine($"exception:{ exception.Message}, retryCount:{retryCount}, id:{context["id"]}, name:{context["name"]}");
        });

    try
    {
        // 通過 new Context 傳遞上下文信息
        var result = policy.Execute(Test, new Context("data", new Dictionary<string, object>() { { "id", "1" }, { "name", "beck" } }));
        Console.WriteLine($"result:{result}");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

private static string Test()
{
    // 每執行一次加1
    times++;

    // 前2次都拋異常
    if (times < 3)
    {
        throw new Exception("exception message");
    }
    return "success";
}

測試結果:

 

可以看到得到了咱們想要的效果,具體項目可以具體去實施,下一篇咱們接着說Polly的熔斷策略。感興趣可以自行搜索Polly的相關文檔看看。

參考鏈接

  • Polly
  • Polly Project
  • PollySamples

 

沒有彩蛋

 

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"