3.16.2017

Stateless狀態機函式庫和Rx的應用

狀態機不只適用在自動控制,程序式的動作也是很有用!

在之前介紹了個小型框架後,我覺得,在一個多核系統中,如果想要有效的利用此架構,不管是在每個processor中使用command queue,或是使用parallel command,狀態機絕對是不可或缺的。

正好在看狀態機時看到一個使用Stateless加上Rx的應用範例,如下圖:

(此圖來自原專案網頁)

詳細說明可參閱原文,這裡僅簡單介紹Rx的部份。

Rfq工作流程

基本上,這是一個RFQ(詢價)工作流程,從上圖可以看到,狀態機提供的功能被包在一個介面中 – IRfq:

public interface IRfq : IDisposable
{
    void RequestQuote(IQuoteRequest quoteRequest);
    void Cancel(long rfqId);
    void Execute(IExecutionRequest quote);

    IObservable<RfqUpdate> Updates { get; } 
}

而RFQ狀態機的的實作則由stateless完成,裡面定義了如上圖所需的State和Trigger(Event):

State

public enum RfqState
{
    Input,
    Requesting,
    Cancelling,
    Cancelled,
    Quoted,
    Executing,
    Error,
    Done
}

Event

public enum RfqEvent
{
    UserRequests,
    UserCancels,
    UserExecutes,

    ServerNewQuote,
    ServerQuoteError,
    ServerQuoteStreamComplete,
    ServerSendsExecutionReport,
    ServerExecutionError,
    ServerCancelled,
    ServerCancellationError,

    InternalError,
}

狀態更新

上面圖中可看到一藍色箭頭,由狀態機至View Model端,其負責更新(告知)控制端此狀態的變化過程,靠著IRfq介面定義中的

IObservable<RfqUpdate> Updates { get; }

實作,因此,在各個狀態中藉由Rx的OnNext通知各種狀態的變化:

_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Quoted, quote, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Requesting, null, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Cancelling, null, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Executing, null, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Done, null, executionReport));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Cancelled, null, null));

Data Access

Sample中的Data Access由IRfqService介面實作:

public interface IRfqService
{
    IObservable<IQuote> RequestQuoteStream(IQuoteRequest quoteRequest);
    IObservable<IExecutionReport> Execute(IExecutionRequest executionRequest);
    IObservable<Unit> Cancel(long rfqId);
}

此介面在Rfq類別的建構式中注入:

public Rfq(IRfqService rfqService, IConcurrencyService concurrencyService)
{
    _rfqService = rfqService;
    ...
}

並且在各實作中被訂閱,如下所示在詢價的動作中,要求_rfqService執行RequestQuoteStream,若是超過5秒則逾時,並觸發_rfqEventServerQuoteError詢價失敗事件(基本上事件皆用RfqEvent列舉定義,否則為代有參數的strongly typed triggers (events)),若完成則觸發_rfqEventServerSendsQuote事件。

private void OnEntryRequesting(IQuoteRequest quoteRequest)
{
    _requestSubscription.Disposable = _rfqService.RequestQuoteStream(quoteRequest)
        .Timeout(TimeSpan.FromSeconds(5))
        .ObserveOn(_concurrencyService.Dispatcher)
        .SubscribeOn(_concurrencyService.TaskPool)
        .Subscribe(
        // /!\ we are only allowed to transition the state machine here, no other code! 
        // This applies to all server callbacks
            quote => _stateMachine.Fire(_rfqEventServerSendsQuote, quote), 
            ex => _stateMachine.Fire(_rfqEventServerQuoteError, ex),
            () => _stateMachine.Fire(RfqEvent.ServerQuoteStreamComplete));
}

另外在OnEntryCancelling和OnEntryExecuting也是類似的處理方式。

Unit Test

此範例沒有提供實際的專案,不過倒是有單元測試,所以可以從中概略的看到使用方法,如:

public void HappyPathScenario()
{
    // user request quote
    _sut.RequestQuote(null);
    // server sends quote
    _rfqServiceDouble.RequestQuoteSubject.OnNext(null);
    // user executes
    _sut.Execute(null);
    // server sends execution report
    _rfqServiceDouble.ExecuteSubject.OnNext(null);

    Assert.AreEqual(5, _updates.Count);
    Assert.AreEqual(RfqState.Input, _updates[0].RfqState);
    Assert.AreEqual(RfqState.Requesting, _updates[1].RfqState);
    Assert.AreEqual(RfqState.Quoted, _updates[2].RfqState);
    Assert.AreEqual(RfqState.Executing, _updates[3].RfqState);
    Assert.AreEqual(RfqState.Done, _updates[4].RfqState);
}

果然,寫文章是學習的好方法,在過程中才發現原先理解錯誤的點:),為自己突破盲點,cheers!

Written with StackEdit.

沒有留言:

張貼留言