CQRS之旅——旅程6(我們系統的版本管理)

旅程6:我們系統的版本管理

準備下一站:升級和遷移

“變化是生活的調味品。”威廉·考珀

此階段的最高目標是了解如何升級包含實現CQRS模式和事件源的限界上下文的系統。團隊在這一階段實現的用戶場景包括對代碼的更改和對數據的更改:更改了一些現有的數據模式並添加了新的數據模式。除了升級系統和遷移數據外,團隊還計劃在沒有停機時間的情況下進行升級和遷移,以便在Microsoft Azure中運行實時系統。

本章的工作術語定義:

本章使用了一些術語,我們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的“深入CQRS和ES”。

  • Command(命令):命令是要求系統執行更改系統狀態的操作。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自用戶發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操作時)。單個接收方處理一個命令。命令總線(command bus)傳輸命令,然後命令處理程序將這些命令發送到聚合。發送命令是一個沒有返回值的異步操作。

  • 事件(Event):一個事件,比如OrderConfirmed,描述了系統中發生的一些事情,通常是一個命令的結果。領域模型中的聚合引發事件。事件也可以來自其他限界上下文。多個訂閱者可以處理特定的事件。聚合將事件發布到事件總線。處理程序在事件總線上註冊特定類型的事件,然後將事件傳遞給訂閱服務器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。

  • 冪等性(Idempotency):冪等性是一個操作的特性,這意味着該操作可以多次應用而不改變結果。例如,“將x的值設置為10”的操作是冪等的,而“將x的值加1”的操作不是冪等的。在消息傳遞環境中,如果消息可以多次傳遞而不改變結果,則消息是冪等的:這可能是因為消息本身的性質,也可能是因為系統處理消息的方式。

用戶故事:

在這個過程的這個階段,團隊實現了下面描述的用戶故事。

不停機升級

V2版本的目標是升級系統,包括任何必要的數據遷移,而不需要把系統停機。如果這在當前實現中不可行,那麼停機時間應該最小化,並且應該修改系統,以便在將來支持零停機時間升級(從V3版本開始)。

Beth(業務經理)發言:

確保我們能夠在不停機的情況下進行升級,這對我們在市場中的信譽至關重要。

显示剩餘座位數量

目前,當註冊者創建一個訂單時,沒有显示每種座位類型的剩餘座位數量。當註冊者選擇購買座位時,UI應該显示此信息。

處理不需要付費的座位

目前,當註冊者選擇不需要付費的座位時,UI流仍然會將註冊者帶到支付頁面,即使不需要支付任何費用。系統應該檢測什麼時候沒有支付,並調整流程,讓註冊者直接進入訂單的確認頁面。

架構

該應用程序旨在部署到Microsoft Azure。在旅程的那個階段,應用程序由兩個角色組成,一個包含ASP.Net MVC Web應用程序的web角色和一個包含消息處理程序和領域對象的工作角色。應用程序在寫端和讀端都使用Azure SQL DataBase實例進行數據存儲。應用程序使用Azure服務總線來提供其消息傳遞基礎設施。下圖展示了這個高級體繫結構。

在研究和測試解決方案時,可以在本地運行它,可以使用Azure compute emulator,也可以直接運行MVC web應用程序,並運行承載消息處理程序和領域域對象的控制台應用程序。在本地運行應用程序時,可以使用本地SQL Server Express數據庫,並使用一個在SQL Server Express數據庫實現的簡單的消息傳遞基礎設施。

有關運行應用程序的選項的更多信息,請參見附錄1“發布說明”。

模式和概念

在旅程的這個階段,團隊處理的大多數關鍵挑戰都與如何最好地執行從V1到V2的遷移有關。本節將介紹其中的一些挑戰。

處理“事件定義發生更改”的情況

當團隊檢查V2的發布需求,很明顯,我們需要改變在訂單和註冊限界上下文中使用的一些事件來適應一些新特性:RegistrationProcessManager將會改變,當訂單有一個不需要付費的座位時系統將提供一個更好的用戶體驗。

訂單和註冊限界上下文使用事件源,因此在遷移到V2之後,事件存儲將包含舊事件,但將開始保存新事件。當系統事件被重放時,系統必須能正確處理所有的舊事件和新事件。

團隊考慮了兩種方法來處理系統中的這類更改。

在基礎設施中進行事件映射或過濾

在基礎設施中映射和過濾事件消息是一種選擇。此方法是對舊的事件消息和消息格式進行處理,在它們到達領域之前在基礎設施的某個位置處理它們。您可以過濾掉不再相關的舊消息,並使用映射將舊格式的消息轉換為新格式。這種方法最初比較複雜,因為它需要對基礎設施進行更改,但是它可以保持領域域的純粹,領域只需要理解當前的新事件集合就可以了。

在聚合中處理多個版本的消息

在聚合中處理多個版本的消息是另一種選擇。在這種方法中,所有消息類型(包括舊消息和新消息)都傳遞到領域,每個聚合必須能夠處理舊消息和新消息。從短期來看,這可能是一個合適的策略,但它最終會導致域模型受到遺留事件處理程序的污染。

團隊為V2版本選擇了這個選項,因為它包含了最少數量的代碼更改。

Jana(軟件架構師)發言:

當前在聚合中處理舊事件和新事件並不妨礙您以後使用第一種選擇:在基礎設施中使用映射/過濾機制。

履行消息冪等性

V2版本中要解決的一個關鍵問題是使系統更加健壯。在V1版本中,在某些場景中,可能會多次處理某些消息,導致系統中的數據不正確或不一致。

Jana(軟件架構師)發言:

消息冪等性在任何使用消息傳遞的系統中都很重要,這不僅僅是在實現CQRS模式或使用事件源的系統中。

在某些場景中,設計冪等消息是可能的,例如:使用“將座位配額設置為500”的消息,而不是“在座位配額中增加100”的消息。您可以安全地多次處理第一個消息,但不能處理第二個消息。

然而,並不總是能夠使用冪等消息,因此團隊決定使用Azure服務總線的重複刪除特性,以確保它只傳遞一次消息。團隊對基礎設施進行了一些更改,以確保Azure服務總線能夠檢測重複消息,並配置Azure服務總線來執行重複消息檢測。

要了解Contoso是如何實現這一點的,請參閱下面的“不讓命令消息重複”一節。此外,我們需要考慮系統中的消息處理程序如何從隊列和Topic檢索消息。當前的方法使用Azure服務總線peek/lock機制。這是一個分成三個階段的過程:

  1. 處理程序從隊列或Topic檢索消息,並在其中留下消息的鎖定副本。其他客戶端無法看到或訪問鎖定的消息。
  2. 處理程序處理消息。
  3. 處理程序從隊列中刪除鎖定的消息。如果鎖定的消息在固定時間后沒有解鎖或刪除,則解鎖該消息並使其可用,以便再次檢索。

如果步驟由於某種原因失敗,這意味着系統可以不止一次地處理消息。

Jana(軟件架構師)發言:

該團隊計劃在旅程的下一階段解決這個問題(步驟失敗的問題)。更多信息,請參見第7章“添加彈性和優化性能”。

阻止多次處理事件

在V1中,在某些場景里,如果在處理事件時發生錯誤,系統可能多次處理事件。為了避免這種情況,團隊修改了體繫結構,以便每個事件處理程序都有自己對Azure Topic的訂閱。下圖显示了兩個不同的模型。

在V1中,可能發生以下行為:

  1. EventProcessor實例從服務總線中的所有訂閱者那裡接收到OrderPlaced事件。
  2. EventProcessor實例有兩個已註冊的處理程序,RegistrationProcessManagerRouterOrderViewModelGenerator處理程序類,所以會在兩個裡都觸發調用Handle方法。
  3. OrderViewModelGenerator類中的Handle方法執行成功。
  4. RegistrationProcessManagerRouter類中的Handle方法拋出異常。
  5. EventProcessor實例捕獲到異常然後拋棄掉事件消息。消息將自動放回訂閱中。
  6. EventProcessor實例第二次從所有訂閱者那裡接收到OrderPlaced事件。
  7. 事件又觸發兩個處理方法,導致RegistrationProcessManagerRouter類和OrderViewModelGenerator第二次處理事件消息。
  8. 每當RegistrationProcessManagerRouter類拋出異常時,OrderViewModelGenerator類都會觸發處理該事件。

在V2模型中,如果處理程序類拋出異常,EventProcessor實例將事件消息放回與該處理程序類關聯的訂閱。重試邏輯現在只會導致EventProcessor實例重試引發異常的處理程序,因此沒有其他處理程序會重新處理消息。

集成事件的持久化

在V1版本中提出的一個問題是,系統如何持久化從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件。這些事件包括關於會議創建和發布的信息,以及座位類型和配額更改的詳細信息。

在V1版本中,訂單和註冊上下文中的ConferenceViewModelGenerator類通過更新視圖模型並向SeatsAvailability聚合發送命令來處理這些事件,以告訴它更改座位配額值。

這種方法意味着訂單和註冊限界上下文不存儲任何歷史記錄,這可能會導致問題。例如,其他視圖從這裏中查找座椅類型描述時,這裏只包含座椅類型描述的最新值。因此,在其他地方重播一組事件可能會重新生成另一個包含不正確座椅類型描述的讀取模型投影。

團隊考慮了以下五個方法來糾正這種情況:

  • 將所有事件保存在原始限界上下文中(會議管理限界上下文中),並使用共享的事件存儲,訂單和註冊限界上下文中可以訪問該存儲來重播這些事件。接收限界上下文可以重放事件流,直到它需要查看的之前的座椅類型描述時為止。
  • 當所有事件到達接收限界上下文(訂單和註冊限界上下文)時保存它們。
  • 讓視圖模型生成器中的命令處理程序保存事件,只選擇它需要的那些。
  • 讓視圖模型生成器中的命令處理程序保存不同的事件,實際上就是為此視圖模型使用事件源。
  • 將來自所有限界上下文的所有命令和事件消息存儲在消息日誌中。

第一種選擇並不總是可行的。在這種特殊情況下,它可以工作,因為同一個團隊同時實現了限界上下文和基礎設施,使得使用共享事件存儲變得很容易。

Gary(CQRS專家)發言:

儘管從純粹主義者的角度來看,第一個選項破壞了限界上下文之間的嚴格隔離,但在某些場景中,它可能是一個可接受的實用解決方案。

第三種選擇可能存在的風險是,所需的事件集合可能在未來發生變化。如果我們現在不保存事件,它們將永遠丟失。

儘管第五個選項存儲了所有命令和事件,其中一些可能永遠都不需要再次引用,但它確實提供了一個完整的日誌,記錄了系統中發生的所有事情。這對於故障診斷很有用,還可以幫助您滿足尚未確定的需求。該團隊選擇了這個選項而不是選項二,因為它提供了一個更通用的機制,可能具有未來的好處。

持久化事件的目的是,當訂單和註冊上下文需要有關當前座位配額的信息時,可以回放這些事件,以便計算剩餘座位的數量。要一致地計算這些数字,必須始終以相同的順序回放事件。這種順序有幾種選擇:

  • 會議管理限界上下文發送事件的順序。
  • 訂單和註冊上下文接收事件的順序。
  • 訂單和註冊上下文處理事件的順序。

大多數情況下,這些順序是相同的。沒有什麼正確的順序。你只需要選擇一個和它保持一致就行了。因此,選擇由簡單性決定。在本例中,最簡單的方法是按照訂單和註冊限界上下文中處理程序接收事件的順序持久化事件(第二個選項)。

Markus(軟件開發人員)發言:

這種選擇通常不會出現在事件源中。每個聚合會都以固定的順序創建事件,這就是系統用於持久存儲事件的順序。在此場景中,集成事件不是由單個聚合創建的。

為這些事件保存時間戳也有類似的問題。如果將來需要查看特定時間剩餘的座位數量,那麼時間戳可能會很有用。這裏的選擇是,當事件在會議管理限界上下文中創建時,還是在訂單和註冊限界上下文中接收時,應該創建時間戳?當會議管理限界上下文創建事件時,訂單和註冊限界上下文可能由於某種原因離線。因此,團隊決定在會議管理有界上下文發布事件時創建時間戳。

消息排序

團隊創建並運行來驗證V1版本的驗收測試,凸顯出了消息排序的一個潛在問題:執行會議管理限界上下文的驗收測試向訂單和註冊限界上下文發送了一系列命令,這些命令有時會出現順序錯誤。

Markus(軟件開發人員)發言:

當人類用戶真實測試系統的這一部分時,不太會注意到這種效果,因為發出命令的時間間隔要長得多,這使得消息不太可能無序地到達。

團隊考慮了兩種方法來確保消息以正確的順序到達。

  • 第一個方法是使用消息會話,這是Azure服務總線的一個特性。如果您使用消息會話,這將確保會話內的消息以與它們發送時相同的順序傳遞。
  • 第二種方法是修改應用程序中的處理程序,通過使用發送消息時添加到消息中的序列號或時間戳來檢測無序消息。如果接收處理程序檢測到一條無序消息,它將拒絕該消息,並在處理了在被拒絕消息之前發送的消息之後,將其放回稍後處理的隊列或Topic。

在這種情況下,首選的解決方案是使用Azure服務總線消息會話,因為這隻需要對現有代碼進行更少的更改。這兩種方法都會給消息傳遞帶來一些額外的延遲,但是團隊並不認為這會對系統的性能產生顯著的影響。

實現細節

本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份代碼拷貝很有用,這樣您就可以繼續學習了。您可以從Download center下載一個副本,或者在GitHub上查看存儲庫:https://github.com/mspnp/cqrs-journey-code。您可以從GitHub上的Tags頁面下載V2版本的代碼。

備註:不要期望代碼示例與參考實現中的代碼完全匹配。本章描述了CQRS過程中的一個步驟,隨着我們了解更多並重構代碼,實現可能會發生變化。

**添加對“不需要支付的訂單”的支持

做出這一改變有三個具體的目標,它們都是相關的。我們希望:

  • 修改RegistrationProcessManager類和相關聚合,以處理不需要支付的訂單。
  • 修改UI中的導航,當訂單不需要支付時跳過付款步驟。
  • 確保系統在升級到V2之後能夠正確地工作,包括使用新事件和舊事件。

RegistrationProcessManager類的更改

在此之前,RegistrationProcessManager類在收到來自UI的註冊者已完成支付的通知后發送了一個ConfirmOrderPayment命令。現在,如果有一個不需要支付訂單,UI將直接向訂單聚合發送一個ConfirmOrder命令。如果訂單需要支付,RegistrationProcessManager類在從UI接收到成功支付的通知后,再向訂單聚合發送一個ConfirmOrder命令。

Jana(軟件架構師)發言:

注意,命令的名稱已從ConfirmOrderPayment更改為ConfirmOrder。這反映了訂單不需要知道任何關於付款的信息。它只需要知道訂單已經確認。類似地,現在有一個新的OrderConfirmed事件用於替代舊的OrderPaymentConfirmed事件。

當訂單聚合接收到ConfirmOrder命令時,它將引發一個OrderConfirmed事件。除被持久化外,該事件還由以下對象處理:

  • OrderViewModelGenerator類,它在其中更新讀取模型中的訂單狀態。
  • SeatAssignments聚合,在其中初始化一個新的SeatAssignments實例。
  • RegistrationProcessManager類,它在其中觸發一個提交座位預訂的命令。

UI的更改

UI中的主要更改是在RegistrationController MVC控制器類中的SpecifyRegistrantAndPaymentDetails action里的。之前,此action方法返回InitiateRegistrationWithThirdPartyProcessorPayment(action result)。現在,如果Order對象的新IsFreeOfCharge屬性為true,它將返回一個CompleteRegistrationWithoutPayment(action result)。否則,它返回一個CompleteRegistrationWithThirdPartyProcessorPayment(action result)。

[HttpPost]
public ActionResult SpecifyRegistrantAndPaymentDetails(AssignRegistrantDetails command, string paymentType, int orderVersion)
{
    ...

    var pricedOrder = this.orderDao.FindPricedOrder(orderId);
    if (pricedOrder.IsFreeOfCharge)
    {
        return CompleteRegistrationWithoutPayment(command, orderId);
    }

    switch (paymentType)
    {
        case ThirdPartyProcessorPayment:

            return CompleteRegistrationWithThirdPartyProcessorPayment(command, pricedOrder, orderVersion);

        case InvoicePayment:
            break;

        default:
            break;
    }

    ...
}

CompleteRegistrationWithThirdPartyProcessorPayment將用戶重定向到ThirdPartyProcessorPayment action,CompleteRegistrationWithoutPayment方法將用戶直接重定向到ThankYou action。

數據遷移

會議管理限界上下文在其Azure SQL數據庫實例中的PricedOrders表中存儲來自訂單和註冊限界上下文的訂單信息。以前,會議管理限界上下文接收OrderPaymentConfirmed事件,現在它接收OrderConfirmed事件,該事件包含一個附加的IsFreeOfCharge屬性。這將成為數據庫中的一個新列。

Markus(軟件開發人員)發言:

在遷移過程中,我們不需要修改該表中的現有數據,因為布爾值的默認值為false。所有現有條目都是在系統支持不需要付費的訂單之前創建的。

在遷移過程中,任何正在運行的ConfirmOrderPayment命令都可能丟失,因為它們不再由訂單聚合處理。您應該驗證當前的命令總線沒有這些命令。

Poe(IT運維人員)發言:

我們需要仔細計劃如何部署V2版本,以便確保所有現有的、正在運行的ConfirmOrderPayment命令都由運行V1版本的工作角色實例處理。

系統將RegistrationProcessManager類實例的狀態保存到SQL數據庫表中。這個表的架構沒有變化。遷移后您將看到的惟一更改是StateValue列中的一個新添加值。這反映了RegistrationProcessManager類中的ProcessState枚舉中額外的PaymentConfirmationReceived值,如下面的代碼示例所示:

public enum ProcessState
{
    NotStarted = 0,
    AwaitingReservationConfirmation = 1,
    ReservationConfirmationReceived = 2,
    PaymentConfirmationReceived = 3,
}

在V1版本中,事件源系統為訂單聚合保存的事件包括OrderPaymentConfirmed事件。因此,事件存儲區包含此事件類型的實例。在V2版本中,OrderPaymentConfirmed事件被替換為OrderConfirmed事件。

團隊決定在V2版本中,當反序列化事件時,不在基礎設施級別映射和過濾事件。這意味着,當系統從事件存儲中重播這些事件時,處理程序必須同時理解舊事件和新事件。下面的代碼示例在SeatAssignmentsHandler類中显示了這一點:

static SeatAssignmentsHandler()
{
    Mapper.CreateMap<OrderPaymentConfirmed, OrderConfirmed>();
}

public SeatAssignmentsHandler(IEventSourcedRepository<Order> ordersRepo, IEventSourcedRepository<SeatAssignments> assignmentsRepo)
{
    this.ordersRepo = ordersRepo;
    this.assignmentsRepo = assignmentsRepo;
}

public void Handle(OrderPaymentConfirmed @event)
{
    this.Handle(Mapper.Map<OrderConfirmed>(@event));
}

public void Handle(OrderConfirmed @event)
{
    var order = this.ordersRepo.Get(@event.SourceId);
    var assignments = order.CreateSeatAssignments();
    assignmentsRepo.Save(assignments);
}

您還可以在OrderViewModelGenerator類中看到同樣的技術。

Order類中的方法略有不同,因為這是持久化到事件存儲中的事件之一。下面的代碼示例显示了Order類中受保護構造函數的一部分:

protected Order(Guid id)
    : base(id)
{
    ...
    base.Handles<OrderPaymentConfirmed>(e => this.OnOrderConfirmed(Mapper.Map<OrderConfirmed>(e)));
    base.Handles<OrderConfirmed>(this.OnOrderConfirmed);
    ...
}

Jana(軟件架構師)發言:

以這種方式處理舊事件對於這個場景非常簡單,因為惟一需要更改的是事件的名稱。如果事件的屬性也發生了變化,情況會更加複雜。將來,Contoso將考慮在基礎設施中進行映射,以避免遺留事件污染領域模型。

在UI中显示剩餘座位

做出這一改變有三個具體的目標,它們都是相關的。我們想要:

  • 修改系統,在會議系統的讀模型中包含每個座位類型的剩餘座位數量信息。
  • 修改UI以显示每種座位類型的剩餘座位數量。
  • 確保升級到V2后系統功能正常。

向讀模型添加關於剩餘座位數量的信息

系統要能显示剩餘座位數量的信息來自兩個地方:

  • 當業務客戶創建新的座位類型或修改座位配額時,會議管理限界上下文將引發SeatCreatedSeatUpdated事件。
  • 在訂單和註冊限界上下文中,當註冊者創建一個訂單的時候,可用座位(SeatsAvailability)聚合將引發SeatsReserved、SeatsReservationCancelled和AvailableSeatsChanged事件。

備註:ConferenceViewModelGenerator類不使用SeatCreatedSeatUpdated事件。

訂單和註冊限界上下文中的ConferenceViewModelGenerator類現在處理這些事件,並使用它們來計算和存儲讀模型中的座位類型數量。下面的代碼示例显示了ConferenceViewModelGenerator類中的相關處理程序:

public void Handle(AvailableSeatsChanged @event)
{
    this.UpdateAvailableQuantity(@event, @event.Seats);
}

public void Handle(SeatsReserved @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

public void Handle(SeatsReservationCancelled @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats)
{
    using (var repository = this.contextFactory.Invoke())
    {
        var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId);
        if (dto != null)
        {
            if (@event.Version > dto.SeatsAvailabilityVersion)
            {
                foreach (var seat in seats)
                {
                    var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType);
                    if (seatDto != null)
                    {
                        seatDto.AvailableQuantity += seat.Quantity;
                    }
                    else
                    {
                        Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType);
                    }
                }

                dto.SeatsAvailabilityVersion = @event.Version;

                repository.Save(dto);
            }
            else
            {
                Trace.TraceWarning ...
            }
        }
        else
        {
            Trace.TraceError ...
        }
    }
}

UpdateAvailableQuantity方法將事件上的版本與讀模型的當前版本進行比較,以檢測可能的重複消息。

Markus(軟件開發人員)發言:

此檢查僅檢測重複的消息,而不是超出序列的消息。

修改UI以显示剩餘的座位數量

現在,當UI向會議的讀模型查詢座位類型列表時,列表包括當前可用的座位數量。下面的代碼示例显示了RegistrationController MVC控制器如何使用SeatType類的AvailableQuantity

private OrderViewModel CreateViewModel()
{
    var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id);
    var viewModel =
        new OrderViewModel
        {
            ConferenceId = this.ConferenceAlias.Id,
            ConferenceCode = this.ConferenceAlias.Code,
            ConferenceName = this.ConferenceAlias.Name,
            Items =
                seatTypes.Select(
                    s =>
                        new OrderItemViewModel
                        {
                            SeatType = s,
                            OrderItem = new DraftOrderItem(s.Id, 0),
                            AvailableQuantityForOrder = s.AvailableQuantity,
                            MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20)
                        }).ToList(),
        };

    return viewModel;
}

數據遷移

保存會議讀模型數據的數據庫有一個新列來保存用於檢查重複事件的版本號,而保存座位類型讀模型數據有一個新列來保存可用的座椅數量。

作為數據遷移的一部分,有必要為每個可用座位(SeatsAvailability)聚合重放事件存儲中的所有事件,以便正確計算可用數量。

不讓命令消息重複

系統目前使用Azure服務總線傳輸消息。當系統從ConferenceProcessor類的啟動代碼初始化Azure服務總線時,它配置Topic來檢測重複的消息,如下面的ServiceBusConfig類的代碼示例所示:

private void CreateTopicIfNotExists() 
{     
    var topicDescription =         
        new TopicDescription(this.topic)         
        {             
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow,         
        };     
    try     
    {         
        this.namespaceManager.CreateTopic(topicDescription);     
    }     
    catch (MessagingEntityAlreadyExistsException) { } 
} 
備註:您可以在Settings.xml文件中配置DuplicateDetectionHistoryTimeWindow
可以向Topic元素添加這個屬性。默認值是1小時。

但是,為了使重複檢測工作正常,您必須確保每個消息都有一個惟一的ID。下面的代碼示例显示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand
{
    public MarkSeatsAsReserved()
    {
        this.Id = Guid.NewGuid();
        this.Seats = new List<SeatQuantity>();
    }

    public Guid Id { get; set; }

    public Guid OrderId { get; set; }

    public List<SeatQuantity> Seats { get; set; }

    public DateTime Expiration { get; set; }
}

CommandBus類中的BuildMessage方法使用命令Id創建一個惟一的消息Id, Azure服務總線可以使用這個消息Id來檢測重複:

private BrokeredMessage BuildMessage(Envelope command) 
{ 
    var stream = new MemoryStream(); 
    ...

    var message = new BrokeredMessage(stream, true);
    if (!default(Guid).Equals(command.Body.Id))
    {
        message.MessageId = command.Body.Id.ToString();
    }

...

    return message;
} 

保證消息順序

團隊決定使用Azure服務總線消息會話來保證系統中的消息順序。

系統從ConferenceProcessor類中的OnStart方法配置Azure服務總線Topic和訂閱。Settings.xml配置文件中的配置指定了具體的訂閱使用會話。ServiceBusConfig類中的以下代碼示例显示了系統如何創建和配置訂閱。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription)
{
    var subscriptionDescription =
        new SubscriptionDescription(topic.Path, subscription.Name)
        {
            RequiresSession = subscription.RequiresSession
        };

    try
    {
        namespaceManager.CreateSubscription(subscriptionDescription);
    }
    catch (MessagingEntityAlreadyExistsException) { }
}

以下來自SessionSubscriptionReceiver類的代碼示例演示了如何使用會話接收消息:

private void ReceiveMessages(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session;
        try
        {
            session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
        }
        catch (Exception e)
        {
            ...
        }

        if (session == null)
        {
            Thread.Sleep(100);
            continue;
        }


        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = null;
            try
            {
                try
                {
                    message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
                }
                catch (Exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // If we have no more messages for this session, exit and try another.
                    break;
                }

                this.MessageReceived(this, new BrokeredMessageEventArgs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
        }

        this.receiveRetryPolicy.ExecuteAction(() => session.Close());
    }
}

private MessageSession DoAcceptMessageSession()
{
    try
    {
        return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
    }
    catch (TimeoutException)
    {
        return null;
    }
}

Markus(軟件開發人員)發言:

您可能會發現,將使用消息會話的ReceiveMessages方法的這個版本與SubscriptionReceiver類中的原始版本進行比較是很有用的。

您必須確保當你發送消息包含一個會話ID,這樣才能使用消息會話接收一條消息。系統使用事件的SourceID作為會話ID,如下面的代碼示例所示的EventBus類中的BuildMessage方法:

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();

通過這種方式,您可以確保以正確的順序接收來自單個源的所有消息。

Poe(IT運維人員)發言:

在V2版本中,團隊更改了系統創建Azure服務總線Topic和訂閱的方式。之前,SubscriptionReceiver類創建了它們(如果它們還不存在)。現在,系統在應用程序啟動時使用配置數據創建它們。這發生在啟動過程的早期,以避免在系統初始化訂閱之前將消息發送到Topic時丟失消息的風險。

然而,只有當消息按正確的順序傳遞到總線上時,會話才能保證按順序傳遞消息。如果系統異步發送消息,則必須特別注意確保消息以正確的順序放在總線上。在我們的系統中,來自每個單獨聚合實例的事件按順序到達是很重要的,但是我們不關心來自不同聚合實例的事件的順序。因此,儘管系統異步發送事件,EventStoreBusPublisher實例仍然會在發送下一個事件之前等待前一個事件已發送的確認。以下來自TopicSender類的示例說明了這一點:

public void Send(Func<BrokeredMessage> messageFactory)
{
    var resetEvent = new ManualResetEvent(false);
    Exception exception = null;
    this.retryPolicy.ExecuteAction(
        ac =>
        {
            this.DoBeginSendMessage(messageFactory(), ac);
        },
        ar =>
        {
            this.DoEndSendMessage(ar);
        },
        () => resetEvent.Set(),
        ex =>
        {
            Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetEvent.Set();
        });

    resetEvent.WaitOne();
    if (exception != null)
    {
        throw exception;
    }
}

Jana(軟件架構師)發言:

此代碼示例展示了系統如何使用Transient Fault Handling Application Block來讓異步調用可靠。

有關消息排序和Azure服務總線的更多信息,請參見Microsoft Azure Queues and Microsoft Azure Service Bus Queues – Compared and Contrasted

有關異步發送消息和排序的信息,請參閱博客文章Microsoft Azure Service Bus Splitter and Aggregator

從會議管理限界上下文中持久化事件

團隊決定創建一個包含所有發送的命令和事件的消息日誌。這將使訂單和註冊限界上下文能夠從會議管理限界上下文查詢此日誌,以獲取其構建讀模型所需的事件。這不是事件源,因為我們沒有使用這些事件來重建聚合的狀態,儘管我們使用類似的技術來捕獲和持久化這些集成事件。

Gary(CQRS專家)發言:

此消息日誌確保不會丟失任何消息,以便將來能夠滿足其他需求。

向消息添加額外元數據

系統現在將所有消息保存到消息日誌中。為了方便查詢特定命令或事件,系統現在向每個消息添加了更多的元數據。以前,惟一的元數據是事件類型,現在,事件元數據包括事件類型、命名空間、程序集和路徑。系統將元數據添加到EventBus類中的事件和CommandBus類中的命令中。

捕獲消息並將消息持久化到消息日誌中

系統使用Azure服務總線中對會議/命令和會議/事件topic的額外訂閱來接收系統中每條消息的副本。然後,它將消息附加到Azure表存儲中。下面的代碼示例显示了AzureMessageLogWriter類的實例,它用於將消息保存到表中:

public class MessageLogEntity : TableServiceEntity 
{ 
    public string Kind { get; set; }     
    public string CorrelationId { get; set; }     
    public string MessageId { get; set; }     
    public string SourceId { get; set; }     
    public string AssemblyName { get; set; }     
    public string Namespace { get; set; }     
    public string FullName { get; set; }     
    public string TypeName { get; set; }     
    public string SourceType { get; set; }     
    public string CreationDate { get; set; }     
    public string Payload { get; set; } 
} 

Kind屬性指定消息是命令還是事件。MessageId和CorrelationId屬性由消息傳遞基礎設施設置的,其餘屬性是從消息元數據中設置的。

下面的代碼示例显示了這些消息的分區和RowKey的定義:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"),
RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId

注意,RowKey保存了消息最初發送的順序,並添加到消息ID上,以確保惟一性,以防兩條消息同時入隊。

Jana(軟件架構師)發言:

這與事件存儲不同,在事件存儲區中,分區鍵標識聚合實例,而RowKey標識聚合的版本號。

數據遷移

當Contoso將系統從V1遷移到V2時,它將使用消息日誌在訂單和註冊限界上下文中重建會議和價格訂單的讀模型。

Gary(CQRS專家)發言:

Contoso可以在需要重建與聚合無關的事件構建的讀模型時來使用消息日誌,例如來自會議管理限界上下文的集成事件。

會議讀模型包含會議的信息,並包含來自會議管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的信息。

價格訂單讀模型持有來自於SeatCreated和SeatUpdated事件的信息,這些事件來自於會議管理限界上下文。

然而,在V1中,這些事件消息沒有被持久化,因此讀模型不能在V2中重新填充。為了解決這個問題,團隊實現了一個數據遷移實用程序,它使用一種最佳方法來生成包含要存儲在消息日誌中的丟失數據的事件。例如,在遷移到V2之後,消息日誌不包含ConferenceCreated事件,因此遷移實用程序在會議管理限界上下文使用的數據庫中找到這些信息,並創建丟失的事件。您可以在MigrationToV2項目的Migrator類中的GeneratePastEventLogMessagesForConferenceManagement方法中看到這是如何完成的。

Markus(軟件開發人員)發言:

您可以在這個類中看到,Contoso還將所有現有的事件源事件複製到消息日誌中。

如下面所示,Migrator類中的RegenerateViewModels方法重新構建讀取的模型。它通過調用Query方法從消息日誌中檢索所有事件,然後使用ConferenceViewModelGeneratorPricedOrderViewModelUpdater類來處理消息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString)
{
    var commandBus = new NullCommandBus();

    Database.SetInitializer<ConferenceRegistrationDbContext>(null);

    var handlers = new List<IEventHandler>();
    handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus));
    handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString)));

    using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
    {
        context.UpdateTables();
    }

    try
    {
        var dispatcher = new MessageDispatcher(handlers);
        var events = logReader.Query(new QueryCriteria { });

        dispatcher.DispatchMessages(events);
    }
    catch
    {
        using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
        {
            context.RollbackTablesMigration();
        }

        throw;
    }
}

Jana(軟件架構師)發言:

查詢可能不會很快,因為它將從多個分區檢索實體。

注意這個方法如何使用NullCommandBus實例來接收來自ConferenceViewModelGenerator實例的任何命令,因為我們只是在這裏重新構建讀模型。

以前,PricedOrderViewModelGenerator使用ConferenceDao類來獲取關於座位的信息。現在,它是自治的,並直接處理SeatCreatedSeatUpdated事件來維護這些信息。作為遷移的一部分,必須將此信息添加到讀模型中。在前面的代碼示例中,PricedOrderViewModelUpdater類只處理SeatCreatedSeatUpdated事件,並將缺失的信息添加到價格訂單讀模型中。

從V1遷移到V2

從V1遷移到V2需要更新已部署的應用程序代碼並遷移數據。在生產環境中執行遷移之前,應該始終在測試環境中演練遷移。以下是所需步驟:

  1. 將V2版本部署到Azure的staging環境中。V2版本有一個MaintenanceMode屬性,最初設置為true。在此模式下,應用程序向用戶显示一條消息,說明站點當前正在進行維護,而工作角色將不處理消息。
  2. 準備好之後,將V2版本(仍然處於維護模式,MaintenanceMode為true)切換到Azure生產環境中。
  3. 讓V1版本(現在在staging環境中運行)運行幾分鐘,以確保所有正在運行的消息都完成了它們的處理。
  4. 運行遷移程序來遷移數據(參見下面)。
  5. 成功完成數據遷移后,將每種工作角色的MaintenanceMode屬性更改為false。
  6. V2版本現在運行在Azure中。

Jana(軟件架構師)發言:

團隊考慮使用單獨的應用程序在升級過程中向用戶显示一條消息,告訴他們站點正在進行維護。然而,在V2版本中使用MaintenanceMode屬性提供了一個更簡單的過程,併為應用程序添加了一個潛在有用的新特性。

Poe(IT運維人員)發言:

由於對事件存儲的更改,不可能執行從V1到V2的無停機升級。然而,團隊所做的更改將確保從V2遷移到V3將不需要停機時間。

Markus(軟件開發人員)發言:

團隊對遷移實用程序應用了各種優化,例如批處理操作,以最小化停機時間。

下面幾節總結了從V1到V2的數據遷移。這些步驟中的一些在前面已經討論過,涉及到應用程序的特定更改或增強。

團隊為V2引入的一個更改是,將所有命令和事件消息的副本保存在消息日誌中,以便作為未來的證據,通過捕獲將來可能使用的所有內容來保證應用程序的安全性。遷移過程考慮到了這個新特性。

因為遷移過程複製了大量的數據,所以您應該在Azure工作角色中運行遷移過程,以最小化成本。遷移實用程序是一個控制台應用程序,因此您可以使用Azure和遠程桌面服務。有關如何在Azure角色實例中運行應用程序的信息,請參見Using Remote Desktop with Microsoft Azure Roles。

Poe(IT運維人員)發言:

在一些組織中,安全策略不允許您在Azure生產環境使用遠程桌面服務。但是,您只需要一個在遷移期間承載遠程桌面會話的工作角色,您可以在遷移完成后刪除它。您還可以將遷移代碼作為工作角色而不是控制台應用程序運行,並確保它記錄遷移的狀態,以便您驗證。

為會議管理限界上下文生成過去的日誌消息

遷移過程的一部分是在可能的情況下重新創建V1版本處理后丟棄的消息,然後將它們添加到消息日誌中。在V1版本中,所有從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件都以這種方式丟失了。系統不能重新創建所有丟失的事件,但可以創建表示遷移時系統狀態的事件。

有關更多信息,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

遷移事件源里的事件

在V2版本中,事件存儲為每個事件存儲額外的元數據,以便於查詢事件。遷移過程將所有事件從現有事件存儲複製到具有新模式的新事件存儲。

Jana(軟件架構師)發言:

原始事件不會以任何方式更新,而是被視為不可變的。

同時,系統將所有這些事件的副本添加到V2版本中引入的消息日誌中。

有關更多信息,請參見MigrationToV2項目中Migrator類中的MigrateEventSourcedAndGeneratePastEventLogs

重建讀模型**

V2版本包括對訂單和註冊限界上下文中讀模型定義的幾個更改。MigrationToV2項目在訂單和註冊限界上下文中重新構建會議的讀模型和價格訂單的讀模型。

有關更多信息,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

對測試的影響

在這個過程的這個階段,測試團隊繼續擴展驗收測試集合。他們還創建了一組測試來驗證數據遷移過程。

再說SpecFlow

之前,這組SpecFlow測試以兩種方式實現:通過自動化web瀏覽器模擬用戶交互,或者直接在MVC控制器上操作。這兩種方法都有各自的優缺點,我們在第4章“擴展和增強訂單和註冊限界上下文”中討論過。

在與另一位專家討論了這些測試之後,團隊還實現了第三種方法。從領域驅動設計(DDD)方法的角度來看,UI不是領域模型的一部分,核心團隊的重點應該是在領域專家的幫助下理解領域,並在領域中實現業務邏輯。UI只是机械部分,用於使用戶能夠與領域進行交互。因此,驗收測試應該包括驗證領域模型是否以領域專家期望的方式工作。因此,團隊使用SpecFlow創建了一組驗收測試,這些測試旨在在不影響系統UI部分的情況下測試領域。

下面的代碼示例显示了SelfRegistrationEndToEndWithDomain.feature文件,該文件在Conference.AcceptanceTests項目中的Features\Domain\Registration文件夾里,注意When和Then子句怎麼使用命令和事件的。

Gary(CQRS專家)發言:

通常,如果您的領域模型只使用聚合,您會期望When子句發送命令,Then子句查看事件或異常。然而,在本例中,領域模型包含一個通過發送命令來響應事件的流程管理器。測試將檢查是否發送了所有預期的命令,並引發了所有預期的事件。

Feature: Self Registrant end to end scenario for making a Registration for a Conference site with Domain Commands and Events
    In order to register for a conference
    As an Attendee
    I want to be able to register for the conference, pay for the Registration Order and associate myself with the paid Order automatically


Scenario: Make a reservation with the selected Order Items
Given the list of the available Order Items for the CQRS summit 2012 conference
    | seat type                 | rate | quota |
    | General admission         | $199 | 100   |
    | CQRS Workshop             | $500 | 100   |
    | Additional cocktail party | $50  | 100   |
And the selected Order Items
    | seat type                 | quantity |
    | General admission         | 1        |
    | Additional cocktail party | 1        |
When the Registrant proceeds to make the Reservation
    # command:RegisterToConference
Then the command to register the selected Order Items is received 
    # event: OrderPlaced
And the event for Order placed is emitted
    # command: MakeSeatReservation
And the command for reserving the selected Seats is received
    # event: SeatsReserved
And the event for reserving the selected Seats is emitted
    # command: MarkSeatsAsReserved
And the command for marking the selected Seats as reserved is received
    # event: OrderReservationCompleted 
And the event for completing the Order reservation is emitted
    # event: OrderTotalsCalculated
And the event for calculating the total of $249 is emitted

下面的代碼示例显示了feature文件的一些步驟實現。這些步驟使用命令總線發送命令。

[When(@"the Registrant proceed to make the Reservation")]
public void WhenTheRegistrantProceedToMakeTheReservation()
{
    registerToConference = ScenarioContext.Current.Get<RegisterToConference>();
    var conferenceAlias = ScenarioContext.Current.Get<ConferenceAlias>();

    registerToConference.ConferenceId = conferenceAlias.Id;
    orderId = registerToConference.OrderId;
    this.commandBus.Send(registerToConference);

    // Wait for event processing
    Thread.Sleep(Constants.WaitTimeout);
}

[Then(@"the command to register the selected Order Items is received")]
public void ThenTheCommandToRegisterTheSelectedOrderItemsIsReceived()
{
    var orderRepo = EventSourceHelper.GetRepository<Registration.Order>();
    Registration.Order order = orderRepo.Find(orderId);

    Assert.NotNull(order);
    Assert.Equal(orderId, order.Id);
}

[Then(@"the event for Order placed is emitted")]
public void ThenTheEventForOrderPlacedIsEmitted()
{
    var orderPlaced = MessageLogHelper.GetEvents<OrderPlaced>(orderId).SingleOrDefault();

    Assert.NotNull(orderPlaced);
    Assert.True(orderPlaced.Seats.All(
        os => registerToConference.Seats.Count(cs => cs.SeatType == os.SeatType && cs.Quantity == os.Quantity) == 1));
}

在遷移過程中發現的bug

當測試團隊在遷移之後在系統上運行測試時,我們發現訂單和註冊限界上下文中座位類型的數量與遷移之前的數量不同。調查揭示了以下原因。

如果會議從未發布過,則會議管理限界上下文允許業務客戶刪除座位類型,但不會引發集成事件向訂單和註冊限界上下文報告這一情況。所以,當業務客戶創建新的座位類型時,訂單和註冊限界上下文從會議管理限界上下文接收事件,而不是當業務客戶刪除座位類型時。

遷移過程的一部分創建一組集成事件,以替換V1版本處理后丟棄的事件。它通過讀取會議管理限界上下文使用的數據庫來創建這些事件。此過程沒有為已刪除的座位類型創建集成事件。

總之,在V1版本中,已刪除的座位類型錯誤地出現在訂單和註冊限界上下文的讀模型中。在遷移到V2版本之後,這些已刪除的座位類型沒有出現在訂單和註冊限界上下文的讀模型中。

Poe(IT運維人員)發言:

測試遷移過程不僅驗證遷移是否按預期運行,而且可能揭示應用程序本身的bug。

總結

在我們旅程的這個階段,我們對系統進行了版本控制,並完成了V2偽生產版本。這個新版本包含了一些額外的功能和特性,比如支持不需要付費的訂單和在UI中显示更多信息。

我們還對基礎設施做了一些改變。例如,我們使更多的消息具有冪等性,現在持久化集成事件。下一章將描述我們旅程的最後階段,我們將繼續增強基礎設施,並在準備發布V3版本時加強系統。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

【拆分版】Docker-compose構建Logstash多實例,基於7.1.0

【拆分版】Docker-compose構建Logstash多實例

寫在最前

說起Logstash,這個組件並沒有什麼集群的概念,與其說是集群,不如說是各自去收集日誌分析過濾存儲到Elasticsearch中。這裏做個多實例的Logstash,其實本質上只是為Logstash指定好佔用的端口,輸入輸出的配置的掛載,如是而已。

本文配置為紅框中的部分:Logstash多節點收集的數據,統統輸出數據到es-tribe,讓這個協調節點自己去負載均衡寫入數據。

配置詳見git倉庫 https://github.com/hellxz/docker-logstash-multiple.git
如有疑問或本文寫得有出入的地方,期望評論指定。

目錄結構

├── docker-ls-multiple-down.sh
├── docker-ls-multiple-up.sh
├── logstash-01
│   ├── config
│   │   ├── logstash.conf
│   │   └── logstash.yml
│   ├── docker-compose.yml
│   └── .env
├── logstash-02
│   ├── config
│   │   ├── logstash.conf
│   │   └── logstash.yml
│   ├── docker-compose.yml
│   └── .env
└── logstash-03
    ├── config
    │   ├── logstash.conf
    │   └── logstash.yml
    ├── docker-compose.yml
    └── .env

文件說明

logstash-01舉例說明

.envdocker-compose.yml提供了Logstash配置文件目錄的位置,如果不放置到其他位置,無需更改

# .env file for docker-compose default. please be careful.
# logstash config dir mount set. change inside dir config file to change logstash cluster settings.
# default use relation path. don't change if you don't know what means.
LOGSTASH_CONFIG_DIR=./config

docker-compose.yml 為docker-compose的配置文件,這裏只讀取了.env的配置文件的路徑,並把路徑下的logstash.conf掛載到容器中logstash目錄下pipeline/logstash.conf,掛載logstash.yml到logstash目錄下config/logstash.yml

version: "3"
services:
    logstash-1:
        image: logstash:7.1.0
        container_name: logstash-1
        volumes:
            - ${LOGSTASH_CONFIG_DIR}/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:rw
            - ${LOGSTASH_CONFIG_DIR}/logstash.yml:/usr/share/logstash/config/logstash.yml:rw
        network_mode: "host"

logstash.conf為模板文件,輸入輸出以及配置都可以在這裏修改

input {     #輸入
  kafka {   #使用kafka方式輸入
    bootstrap_servers => "kafka1:9092,kafka2:9093,kafka3:9094" #kafka集群節點列表
    topics => ["all_logs"] #訂閱名為all_logs的topic
    group_id => "logstash" #設置組為logstash
    codec => json #轉換為json
  }
}

filter { #過濾分詞等都在這裏配置,暫時未配置

}

output {     #輸出
  elasticsearch { #輸出到es
    hosts => ["10.2.114.110:9204"] #es的路徑
    index => "all-logs-%{+YYYY.MM.dd}" #輸出到es的索引名稱,這裡是每天一個索引
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }
}

此處設置並不是本文中的重點,有興趣和需要請參考其它文章的相關配置

logstash.yml 為logstash的配置文件,只寫了些與集群相關的,還有更多請參考其它文章.

# set now host ip to http.host
http.host: 10.2.114.110
# set the es-tribe-node host. let logstash monitor the es.
xpack.monitoring.elasticsearch.hosts:
- http://10.2.114.110:9204
# enable or disable the logstash monitoring the es.
xpack.monitoring.enabled: true

這裏沒有指定Logstash啟動時的端口號,Logstash默認端口為9600,多實例在同主機時,會自動分配9600后的端口
另外兩個腳本文件,僅在使用同一台主機時使用,便捷啟動/關閉多節點Logstash

使用說明

  1. 需要確保多台主機均能正常ping通
  2. 確保Zookeeper集群與Kafka集群已經啟動,並且Logstash訂閱的borkers的列表能對得起來
  3. 確保Elasticsearch集群正常啟動
  4. 宿主機/etc/hosts添加kafka1kafka2kafka3映射到對應的kafka所在的宿主機ip
  5. 修改每個Logstash目錄下的config/logstash.conf中的輸出es部分的ip到es-tribe對應的宿主機ip
  6. 修改每個Logstash目錄下的config/logstash.yml中的http.host為當前宿主機ip, 修改xpack.monitoring.elasticsearch.hosts為當前es-tribe宿主機ip與port
  7. 進入每個Logstash目錄執行docker-compose up -d以啟動集群,執行docker-compose down以關閉集群

本文系原創文章,謝絕轉載

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

JavaScript系列–JavaScript數組高階函數reduce()方法詳解及奇淫技巧

一、前言

reduce() 方法接收一個函數作為累加器,數組中的每個值(從左到右)開始縮減,最終計算為一個值。

reduce() 可以作為一個高階函數,用於函數的 compose。

reduce()方法可以搞定的東西,for循環,或者forEach方法有時候也可以搞定,那為啥要用reduce()?這個問題,之前我也想過,要說原因還真找不到,唯一能找到的是:通往成功的道路有很多,但是總有一條路是最捷徑的,亦或許reduce()逼格更高。

 

二、語法

arr.reduce(callback,initialValue)

返回最後一個值,reduce 為數組中的每一個元素依次執行回調函數,不包括數組中被刪除或從未被賦值的元素,接受四個參數:初始值(或者上一次回調函數的返回值),當前元素值,當前索引,調用 reduce 的數組。

 

三、實例解析intialValue參數

1、第一個例子:

var arr = [1, 2, 3, 4]; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; }) console.log(arr, sum);

打印結果:
1 2 1
3 3 2
6 4 3
[1, 2, 3, 4] 10

 

2、第二個例子

var arr = [1, 2, 3, 4]; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; },0) //注意這裏設置了初始值 console.log(arr, sum);

打印結果:
0 1 0
1 2 1
3 3 2
6 4 3
[1, 2, 3, 4] 10

這個例子index是從0開始的,第一次的prev的值是我們設置的初始值0,數組長度是4,reduce函數循環4次。

結論:如果沒有提供initialValue,reduce 會從索引1的地方開始執行 callback 方法,跳過第一個索引。如果提供initialValue,從索引0開始。

 

注意:如果這個數組為空,運用reduce是什麼情況?

var arr = []; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; }) //報錯,"TypeError: Reduce of empty array with no initial value"

但是要是我們設置了初始值就不會報錯,如下:

var arr = []; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; },0) console.log(arr, sum); // [] 0

所以一般來說,提供初始值更加安全。

 

四、reduce簡單用法

當然最簡單的就是我們常用的數組求和,求乘積了。

var arr = [1, 2, 3, 4]; var sum = arr.reduce((x,y)=>x+y) var mul = arr.reduce((x,y)=>x*y) console.log( sum ); //求和,10 console.log( mul ); //求乘積,24

 

五、reduce高級用法

(1)計算數組中每個元素出現的次數

let names = ['Alice', 'Bob', 'Tiff', 'Bruce', 'Alice']; let nameNum = names.reduce((pre,cur)=>{ if(cur in pre){ pre[cur]++ }else{ pre[cur] = 1 } return pre },{}) console.log(nameNum); //{Alice: 2, Bob: 1, Tiff: 1, Bruce: 1}

 

(2)數組去重

let arr = [1,2,3,4,4,1] let newArr = arr.reduce((pre,cur)=>{ if(!pre.includes(cur)){ return pre.concat(cur) }else{ return pre } },[]) console.log(newArr);// [1, 2, 3, 4]

 

(3)將二維數組轉化為一維

let arr = [[0, 1], [2, 3], [4, 5]] let newArr = arr.reduce((pre,cur)=>{ return pre.concat(cur) },[]) console.log(newArr); // [0, 1, 2, 3, 4, 5]

 

(4)將多維數組轉化為一維

let arr = [[0, 1], [2, 3], [4,[5,6,7]]] const newArr = function(arr){ return arr.reduce((pre,cur)=>pre.concat(Array.isArray(cur)?newArr(cur):cur),[]) } console.log(newArr(arr)); //[0, 1, 2, 3, 4, 5, 6, 7]

 

(5)對象里的屬性求和

var result = [ { subject: 'math', score: 10 }, { subject: 'chinese', score: 20 }, { subject: 'english', score: 30 } ]; var sum = result.reduce(function(prev, cur) { return cur.score + prev; }, 0); console.log(sum) //60

 

(6)將[1,3,1,4]轉為数字1314

function addDigitValue(prev,curr,curIndex,array){ var exponent = (array.length -1) -curIndex; var digitValue = curr*Math.pow(10,exponent); return prev + digitValue; } var arr6 = [1,3,1,4]; var result7 = arr6.reduce(addDigitValue,0) console.info('result7',result7)

 

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

『開發技巧』Python音頻操作工具PyAudio上手教程

『開發技巧』Python音頻操作工具PyAudio上手教程

0.引子

當需要使用Python處理音頻數據時,使用python讀取與播放聲音必不可少,下面介紹一個好用的處理音頻PyAudio工具包。

PyAudio是Python開源工具包,由名思義,是提供對語音操作的工具包。提供錄音播放處理等功能,可以視作語音領域的OpenCv。

 

1.簡介

 

PyAudio為跨平台音頻I / O庫PortAudio提供Python 綁定。使用PyAudio,您可以輕鬆地使用Python在各種平台上播放和錄製音頻,例如GNU / Linux,Microsoft Windows和Apple Mac OS X / macOS。

PyAudio的靈感來自:

  • pyPortAudio / fastaudio:PortAudio v18 API的Python綁定。
  • tkSnack:Tcl / Tk和Python的跨平台聲音工具包。

 

2.安裝

 

目前的版本是PyAudio v0.2.11。在大多數平台上使用pip安裝PyAudio。對於v0.2.9之前的版本,PyAudio分發安裝二進制文件,這些文件 存檔在這裏。

 

微軟Windows 

使用pip安裝:

python -m pip install pyaudio

筆記:

  • 如果pip尚未與您的Python安裝捆綁在一起,請在此處獲取 。
  • pip將獲取並安裝PyAudio輪(預先打包的二進制文件)。目前,有車輪兼容Python 2.7,3.4,3.5和3.6 的 官方發行版。對於這些版本,可以使用32位和64位車輪。
  • 這些二進制文件包括使用MinGW構建的PortAudio v19 v190600_20161030。它們僅支持Windows MME API,包括對DirectX,ASIO等的支持。如果需要支持未包含的API,則需要編譯PortAudio和PyAudio。

 

Apple Mac OS X.

使用Homebrew安裝必備的portaudio庫,然後使用pip安裝PyAudio:

brew install portaudio 
pip install pyaudio

筆記:

  • 如果尚未安裝,請下載 Homebrew。
  • pip將下載PyAudio源代碼併為您的Python版本構建它。
  • Homebrew和構建PyAudio還需要安裝Xcode命令行工具(更多信息)。

 

Debian / Ubuntu

使用包管理器安裝PyAudio:

sudo apt-get install python-pyaudio python3-pyaudio

如果沒有最新版本的PyAudio,請使用pip安裝它:

pip install pyaudio

筆記:

  • pip將下載PyAudio源併為您的系統構建它。請務必事先安裝portaudio庫開發包(portaudio19-dev)和python開發包(python-all-dev)。
  • 為了更好地隔離系統包,請考慮在virtualenv中安裝PyAudio 。

 

PyAudio來源

源代碼可從Python Package Index(PyPI)下載:pypi.python.org/pypi/PyAudio。

或克隆git存儲庫:

git clone https://people.csail.mit.edu/hubert/git/pyaudio.git

要從源代碼構建PyAudio,您還需要構建 PortAudio v19。有關為各種平台構建PyAudio的一些說明,請參閱編譯提示。要使用Microsoft Visual Studio構建PyAudio,請查看Sebastian Audet的說明。

 

 

3.示例

1).採集音頻

下面以一段代碼演示如何從計算機麥克風採集一段音頻,採集音頻時長 4s,保存文件 output.wav

使用了tqdm模塊,可以方便显示出來讀取過程,如下:

* recording
100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 172/172 [00:03<00:00, 43.40it/s] 
* done recording

import pyaudio import wave from tqdm import tqdm def record_audio(wave_out_path,record_second): CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 2 RATE = 44100 p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) wf = wave.open(wave_out_path, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) print("* recording") for i in tqdm(range(0, int(RATE / CHUNK * record_second))): data = stream.read(CHUNK) wf.writeframes(data) print("* done recording") stream.stop_stream() stream.close() p.terminate() wf.close() record_audio("output.wav",record_second=4)

要使用PyAudio,首先使用pyaudio.PyAudio()(1)實例化PyAudio ,它設置portaudio系統。

要錄製或播放音頻,請使用pyaudio.PyAudio.open() (2)在所需設備上打開所需音頻參數的流。這設置了pyaudio.Stream播放或錄製音頻。

通過使用流式傳輸pyaudio.Stream.write()音頻數據或使用流式傳輸音頻數據來播放音頻 pyaudio.Stream.read()。(3)

請注意,在“阻止模式”中,每個pyaudio.Stream.write()或 pyaudio.Stream.read()阻止直到所有給定/請求的幀都被播放/記錄。或者,要動態生成音頻數據或立即處理錄製的音頻數據,請使用下面概述的“回調模式”。

使用pyaudio.Stream.stop_stream()暫停播放/錄製,並pyaudio.Stream.close()終止流。(4)

最後,使用pyaudio.PyAudio.terminate()(5)終止portaudio會話

 

2).播放音頻

下面使用播放的功能來播放1)中保存的音頻 output.wav

通過tqdm,显示播放進度條,如下:


100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 172/172 [00:03<00:00, 43.40it/s] 

"""PyAudio Example: Play a WAVE file.""" import pyaudio import wave from tqdm import tqdm def play_audio(wave_path): CHUNK = 1024 wf = wave.open(wave_path, 'rb') # instantiate PyAudio (1) p = pyaudio.PyAudio() # open stream (2) stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True) # read data data = wf.readframes(CHUNK) # play stream (3) datas = [] while len(data) > 0: data = wf.readframes(CHUNK) datas.append(data) for d in tqdm(datas): stream.write(d) # stop stream (4) stream.stop_stream() stream.close() # close PyAudio (5) p.terminate() play_audio("output.wav")

2).以回調方式播放音頻

當需要在執行其他程序時同時播放音頻,可以使用回調的方式播放,示例代碼如下:

"""PyAudio Example: Play a WAVE file.""" import pyaudio import wave from tqdm import tqdm import time def play_audio_callback(wave_path): CHUNK = 1024 wf = wave.open(wave_path, 'rb') # instantiate PyAudio (1) p = pyaudio.PyAudio() def callback(in_data, frame_count, time_info, status): data = wf.readframes(frame_count) return (data, pyaudio.paContinue) # open stream (2) stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, stream_callback=callback) # read data stream.start_stream() while stream.is_active(): time.sleep(0.1) # stop stream (4) stream.stop_stream() stream.close() # close PyAudio (5) p.terminate() play_audio_callback("output.wav")

 

Reference:

1.http://people.csail.mit.edu/hubert/pyaudio/

 

 

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

技術境界的二三四

兩種能力境界

1.解決問題

在工程師中有一種人被稱為”救火隊長“。哪裡出了問題,哪裡就有他的身影,他的出現,燃眉之急就有救了。他們是解決問題的高人。但是“救火隊長”在晉陞上往往會遇到瓶頸。

對標人物:漫威-美國隊長

每天嚴陣以待,隨時準備拯救世界。無法接受鋼鐵俠防患於未然用機器來解決問題解放自己的方式。

 

2.發現問題

更高的高人會問一個問題:“為什麼每天會任務追着你跑?你為什麼沒從根源上解決所有的問題?”一個在辦公室里和下面人一起研究茶道的領導要比和大家一起加班到半夜的領導受歡迎。因為他們從更大的層面上杜絕了對救火隊長的需要。

對標人物:《罪惡黑名單》雷丁頓

雷丁頓總是運籌帷幄游刃有餘。所以在形勢危急的情況下,他總是評價哪家的什麼東西好吃,或者任何別人沒有注意到的生活瑣事,觀眾並不恨他。因為知道他早就搞定了一切。

 

 

三種需求對應境界

1.對應需求

一個兢兢業業的工程師或團隊,對產品有求必應。項目初期這樣沒有錯,時間一長,就會遇到維護和擴展性問題。

對標:下圖的狀態有木有很熟悉的趕腳?

 

2.快速對應需求

通過系統性的設計和不斷的迭代重構,一個需求來了,通過少量開發或者不開發就可以完成。每周上班五天,三天用來團建。很好,直到公司創始人完成了最初的宏圖偉業,連高層也不知道要干什麼,公司開始走下坡了。

對標:請參考《浪潮之巔》

 

3.引領需求

在線上跑着的服務就會產生數據,通過數據的分析,自己的觀察思考,推演出新的商機和需求,開拓更大的市場。

對標:請參考google的7-2-1原則。

 

 

四種技術運用境界

1.會用

很多面試者在面試中被淘汰時很不服氣,這些我會用,給我分配的活我都干出來了。為什麼不要我?答案很簡單,你這個工作別人也能幹。所以聰明的老闆寧願花4個人的錢招聘3個人干5個人的活。所以怎麼才能獲得一份收入不錯的工作?

2.知道各種優劣勢,知道怎麼用更好

公司絕對不會鼓勵重複造輪子,他們更鼓勵用好輪子。所以深入透徹的技術調研分析,根據場景選擇了合適的技術是個不錯的開始。但是現有的技術並不是為自己定製的。當自己用的足夠深,就發現很多方面,現有技術確實不能滿足自己的業務需要。

3.理解原理及技術血緣,深入運用

特別是一些新技術,由於場景覆蓋還不是很全面,需要在此基礎上做一些二次開發或者內部改造,甚至重寫。重寫重寫着,突然覺得自己有更好的想法?

4.創造技術

技術創造價值,技術引領一個時代。

 

總結

持續有聲音 

 

近期文章

代碼榮辱觀-以運用風格為榮,以隨意編碼為恥

你看不懂的spring原理是因為不知道這幾個概念

應屆生offer指南

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

高級Java工程師必備 —– 深入分析 Java IO (三)

概述

Java IO即Java 輸入輸出系統。不管我們編寫何種應用,都難免和各種輸入輸出相關的媒介打交道,其實和媒介進行IO的過程是十分複雜的,這要考慮的因素特別多,比如我們要考慮和哪種媒介進行IO(文件、控制台、網絡),我們還要考慮具體和它們的通信方式(順序、隨機、二進制、按字符、按字、按行等等)。Java類庫的設計者通過設計大量的類來攻克這些難題,這些類就位於java.io包中。

在JDK1.4之後,為了提高Java IO的效率,Java又提供了一套新的IO,Java New IO簡稱Java NIO。它在標準java代碼中提供了高速的面向塊的IO操作。本篇文章重點介紹Java IO,關於Java NIO請參考我的另兩篇文章: 

高級Java工程師必備 —– 深入分析 Java IO (一)BIO

高級Java工程師必備 —– 深入分析 Java IO (二)NIO

Java IO類庫的框架

首先看個圖:

 Java IO的類型

雖然java IO類庫龐大,但總體來說其框架還是很清楚的。從是讀媒介還是寫媒介的維度看,Java IO可以分為:

  1. 輸入流:InputStream和Reader
  2. 輸出流:OutputStream和Writer

而從其處理流的類型的維度上看,Java IO又可以分為:

  1. 字節流:InputStream和OutputStream
  2. 字符流:Reader和Writer

下面這幅圖就清晰的描述了JavaIO的分類:

字節流 字符流
輸入流 InputStream Reader
輸出流 OutputStream Writer

我們的程序需要通過InputStream或Reader從數據源讀取數據,然後用OutputStream或者Writer將數據寫入到目標媒介中。其中,InputStream和Reader與數據源相關聯,OutputStream和writer與目標媒介相關聯。

Java IO的基本用法

Java IO :字節流

通過上面的介紹我們已經知道,字節流對應的類應該是InputStream和OutputStream,而在我們實際開發中,我們應該根據不同的媒介類型選用相應的子類來處理。下面我們就用字節流來操作文件媒介:

例1,用字節流寫文件

public static void writeByteToFile() throws IOException{
    String hello= new String( "hello word!");
     byte[] byteArray= hello.getBytes();
    File file= new File( "d:/test.txt");
     //因為是用字節流來寫媒介,所以對應的是OutputStream 
     //又因為媒介對象是文件,所以用到子類是FileOutputStream
    OutputStream os= new FileOutputStream( file);
     os.write( byteArray);
     os.close();
}

例2,用字節流讀文件

public static void readByteFromFile() throws IOException{
    File file= new File( "d:/test.txt");
     byte[] byteArray= new byte[( int) file.length()];
     //因為是用字節流來讀媒介,所以對應的是InputStream
     //又因為媒介對象是文件,所以用到子類是FileInputStream
    InputStream is= new FileInputStream( file);
     int size= is.read( byteArray);
    System. out.println( "大小:"+size +";內容:" +new String(byteArray));
     is.close();
}

CopyFileDemo

package com.chenhao.io.byteIO;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * @author ChenHao
 *
 */
public class CopyFileDemo {

    /**
     * @param args
     * @throws FileNotFoundException 
     */
    public static void main(String[] args) {
        String src ="E:/xp/test";
        String dest="e:/xp/test/4.jpg";
        try {
            copyFile(src,dest);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            System.out.println("文件不存在");
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("拷貝文件失敗|關閉流失敗");
        }
    }
    /**
     * 文件的拷貝
     * @param  源文件路徑
     * @param  目錄文件路徑
     * @throws FileNotFoundException,IOException
     * @return 
     */
    public static void copyFile(String srcPath,String destPath) throws FileNotFoundException,IOException {
        //1、建立聯繫 源(存在且為文件) +目的地(文件可以不存在)  
        File src =new File(srcPath);
        File dest =new File(destPath);
        if(! src.isFile()){ //不是文件或者為null
            System.out.println("只能拷貝文件");
            throw new IOException("只能拷貝文件");
        }
        //2、選擇流
        InputStream is =new FileInputStream(src);
        OutputStream os =new FileOutputStream(dest);
        //3、文件拷貝   循環+讀取+寫出
        byte[] flush =new byte[1024];
        int len =0;
        //讀取
        while(-1!=(len=is.read(flush))){
            //寫出
            os.write(flush, 0, len);
        }
        os.flush(); //強制刷出
        
        //關閉流
        os.close();
        is.close();
    }

}

Java IO :字符流

同樣,字符流對應的類應該是Reader和Writer。下面我們就用字符流來操作文件媒介:

例3,用字符流讀文件

public static void writeCharToFile() throws IOException{
    String hello= new String( "hello word!");
    File file= new File( "d:/test.txt");
    //因為是用字符流來讀媒介,所以對應的是Writer,又因為媒介對象是文件,所以用到子類是FileWriter
    Writer os= new FileWriter( file);
    os.write( hello);
    os.close();
}

例4,用字符流寫文件

public static void readCharFromFile() throws IOException{
    File file= new File( "d:/test.txt");
    //因為是用字符流來讀媒介,所以對應的是Reader
    //又因為媒介對象是文件,所以用到子類是FileReader
    Reader reader= new FileReader( file);
    char [] byteArray= new char[( int) file.length()];
    int size= reader.read( byteArray);
    System. out.println( "大小:"+size +";內容:" +new String(byteArray));
    reader.close();
}

Java IO :字節流轉換為字符流

字節流可以轉換成字符流,java.io包中提供的InputStreamReader類就可以實現,當然從其命名上就可以看出它的作用。其實這涉及到另一個概念,IO流的組合,後面我們詳細介紹。下面看一個簡單的例子:

例5 ,字節流轉換為字符流

public static void convertByteToChar() throws IOException{
    File file= new File( "d:/test.txt");
    //獲得一個字節流
    InputStream is= new FileInputStream( file);
    //把字節流轉換為字符流,其實就是把字符流和字節流組合的結果。
    Reader reader= new InputStreamReader( is);
    char [] byteArray= new char[( int) file.length()];
    int size= reader.read( byteArray);
    System. out.println( "大小:"+size +";內容:" +new String(byteArray));
    is.close();
    reader.close();
}

Java IO:文件媒介操作

例6 ,File操作

public class FileDemo {
  public static void main(String[] args) {
         //檢查文件是否存在
        File file = new File( "d:/test.txt");
         boolean fileExists = file.exists();
        System. out.println( fileExists);
         //創建文件目錄,若父目錄不存在則返回false
        File file2 = new File( "d:/fatherDir/subDir");
         boolean dirCreated = file2.mkdir();
        System. out.println( dirCreated);
         //創建文件目錄,若父目錄不存則連同父目錄一起創建
        File file3 = new File( "d:/fatherDir/subDir2");
         boolean dirCreated2 = file3.mkdirs();
        System. out.println( dirCreated2);
        File file4= new File( "d:/test.txt");
         //判斷長度
         long length = file4.length();
         //重命名文件
         boolean isRenamed = file4.renameTo( new File("d:/test2.txt"));
         //刪除文件
         boolean isDeleted = file4.delete();
        File file5= new File( "d:/fatherDir/subDir");
         //是否是目錄
         boolean isDirectory = file5.isDirectory();
         //列出文件名
        String[] fileNames = file5.list();
         //列出目錄
        File[]   files = file4.listFiles();
  }
}

隨機讀取File文件

通過上面的例子我們已經知道,我們可以用FileInputStream(文件字符流)或FileReader(文件字節流)來讀文件,這兩個類可以讓我們分別以字符和字節的方式來讀取文件內容,但是它們都有一個不足之處,就是只能從文件頭開始讀,然後讀到文件結束。

但是有時候我們只希望讀取文件的一部分,或者是說隨機的讀取文件,那麼我們就可以利用RandomAccessFile。RandomAccessFile提供了seek()方法,用來定位將要讀寫文件的指針位置,我們也可以通過調用getFilePointer()方法來獲取當前指針的位置,具體看下面的例子:

例7,隨機讀取文件

public static void randomAccessFileRead() throws IOException {
     // 創建一個RandomAccessFile對象
    RandomAccessFile file = new RandomAccessFile( "d:/test.txt", "rw");
     // 通過seek方法來移動讀寫位置的指針
     file.seek(10);
     // 獲取當前指針
     long pointerBegin = file.getFilePointer();
     // 從當前指針開始讀
     byte[] contents = new byte[1024];
     file.read( contents);
     long pointerEnd = file.getFilePointer();
    System. out.println( "pointerBegin:" + pointerBegin + "\n" + "pointerEnd:" + pointerEnd + "\n" + new String(contents));
     file.close();
}

例8,隨機寫入文件

public static void randomAccessFileWrite() throws IOException {
     // 創建一個RandomAccessFile對象
     RandomAccessFile file = new RandomAccessFile( "d:/test.txt", "rw");
     // 通過seek方法來移動讀寫位置的指針
     file.seek(10);
     // 獲取當前指針
     long pointerBegin = file.getFilePointer();
     // 從當前指針位置開始寫
     file.write( "HELLO WORD".getBytes());
     long pointerEnd = file.getFilePointer();
     System. out.println( "pointerBegin:" + pointerBegin + "\n" + "pointerEnd:" + pointerEnd + "\n" );
     file.close();
}

Java IO:BufferedInputStream和BufferedOutputStream

BufferedInputStream顧名思義,就是在對流進行寫入時提供一個buffer來提高IO效率。在進行磁盤或網絡IO時,原始的InputStream對數據讀取的過程都是一個字節一個字節操作的,而BufferedInputStream在其內部提供了一個buffer,在讀數據時,會一次讀取一大塊數據到buffer中,這樣比單字節的操作效率要高的多,特別是進程磁盤IO和對大量數據進行讀寫的時候,能提升IO性能。

使用BufferedInputStream十分簡單,只要把普通的輸入流和BufferedInputStream組合到一起即可。我們把上面的例2改造成用BufferedInputStream進行讀文件,請看下面例子:

例10 ,用緩衝流讀文件

public static void readByBufferedInputStream() throws IOException {
     File file = new File( "d:/test.txt");
     byte[] byteArray = new byte[( int) file.length()];
     //可以在構造參數中傳入buffer大小
     InputStream is = new BufferedInputStream( new FileInputStream(file),2*1024);
     int size = is.read( byteArray);
     System. out.println( "大小:" + size + ";內容:" + new String(byteArray));
     is.close();
}

BufferedOutputStream的情況和BufferedInputStream一致,在這裏就不多做描述了。

copyFile

package com.chenhao.io.buffered;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * 字節流文件拷貝+緩衝流 ,提高性能
 * 緩衝流(節點流)
 * @author ChenHao
 *
 */
public class BufferedByteDemo {

    public static void main(String[] args) {
        String src ="E:/xp/test";
        String dest="e:/xp/test/4.jpg";
        try {
            copyFile(src,dest);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            System.out.println("文件不存在");
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("拷貝文件失敗|關閉流失敗");
        }
    }
    /**
     * 文件的拷貝
     * @param  源文件路徑
     * @param  目錄文件路徑
     * @throws FileNotFoundException,IOException
     * @return 
     */
    public static void copyFile(String srcPath,String destPath) throws FileNotFoundException,IOException {
        //1、建立聯繫 源(存在且為文件) +目的地(文件可以不存在)  
        File src =new File(srcPath);
        File dest =new File(destPath);
        if(! src.isFile()){ //不是文件或者為null
            System.out.println("只能拷貝文件");
            throw new IOException("只能拷貝文件");
        }
        //2、選擇流
        InputStream is =new BufferedInputStream(new FileInputStream(src));
        OutputStream os =new BufferedOutputStream( new FileOutputStream(dest));
        //3、文件拷貝   循環+讀取+寫出
        byte[] flush =new byte[1024];
        int len =0;
        //讀取
        while(-1!=(len=is.read(flush))){
            //寫出
            os.write(flush, 0, len);
        }
        os.flush(); //強制刷出
        
        //關閉流
        os.close();
        is.close();
    }

}

Java IO:BufferedReader和BufferedWriter

BufferedReader、BufferedWriter 的作用基本和BufferedInputStream、BufferedOutputStream一致,具體用法和原理都差不多 ,只不過一個是面向字符流一個是面向字節流。同樣,我們將改造字符流中的例4,給其加上buffer功能,看例子:

public static void readByBufferedReader() throws IOException {
     File file = new File( "d:/test.txt");
     // 在字符流基礎上用buffer流包裝,也可以指定buffer的大小
     Reader reader = new BufferedReader( new FileReader(file),2*1024);
     char[] byteArray = new char[( int) file.length()];
     int size = reader.read( byteArray);
     System. out.println( "大小:" + size + ";內容:" + new String(byteArray));
     reader.close();
}

另外,BufferedReader提供一個readLine()可以方便地讀取一行,而FileInputStream和FileReader只能讀取一個字節或者一個字符,因此BufferedReader也被稱為行讀取器.

public static void keyIn() throws IOException {
 try (//InputStreamReader是從byte轉成char的橋樑
      InputStreamReader reader = new InputStreamReader(System.in);
      //BufferedReader(Reader in)是char類型輸入的包裝類
      BufferedReader br = new BufferedReader(reader);) {
         
         String line = null;
         while ((line = br.readLine()) != null) {
             if (line.equals("exit")) {
                 //System.exit(1);
                 break;
             }
             System.out.println(line);
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
}

Java IO: 序列化與ObjectInputStream、ObjectOutputStream

推薦博客

  程序員寫代碼之外,如何再賺一份工資?

Serializable

如果你希望類能夠序列化和反序列化,必須實現Serializable接口,就像所展示的ObjectInputStream和ObjectOutputStream例子一樣。

ObjectInputStream

ObjectInputStream能夠讓你從輸入流中讀取Java對象,而不需要每次讀取一個字節。你可以把InputStream包裝到ObjectInputStream中,然後就可以從中讀取對象了。代碼如下:

ObjectInputStream input = new ObjectInputStream(new FileInputStream("object.data"));
MyClass object = (MyClass) input.readObject(); //etc.
input.close();

在這個例子中,你讀取的對象必須是MyClass的一個實例,並且必須事先通過ObjectOutputStream序列化到“object.data”文件中。

在你序列化和反序列化一個對象之前,該對象的類必須實現了java.io.Serializable接口。

ObjectOutputStream

ObjectOutputStream能夠讓你把對象寫入到輸出流中,而不需要每次寫入一個字節。你可以把OutputStream包裝到ObjectOutputStream中,然後就可以把對象寫入到該輸出流中了。代碼如下:

ObjectOutputStream output = new ObjectOutputStream(new FileOutputStream("object.data"));
MyClass object = new MyClass();  output.writeObject(object); //etc.
output.close();

例子中序列化的對象object現在可以從ObjectInputStream中讀取了。

同樣,在你序列化和反序列化一個對象之前,該對象的類必須實現了java.io.Serializable接口。

 

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

聽說你在為天天寫業務代碼而煩惱?

寫業務代碼一般就是完成業務應用的功能,天天寫業務代碼的程序員也被戲稱為CURD程序員,CURD就是增(create)、改(update)、查(read)、刪(delete)的意思。CURD程序員每天的工作內容就是根據業務邏輯需要對數據庫數據進行增刪改查,這在很多人看來是沒有技術含量的,尤其是工作了多年的程序員,認為這無法提高他們的技術能力,寫了十年的業務代碼,卻和寫一年業務代碼的年輕人差別不大,可能對某個框架更為熟悉一點罷了,此外,少有其他方面的優勢。

實際上,每個能正常被使用的業務系統都需要CURD的工作,但僅僅是CURD也是無法完成一個比較複雜的業務系統的。一個業務系統除了需要編寫功能代碼,還要有需求分析、架構設計、詳細設計、功能編寫、測試、集成部署等等工作內容,CURD頂多是功能編寫的子集。因此,如果你的任務只是CURD,那時間長了以後確實可能會厭煩,並且覺得技能得不到提高。也因此,論壇里常有人求助於高手,問怎樣才能脫離這種CURD工作:

高手們的答案也不一致,有的說寫業務代碼同樣牛逼,CURD是核心競爭力呢,有的建議換工作,擺脫CURD,也有的說要做個有心人,多解決實際問題,自然就會提高,等你水平到一定程度了,就可以不做這類工作了。

而在我看來,不寫業務代碼的人,也不一定有多牛掰,他們也不過是根據需求實現一些東西,也需要領域知識和編程技能,只不過有些領域知識和我們常見的上層應用業務知識有比較大的區別。即使是內核開發人員,如果只是負責實現某個模塊,而且他並沒有多少進取心,每天只是讀讀文檔和協議,調調接口來實現功能,沒有深挖原理,也不關注其他方面的技術,沒有全局視角,那他其實頂多也算是一個搬磚的,離高手還是有很大的距離。

而做CURD工作的,也並不是完全學不到東西。CURD從小的方面來說,是老闆的需求,從大的方面來說,是社會需求,需要大量的人來從事這個工作。CURD程序員離業務比較近,有機會可能也必須去更多地理解業務,而業務知識也是一種領域知識,具有深刻的領域知識的人,在職場中是有競爭優勢的。因此,你可以在CURD的同時,多了解業務知識,或者多思考怎麼把CURD做得更好,比如製作一些模板工具,想辦法通過各種方式來提高工作效率,這樣面對同樣的工作時,你會更輕鬆因而也更具有競爭力。如果你真想擺脫或者基本擺脫,那麼在平時就應該注意積累其它方面的知識,能完成其他CURD程序員難以完成的任務,在工作中,你要懂得合理地越俎代庖、份內份外

越俎代庖本來是個貶義詞,指的是越權辦事、多管閑事的行為,但在這裡是褒義詞。其他同事遇到難題時,你主動幫忙解決,在你自己任務已經完成的情況下,可以研究其他人的工作內容,這樣可以在其他同事只有不太好的實現方案時,適時給出你自己的方案,這樣也不出現搶活邀功的現象。即便你的方案使用不上,你有過自己的思考研究,對自己的成長也是有利的。

可能有同學會說了,你這是站着說話不腰疼,平時加班加點才能完成任務,哪有時間去做這樣的事情。如果是這樣,那你的確難以擺脫這種境況,要不你就安心地每天CURD,要不就換個更適合自己的工作。就我自己而言,工作這麼多年,和行業里其他人相比,加班真的很少,不過我花在學習上的時間,可能會比大部分人都多。這個學習,包括工作的時候去學習其他人的任務所涉及的技能、整個項目的架構原理,以及其它自己認為有用或感興趣的技術。一般來說,工作上的事情,我工作時間就解決可能也順便理解其原理了,而要拓寬知識和技術面,一般就靠下班時間。下班時間學習的東西,有時候也是跟工作內容相關的,即便是不相關的內容,可能也會在你工作時給你帶來靈感,或者有助於你更快理解工作上的事情,這樣的話也使得你能更快速完成工作任務,於是又有更多的時間去學習和擴寬技能,形成一個良性循環。關於良性循環和惡性循環,可以參考我之前的文章:停止無謂抱怨,構建你的良性循環系統。

總之,如果你覺得自己目前就是CURD程序員並且不滿足於此,那你可以先思考如果把CURD做得更好更高效更少出bug,同時盡可能地熟悉業務,爭取在某個業務方向上比普通人更熟悉。因為你最熟悉CURD,可能在換工作時,人家還是傾向於給你提供CURD的崗位,因此如果要擺脫這種境況,就需要你在業餘時間加倍地學習、實踐新技能,然後在機會到來時,才有可能抓住它。“機會永遠只留給有準備的人”,以我的親身體會,這句話在99.9%的情況下應該是正確的,希望我們都記住它!

 

原文發表於:聽說你在為天天寫業務代碼而煩惱?

歡迎關注公眾號:

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

我的產品/競品分析鍛煉記錄(分析產品核心)

  一來,以前剛入行的時候也想學習下競品分析/產品分析,然後好提高自身的分析能力,當時看了很多文章,然後看到大多都是學生寫的,分析的思路都是從用戶、核心流程,然後一直說到交互,但是整片文章偏重的是說某個小交互怎麼設計得不好,怎麼改進怎麼。

  二來,最近面試了一次,面試我的人應該也是挺教條主義的,面試過程我說了下競品分析,了解過市場行情,當時她追問,我就說下我的分析思路,我的分析思路很特別,只是分析產品核心、整體設計的優缺、預判產品的未來發展方向等,對於細節、業務流程等,一概不分析。

  三來想起來幾年前,那時候我還非常空閑,做了個100個APP分析挑戰,結果沒做完,尷尬。

  所以想整理下這篇,關於我對產品/競品分析的一些思路及想法,以供其他人參考。

 

1、產品經理究竟是負責什麼?

  必須要理清這個點,如果沒對產品經理的職責有清晰定位,很難做出比較適合的分析。為什麼這麼說呢?因為很多剛入行的人,要不從運營/業務入行,要不就技術/畫圖入行,只是了解業務或者功能設計,而不是真正、完整的產品管理。

  產品經理是要對一個產品的從頭到尾的管理,包括用戶了解、分析、業務設計、功能設計實施、運營推廣,從產品冷啟動到產品發展到產品退出市場,產品經理都需要負起帶頭的責任。這個是我理解的產品經理該需要承擔起來的責任。(這裏不展開說,不然這文章沒完沒了)

  基於這個概念,所以我作為一個產品經理,應該關注產品核心價值,因為核心價值才是讓這個產品獨立生存在市場上面,關注市場用戶的情況,而不是將關注點局限在一個小功能、小交互。所以才會有文章一開始說的,分析思路跟那些教科書的,有差異。

2、產品分析與競品分析的差異?

  在我的概念裏面,產品分析是偏重某個產品的的分析,偏重產品的核心價值、核心業務、核心設計。而競品分析偏重是在某個市場環境裏面,同類產品的核心價值差異。

  一個相對深入點,一個相對注重分析面,並且注重市場環境。

3、我的產品分析思路,要分析什麼?

  競品分析就不怎麼寫了,以前我也沒怎麼做過真正的競品分析,產品分析我也是只是分析核心的而已。

  我的方法很原始,就是作為一個用戶、作為一個產品設計者,兩種角色交替去對一個產品進行觀察、感受,寫出它的優缺而已。

  

下面截圖一下當時做的一些內容:

 

 

 

  從上面截圖可以看到當時我做這個鍛煉的目的及原始的一些思路,這個挑戰持續一年的,所以當時在自我學習過程裏面,開始逐漸完善我自身的分析。

  當然,這個是幾年前的分析,分析重點是一個產品的核心,沒有分析細節怎麼設計是好的,怎麼不好,只是純粹從小白的角度去嘗試,從如果我是這個產品的負責人的話,市場環境大概是那樣,我該如何去突破尋求發展?

 

如果說專業一點的競品分析文章,這裏介紹一篇超級詳細的競品分析文章,分析非常深入透徹,但是個人覺得價值不大!為什麼會這麼說呢?

  1、請問要分析一個行業的競品,單靠一個人,要像這位哥們分析到這麼深入,需要多少時間?一個月?一個季度?

  2、以前做產品分析的時候就發現一個問題,當你做完產品分析,產品已經發生改變,你所做的,都是歷史記錄的(當然有很重要的參考意義,這點無可厚非),花費那麼大的時間精力,是否值得?(跟隨是無法超越對方的,因為對方的創新永遠比你快,除非你的創新比它快,這樣要求你的團隊比對方要牛逼,要更清楚用戶需要)

 

  文章鏈接:https://www.zhihu.com/question/23601989/answer/91519343

  作者:大禹

 

       在後來的工作上面,針對這種專門競品/產品分析幾乎是沒有,可能我孤陋寡聞,但是貌似,沒見過有什麼企業,會做比較專門的競品/產品分析,因為做這些實在是耗費時間精力,但是做出來的時候,市場環境可能已經發生了改變了。

       而且當分析深度不夠深的話,還不如外包給專業的市場調研機構,他們會更專業,更有效率產出相對來說更準確的報告。而產品人員通過這些分析,可以拓寬自身眼界,鍛煉思考產品的核心。如果是這樣~~~好像也沒必要畫流程圖啊、寫交互優缺~~

       以上是我個人對產品經理、產品/競品分析的一些見解,純屬個人看法,如有更好的歡迎下面評論一起討論,研究下。

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

細談unity資源管理的設計

一、概要

本文主要說說Unity是如何管理的,基於何種方式,基於這種管理方式,又該如何規劃資源管理,以及構建bundle,是後面需要詳細討論的。

二、Unity的資源管理方式

2.1 資源分類

unity項目中的資源,大體上可以分為外部導入資源和內部生成資源兩種類型。
外部導入資源: 美術生成的大部分資源,都是外部帶入資源,模型,貼圖,UI用圖,基本是美術工具生成后,導入到工程中的。
內部生成資源: 部分美術生成資源,例如材質,shader,特效,場景等,屬於基於Unity引擎來製作生成的,此外各種prefab(UI/角色等),以及代碼腳本(c#為主),也屬於Unity的內部生成資源。

2.2 資源的存儲方式

在資源導入到unity工程后,會以各種方式進行轉換存儲,主要有以下幾種:

2.2.1 腳本類資源

對於工程中的腳本類資源,主要分為plugin和非plugin兩類。

  • plugin類:在plugin中引用的dll, 屬於自己生成相關的dll,在最終構建遊戲包的時候,被打入到相關遊戲包中:      
  • 非plugin類:unity會構建成4個基本的dll, 構建的順序為:
    • Assembly-CSharp-firstpass: standard assets/Pro standard assets/plugins 中的腳本
    • Assembly-CSharp-Editor-firstpass: editor scripts in standard assets/Pro standard assets/plugins 這個dll不會被打入到遊戲包中,屬於編輯器下特有的dll
    • Assembly-CSharp: all other scripts that not inside editor 主體遊戲邏輯的dll
    • Assembly-CSharp-Editor: all remaing scripts inside editor 這個dll也不會被打入到遊戲包中,編輯器中特有的dll

所有最終構建到遊戲包中的dll,主要分為:

  • Assembly-CSharp.dll/Assembly-CSharp-firstpass.dll 這2個主要遊戲邏輯dll
  • 引擎dll和插件引用的dll

2.2.2 美術類資源

美術類資源,分為外部導入和內部生成兩個大類

  • 外部導入類: 場景/模型/貼圖 都可以外部導入
    • 繼承自AssetPostprocesser后,可以對導入的貼圖,材質,模型,場景,均執行相關的修改
  • 內部生成類: shader/材質/prefab/場景 均可以內部生成美術資源的貼圖資源和特效資源,屬於重點關注對象,後面會細談這幾個資源的管理
    • 修改操作同上

2.2.3 meta文件

工程資源劃分好后,如何對這些資源進行管理? 不同的引擎有不同的管理方式,那麼unity中是如何管理的?
這兒管理分為2個步驟:序列化和meta文件的生成

2.2.3.1 unity的序列化

工程中的資源,要存儲到本地磁盤,那麼就會通過引擎進行一步序列化的操作,序列化的實質,就是將資源對象按照一定的順序轉換成二進制文件。

2.2.3.2 meta文件的生成

在完成序列化后,unity會對應的為該文件生成一份meta文件,這份meta文件會跟隨該文件一直存在,如果刪除該資源文件,其對應的meta文件也會被引擎自動刪除。
meta文件的主要構成:

  • 文件的guid: 這個文件的全工程中的唯一索引id,基於該id,可以對應的查找到該文件。guid的生成本質,就是基於文件的路徑來進行轉換生成的,同理,如果多個工程合併的時候出現guid衝突,可以自己重新生成一份guid,相關鏈接: https://gist.github.com/ZimM-LostPolygon/7e2f8a3e5a1be183ac19

  • 文件的導入設置:

    • 對於一般的文件,導入設置都比較簡單腳本類叫MonoImporter, 資源類叫NativeFormatImporter
    • 貼圖屬於需要重點關注的類型,其導入類型叫TextureImporter,裏面詳細的列出對該貼圖的各種壓縮格式,mipmaps, 類型,uv,貼圖大小等等詳細的設置信息

2.2.4 基於meta文件和序列化的資源管理

除了meta文件的guid,unity還會為每個資源生成一份文件id,也就是fileID, 不過現在fileID已經不再保留在meta文件中了,保留到文件的序列化文件中了,對於該資源,還會有一份localID, 這個localID, 對應的就是在一個資源中包含多個子資源的時候,定位每個子資源所用:  

那麼序列化是如何與guid/fileID關聯的?
在unity工程內部,如果給資源添加其他資源的引用,例如加一個腳本,拖拽一個外部引用,那麼就會觸發一次序列化操作,序列化操作的時候,就會將引用的資源的fileID和guid都序列化下來,這樣在反序列化的時候,就會基於fileID和guid來反向找到依賴的資源,從而加載進來。   

這個過程,在Unity中,就是一個裝載的過程,多說一句,如果一個資源依賴的其他資源越多,那麼這個裝載過程就會越耗時,所以在打開一個很大的UI的時候,有一部分的時間是消耗在裝載UI上各個組件上的。

三、總結

基於前文,可以對整個unity的資源管理有一個初步的認識,基於meta文件和序列化操作,可以管理工程中的資源,同時也能管理好各個資源的互相引用,那麼基於這樣的設計,在構建bundle的時候,是可以進行相關的設計和實現的。

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

手把手教你學會 基於JWT的單點登錄

  最近我們組要給負責的一個管理系統 A 集成另外一個系統 B,為了讓用戶使用更加便捷,避免多個系統重複登錄,希望能夠達到這樣的效果——用戶只需登錄一次就能夠在這兩個系統中進行操作。很明顯這就是單點登錄(Single Sign-On)達到的效果,正好可以明目張膽的學一波單點登錄知識。

本篇主要內容如下:

  • SSO 介紹
  • SSO 的幾種實現方式對比
  • 基於 JWT 的 spring boot 單點登錄實戰

注意:
  SSO 這個概念已經出現很久很久了,目前各種平台都有非常成熟的實現,比如OpenSSOOpenAMKerberosCAS等,當然很多時候成熟意味着複雜。本文不討論那些成熟方案的使用,也不考慮 SSO 在 CS 應用中的使用。

什麼是 SSO

  單點點說就是:一次登錄后可免登陸訪問其他的可信平台。比如我們登錄淘寶網后,再打開天貓首頁可以發現已經是登錄狀態了。SSO 是一種比較流行的服務於企業業務整合的一種解決方案。

如何實現 SSO

  我們都知道目前的 http 協議是無狀態的,也就是第一次請求和第二次請求是完全獨立,不相關的,但現實中我們的業務邏輯都是有狀態的,這樣就引入了 cookie-session 的機制來維護狀態,瀏覽器端存儲一個 sessionId,後台存儲跟該 sessionId 相關的數據。每次向後台發起請求時都攜帶此 sessionId 就能維持狀態了。然後就有了 cookie,瀏覽器在發送請求時自動將 cookie 中的數據放到請求中,發給服務端,無需手動設置。

  然後我們可以考慮考慮實現 SSO 的核心是什麼?答案就是如何讓一個平台 A 登錄后,其他的平台也能獲取到平台 A 的登錄信息(在 cookie-session 機制中就是 sessionId)。

方案一 共享 cookie

  基於 cookie-session 機制的系統中,登錄系統後會返回一個 sessionId 存儲在 cookie 中,如果我們能夠讓另外一個系統也能獲取到這個 cookie,不就獲取到憑證信息了,無需再次登錄。剛好瀏覽器的 cookie 可以實現這樣的效果(詳見web 跨域及 cookie 學習)。

  cookie 允許同域名(或者父子域名)的不同端口中共享 cookie,這點和 http 的同域策略不一樣(http 請求只要協議、域名、端口不完全相同便認為跨域)。因此只需將多個應用前台頁面部署到相同的域名(或者父子域名),然後共享 session 便能夠實現單點登錄。架構如下:

  上面方案顯而易見的限制就是不僅前台頁面需要共享 cookie,後台也需要共享 session(可以用jwt來幹掉 session,但是又會引入新的問題,這裏不展開).這個方案太簡單了,不作進一步說明。

方案二 基於回調實現

  通過上文可以知道,要實現單點登錄只需將用戶的身份憑證共享給各個系統,讓後台知道現在是在訪問。就能實現一次登錄,到處訪問的效果,實在是非常方便的。在 session 機制中是共享 sessionId,然後多個後台使用同一個 session 源即可。這裏我們用一種新的基於 JWT 的 token 方式來實現,不了解 JWT 的可以看這篇:java-jwt 生成與校驗,簡單來說 jwt 可以攜帶無法篡改的信息(一段篡改就會校驗失敗),所以我們可以將用戶 id 等非敏感信息直接放到 jwt 中,幹掉了後台的 session。然後我們要做的就是將 jwt 共享給各個平台頁面即可。系統架構如下:

  此架構中,業務系統 A 和業務系統 B 之間不需要有任何聯繫,他們都只和 SSO 認證平台打交道,因此可以任意部署,沒有同域的限制。你可能就要問了這樣要怎麼共享身份憑證(也就是 jwt 字符串)?這裏就要通過 url 參數來進行騷操作了。文字總結來說是這樣的:jwt 存到認證平台前端的 localStore(不一定是 localStore,cookie,sessionStore 都可以),然後業務平台攜帶自己的回調地址跳轉到認證中心的前台,認證中心的前台再將 ujwt 作為 url 參數,跳回到那個回調地址上,這樣就完成了 jwt 的共享。

  文字很可能看不懂,下面是整個過程的路程圖:

相信通過上面的流程圖你應該能大概看明白,jwt 是如何共享了的吧,還看不懂的繼續看下來,下面上一個 spring boot 實現的簡易 SSO 認證。主要有兩個系統:SSO 認證中心,系統 A(系統 A 換不同端口運行就是系統 A、B、C、D 了).

實戰

實現 SSO 認證中心

  spring boot 框架先搭起來,由於是簡易項目,除 spring boot web 基本依賴,只需要如下的額外依賴:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.4</version>
</dependency>
<dependency>
  <groupId>com.auth0</groupId>
  <artifactId>java-jwt</artifactId>
  <version>3.7.0</version>
</dependency>

完整的 POM 文件,請到 github 上查看.

後台實現

  後台做的事情並不多,只有以下 5 個方法:

  • /login : 登錄成功后簽發一個 jwt token
    在 demo 中只是簡單對比用戶名密碼如果是一樣的認為登錄成功,返回 token
  • /checkJwt : 檢查 jwt 的有效性
    檢查傳來的 jwt-token 是否有效,返回失效的 jwt 列表
  • /refreshjwt : 刷新 jwt
    判斷該 jwt 是否快要過期,如果快要過期,生成一個新的 jwt 返回
  • /inValid : 讓某個 jwt 失效
    jwt 如何失效一直是一個比較麻煩的問題,各有利弊。本例中採用的是為每個 jwt 生成一個隨機的秘鑰 secret,將 jwt–secret 保存到 redis 中,想要讓某個 jwt 失效,只需將該記錄在 redis 中刪除即可(這樣在解密時便無法獲取到 secret)。但是這樣讓無狀態的認證機制變成有狀態了(記錄了 jwt 和 secret 的對應關係)。

  總結來說 SSO 後台主要只做了兩件事:驗證用戶名密碼返回 jwt;驗證 jwt 是否合法。具體代碼查看 github 上 sso 目錄下的代碼。

前台實現

  前台的邏輯較為複雜,不是那麼容易理解,不明白的多看幾遍上面的流程圖。

  再次回到 SSO 的重點:分享登錄狀態。要如何在前台將登錄狀態(在這裏就是 jwt 字符串)分享出去呢?由於瀏覽器的限制,除了 cookie 外沒有直接共享數據的辦法。既然沒有直接共享,那肯定是有間接的辦法的!

  這個辦法就是回調。系統 A 的前台在跳轉到 SSO 的前台時,將當前路徑作為 url 參數傳遞給 sso 前台,sso 前台在獲取到 jwt 后,再跳轉到系統 A 傳過來的 url 路徑上,並帶上 jwt 作為 url 參數。這就完成了 jwt 的一次共享,從 sso 共享到系統 A。

打個比方:你點了個外賣,別人要怎麼把外賣給你呢?顯然你會留下的地址,讓別人帶上飯送到這個地址,然後你就能享用美食了。這和 jwt 的傳遞非常相識了。

系統 A 說:我要 jwt,快把它送到http://localhost:8081/test1/這個地址上。

SSO 說:好嘞,這個地址是合法的可以送 jwt 過去,這就跳轉過去:http://localhost:8081/test1/?jwt=abcdefj.asdf.asdfasf

系統 A 說:不錯不錯,真香。

  要注意這裏有個坑就是:如果另外一個惡意系統 C 安裝相同的格式跳轉到 SSO,想要獲取 jwt,這顯然是不應該給它的。所以在回跳回去的時候要判斷一下這個回調地址是不是合法的,能不能給 jwt 給它,可以向後台請求判斷也可以在 sso 前台直接寫死合法的地址。在 demo 是沒有這個判斷過程的。

實現業務系統

  業務系統代碼非常簡單,主要是用了一個攔截器,攔截 http 請求,提取出 token 向 sso 認證中心驗證 token 是否有效,有效放行,否則返回錯誤給前端。太簡單也不貼代碼了,到 github 上看看就明白了。

效果

  上面說了一大串都是原理了,其實這個難也就難在原理部分,代碼實現並沒有那麼複雜。這裏就不貼代碼了,有需要直接到 github 上看。

  這裏上幾個效果圖:

  • 系統 A 首次登陸系統

可以看到首次登陸是需要跳到 sso 認證中心輸入用戶名密碼進行登陸驗證的。登陸成功回跳後接口請求成功。

  • 將 A 的啟動端口改為 8082 后再次啟動,當作系統 B

可以看到這次是無需登陸的,跳到認證中心后就馬上跳回了,如果去掉 alert 一般是看不出跳轉過程的。

最後在任意一個系統註銷,都會讓所有的系統推出登陸。

可以看到,在系統 A 登錄系統后,系統 B,系統 C 都不再需要輸入用戶名密碼進行登錄。如果速度足夠快甚至都注意不到調到 SSO 再跳回來的過程。

源碼:github

本篇原創發佈於:www.tapme.top/blog/detail/2019-03-01-18-52

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!