3.26.2017

Rx 設計指南

忽然發現有這個文件,一邊學一邊翻譯。
原文來自:https://blogs.msdn.microsoft.com/rxteam/2010/10/28/rx-design-guidelines/
翻譯不出來的或是覺得不重要的就以"…"符號替換,或顯示原文。
辭不達義的地方所在多有,請包含…

  • Operator看狀況翻成函式或運算子

1.Table of Contents

2.介紹

本文件旨在輔助以Reactive Extension library(http://go.microsoft.com/fwlink/?LinkID=179929)開發應用程式或函式庫所用。

文件中列出的指南是Rx團隊在開發Rx函式庫過程中的成果。

本指南會隨著Rx的持續開發而更新。請確認你有的指南是最新版本,在Rx論壇中會發佈更新:http://go.microsoft.com/fwlink/?LinkId=201727

文件中描述的資訊僅是一組輔助開發時的指引,這些指導並不絕對正確,他們僅是開發團隊覺得會有幫助的一些模式;不代表你要盲目地遵循。仍然有些狀況是指南中沒有提到的,開發團隊僅列出已知的使用情境,每個開發者仍可就特定的情境來選擇自己覺得適用的方式。

文件中的指南沒有特定的排序, There is neither total nor partial
ordering in these guidelines.

Please contact us through the Rx forum: http://go.microsoft.com/fwlink/?LinkId=201727 for feedback
on the guidelines, as well as questions on whether certain guidelines are applicable in specific situations.

3.何時適用Rx

3.1 撰寫非同步或event-based的計算時

要處理多過一個事件或非同步計算的程式會很容易的變的複雜,因為我們需要建立一個狀態機以處理有序的事件,而且,程式中也要處理每一個計算的成功及因錯誤而中斷的問題,這使得程式難以遵守正常的控制流程,更加難以瞭解及維護。

Rx讓這種計算成為一等公民,它提供了一個模式讓我們撰寫可讀性更高及可合成的APIs以處理這些非同步計算(譯者:可為I/O bound或CPU bound)。

Sample

    var scheduler = new ControlScheduler(this); 
    var keyDown = Observable.FromEvent<KeyEventHandler, KeyEventArgs>( 
        d => d.Invoke, h => textBox.KeyUp += h, h => textBox.KeyUp -= h); 

    var dictionarySuggest = keyDown 
        .Select(_ => textBox1.Text) 
        .Where(text => !string.IsNullOrEmpty(text)) 
        .DistinctUntilChanged() 
        .Throttle(TimeSpan.FromMilliseconds(250), scheduler) 
        .SelectMany( 
            text => AsyncLookupInDictionary(text) 
                .TakeUntil(keyDown)); 

    dictionarySuggest.Subscribe( 
        results =>  
            listView1.Items.AddRange(results.Select( 
                result=>new ListViewItem(result)).ToArray()),  
        error => LogError(error)); 

這段範例展示了一種常見的UI範式 – 當使用者輸入字元時即時顯示完成建議。

Rx建立了一個代表KeyUp事件的可觀察序列(原始的WinForms程式不需更動)。

然後在事件上增加幾個過濾器及投射,以確保事件僅在輸入字串變更時才被觸發。(KeyUp事件會在每一次按鍵後觸發,即使使用者僅按了左右方向鍵以移動遊標但並不變更輸入字串)

接著使用Throttle函式以確保250ms後才觸發事件(如果使用者仍在輸入中,這保證潛在的昂貴查詢會被略過),且傳入一個scheduler以確定250ms的delay會在當前UI執行緒被觸發。

一般寫這種程式,在throttling的部份,通常會使用timer引入另一個callback,而timer的執行過程中可能會觸發例外(certain timers have a maximum amount of operations in flight)。

一旦使用者的輸入已被過濾完成,是時候執行字典查詢了,這通會是很昂貴的操作(例如:主機在地球的另一端),當然此操作本身也是非同步的。

SelectMany函式讓我們很容易的合成眾多的非同步操作,它不僅組合成功的值;同時也會追蹤每個獨立操作中的例外。

傳統寫法上,這又會引入另外的callbacks及例外的發生點。

如果使用者在軟體正在查詢時又輸入了新字元,我們應該不會想看到原先查詢的結果。輸入更多字元讓我們可找到更特定的字,但看到原先的結果會讓人搞混。

TakeUntil(KeyDown)函式會確定一旦新的KeyDown操作發生時略過查字典動作。

最後,我們訂閱可觀察序列的結果,且僅使用這時間點的值,並傳入兩個函式讓Subscribe呼叫:

  1. 從我們的計算接收結果
  2. 計算過程中如果發生例外,接收並處理之

什麼狀況下可忽略此指南

如果應用程式/函式庫有很少的非同步/event-based的操作,或很少有操作需要合成的,這時花在Rx上的成本(Rx函式庫的使用及其學習曲線)可能會大過你手動撰寫這些動作的成本。

3.2 處理非同步的序列資料

.Net 平台上另有幾個可以協助處理非同步操作的函式庫,雖然這些函式庫的功能也都很強大,但它們通常適用在回傳單一訊息的操作上,面對操作過程中會產生多個訊息的操作就不支援了。

Rx依循下列原則:’OnNext*(OnCompleted|OnError)?’這使得多重訊息可依時間而產生,也讓Rx適用在單一或多重訊息的操作狀況。

Sample

    //open a 4GB file for asynchronous reading in blocks of 64K
    var inFile = new FileStream(@"d:\temp\4GBfile.txt",  
        FileMode.Open, FileAccess.Read, FileShare.Read, 
        2 << 15, true); 

    //open a file for asynchronous writing in blocks of 64K 
    var outFile = new FileStream(@"d:\temp\Encrypted.txt",  
        FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 
        2 << 15, true); 


    inFile.AsyncRead(2 << 15) 
          .Select(Encrypt) 
          .WriteToStream(outFile) 
          .Subscribe( 
               _=> Console.WriteLine("Successfully encrypted the file."), 
                error=> Console.WriteLine( 
                    "An error occurred while encrypting the file: {0}",  
                    error.Message)); 

這個範例中,一個4GB大小的檔案被讀取、加密並儲存至另一個檔案。

一次將整個檔案讀入記憶體,對它加密並寫至檔案是一個很耗費資源的操作。

Instead, we rely on the fact that Rx can produce many messages.

我們用非同步的方式一次讀取64k大小的區塊,這會產生一個byte陣列式的可觀察序列,並對每個區塊加密(這個範例中我們假定加密演算法可對檔案個別部份執行操作),一旦區塊完成加密,馬上被傳至管道中以儲存到另一個檔案中,WriteToStream函式是一個可以產生多重訊息的非同步操作。

什麼狀況下可忽略此指南

如果應用程式/函式庫不太需要操作多重訊息,這時花在Rx上的成本(Rx函式庫的使用及其學習曲線)可能會大過你手動撰寫這些動作的成本。

4.Rx契約

介面IObservable<T>'及'IObserver<T>僅在它們的函式中指定了參數及回傳值,相較.Net型別,Rx函式庫提供了更多的assumptions,這些assumptions形成了所有Rx型別的生產及消費者都應遵守的約定,這約定確保了運算子(函式)及使用者程式更易於理解及證明其正確性。

4.1 遵守Rx原則

傳送至IObserver介面的實體的訊息遵循如下規則:

OnNext* (OnCompleted | OnError)?

這個規則允許可觀察序列傳送任意數量(0 or more)的OnNext訊息至已訂閱的觀察者實體,隨後則是單一的完成(OnCompleted)或失敗(OnError)訊息。

表示可觀察序列完成的單一訊息讓此序列的消費者可以確定它可以安全的執行清除動作。

而單一的失敗訊息更進一步的讓工作在多個可觀察序列的運算子維護其終止的語義行為(見章節6.6)。

Sample

var count = 0;
xs.Subscribe(v => 
    { 
        count++;             
    }, 
    e=> Console.WriteLine(e.Message), 
    ()=>Console.WriteLine("OnNext has been called {0} times.", count) 
    ); 

此範例中,我們可以確認當此可觀察序列遵循Rx原則時,一旦OnCompleted函式被呼叫後,OnNext函式的次數計數不會再被變更。

什麼狀況下可忽略此指南

僅在使用不符合IObservable介面的實作時,才忽略此原則。可以通過呼叫Synchronize運算子使可觀察序列在實體上一致。

4.2 假定觀察者實體會以循序的方式被呼叫

由於Rx使用Push模型而.NET支援多執行緒,不同的訊息可能在不同的執行環境1下同時抵達,如果可觀察序列的使用者必需在每個地方處理這種狀況,他們的程式碼需要做很多處理以避免常見的併發問題,這種程式碼會很難維護且可能遇到潛在的效能問題。

由於不是所有的可觀察序列被證明為可接收不同執行環境下的訊息,只有產生這些可觀察序列的函式需要去執行序列化(章節6.7),可觀察序列的使用者們可以自信地假定訊息會一個一個抵達。

Sample

var count = 0;
xs.Subscribe(v =>
{
    count++;
    Console.WriteLine("OnNext has been called {0} times.", count);
});

這個範例中,我們讀出或寫入count變數時,不用擔心鎖或互鎖的問題,因為一次只會有一個OnNext被呼叫。

什麼狀況下可忽略此指南

如果你需要使用一個不遵循Rx序列化合約的可觀察序列,使用Synchronize函式以確保你仍可以遵守此指南。

4.3 假定資源會在OnErrorOnCompleted發生後被清除

章節4.1指出在OnErrorOnCompleted後不應有任何訊息抵達,這讓我們在訂閱OnErrorOnCompleted時清除資源成為可能,馬上清除資源讓我們可以確定它不會造成任何不可預測的邊際效應,也讓系效可以再次使用資源。

Sample

Observable.Using( 
    () => new FileStream(@"d:\temp\test.txt", FileMode.Create), 
     fs => Observable.Range(0, 10000) 
         .Select(v => Encoding.ASCII.GetBytes(v.ToString())) 
         .WriteToStream(fs)) 
     .Subscribe(); 

這個範例中,Using函式建立了一個會在取消訂閱時被釋放的資源,Rx對cleanup的約定確保了一旦OnErrorOnCompleted訊息發生時會自動執行取消訂閱的動作。

什麼狀況下可忽略此指南

目前不存在已知的應忽略此指南的狀況。

4.4 Assume a best effort to stop all outstanding work on Unsubscribe

當可觀察序列的訂閱被取消時,它會儘最大的努力以停止所有尚未執行的工作,這表示任何尚在佇列中且未執行的動作將不再執行。

而正在執行的工作仍會繼續以至完成,因為我們無法保證中斷任何執行中的工作不會發生錯誤,而此工作的完成不會再被任何已訂閱的觀察者實體知悉。

Sample 1

Observable.Timer(TimeSpan.FromSeconds(2)).Subscribe(...).Dispose();

此範例中,Timer會產生一可觀察序列,且在ThreadPool scheduler中佇列一個2秒後推送一OnNext訊息的action,而此訂閱會馬上被取消,

Sample 2

      Observable.Start(()=> 
        { 
            Thread.Sleep(TimeSpan.FromSeconds(2)); 
            return 5; 
        }) 
        .Subscribe(...).Dispose(); 

此範例中,Start函式會馬上將由lambda提供的action馬上排程執行,訂閱的動作會把觀察者實體當成此執行動作的listener,而訂閱被取消時此lambda action已在執行,所以它會執行完成,但回傳值會被忽略。

5.使用Rx

5.1 考慮畫出 Marble-diagram

畫出你想要建立的可觀察序列的marble-diagram,之後你會知道那些operator適合被使用。

一個marble-diagram是一個會顯示在時間線上發生的事件的示意圖,一個marble-diagram包含了序列的輸入和輸出。

Sample

Marble-diagram

籍由繪出此圖,可以看出我們需要在使用者輸入後且觸發其它的非同步呼叫前加上某種延遲,此範例中的延遲對應至程式中的Throttle函式。而從一個可觀察序列中建立另一可觀察序列,我們可以使用SelectMany函式,最終程式如下:

    var dictionarySuggest =userInput 
        .Throttle(TimeSpan.FromMilliseconds(250)) 
        .SelectMany(input => serverCall(input)); 

什麼狀況下可忽略此指南

如果你對你寫的可觀察序列很有自信,你可以忽略此指南,但即使連Rx團隊也會在白板上畫出marble-diagram。

5.2 考慮傳給Subscribe多個參數

為了方便起見,Rx對Subscribe的擴充函式覆載讓你可以傳入委託而不用建立IObserver參數的實體,因為C#和VB並不支援匿名內部類別,所以這讓訂閱的過程方便了不少。

IObserver介面總共需要三個函式的實作(OnNext, OnError & OnCompleted),對Subscribe函式的擴充讓開發者可選用每個函式的預設值。

例如,當呼叫僅有一個onNext參數的Subscribe函式時,OnError的行為會從此可觀察序列所在的執行緒轉丟出例外,而OnCompleted則不會做任何事。

在很多情況下,處理例外會格外重要(不管是回復或是放棄應用程式的錯誤)。

通常知道可觀察序列已經成功完成也很重要。例如,應用程序通知使用者操作已經完成。

因此,最好是提供完整的3個參數給Subscribe函式。

什麼狀況下可忽略此指南

  • 當可觀察序列保證不會結束,如KeyUp事件
  • 當可觀察序列保證不會有OnError訊息(例如,一個事件、materialized observable sequence等)
  • 當預設的行為正如你所需的

5.3 考慮使用LINQ查詢表達式語法

Rx實作了在C# 3.0規格中所定義的query expression pattern,因此,你可以使用來對可觀察序列做查詢。

Sample

考慮如下查詢:

var r = rs.SelectMany(x => ys, (x,y) => x+y);

也可寫成:

var r1 = from x in xs
    from y in ys
    select x+y;

什麼狀況下可忽略此指南

當使需要在你的查詢中使用很多在query expression syntax中不支援的函式時可以考慮忽略此指南。This might negate the readability argument.

5.4 考慮傳入特定的scheduler給併發相關函式

與其使用ObserverOn函式來改變可觀察序列產生訊息的執行環境,不如在對的地方開始。
當函式的參數化靠著提供一個scheduler參數重載引入了併發性時,傳入正確的scheduler可減少ObserveOn必需使用的地方。( As
operators parameterize introduction of concurrency by providing a scheduler argument overload,
passing the right scheduler will lead to fewer places where the ObserveOn operator has to be used. )

Sample

var keyup = Observable.FromEvent<KeyEventArgs>(textBox, "KeyUp"); 
    var throttled = keyup.Throttle(TimeSpan.FromSeconds(1),  
        Scheduler.Dispatcher); 

此範例中,KeyUp事件的callbacks會在UI執行緒中抵達,Throttle函式的預設重載會將OnNext訊息放在ThreadPool上(使用ThreadPool的timer來節流),靠著傳入Scheduler.Dispatcher實體給Throttle函式,所有從可觀察序列的訊息會從UI執行緒發出。

什麼狀況下可忽略此指南

當結合來自不同執行環境的多個事件,使用指南5.5將所有的訊息放在特定的執行環境下且越晚越好。

5.5 儘可能越晚且越少呼叫ObserverOn函式

使用ObserverOn函式時,每個來自原始可觀察序列的訊息都會排程一個action。這可能會改變時間資訊且對系統增加額外的壓力,將此函式放在後面會減少前述疑慮。

Sample

    var result = 
        (from x in xs.Throttle(TimeSpan.FromSeconds(1)) 
            from y in ys.TakeUntil(zs).Sample(TimeSpan.FromMilliseconds(250)) 
            select x + y) 
            .Merge(ws) 
            .Where(x => x.Length < 10) 
            .ObserveOn(Scheduler.Dispatcher); 

此範例結合了執行在不同執行環境下的很多可觀察序列。查詢中過濾了很多訊息,若是將ObserverOn放在較前面會多做很多可被過濾掉的工作,將ObserverOn放在最後面提供了最佳的效能。

什麼狀況下可忽略此指南

如果你使用的可觀察序列並沒有綁定至特定的執行環境中,你可以忽略此指南,此時你應該不使用ObserveOn函式。

5.6 考慮限制buffer大小

Rx提供了好幾個在可觀察序列上建立buffer的函式或類別,如例Replay函式。這些可觀察序列上的buffer,其大小取決於序列的型態。如果沒有限制它,可能會造成記憶體的浪費。因此相關函式都會提供限制,不管是依大小或時間,以減輕記憶體的壓力。

Sample

var result = xs.Replay(10000, TimeSpan.FromHours(1)); 

這個範例中,Replay函式建立緩衝區,並限制最多會有10000筆訊息,且最久保持1個小時。

什麼狀況下可忽略此指南

當可觀察序列建立的緩衝區很小或有限制時。

5.7 明確的使用Do函式來使用邊際效應

因為很多Rx函式使用委託當做參數,可傳入任何有效的程式,這些程式碼可能變更全域的狀態(例如:變更全域變數、寫入磁碟)。

Rx的組合會在每個訂閱的每一個函式中執行(with exception of the sharing operators, such as Publish),這會使得邊際效應發生在每個訂閱中。

如果這些行為是需要的,那我們最好明確的使用Do函式來執行會造成邊際效應影響程式碼。

Sample

var result = xs.Where(x=>x.Failed).Do(x=>Log(x)).Subscribe(...);

此範例中,我們過濾出失敗的訊息,然後在送出至已訂閱的實體前先執行Log,這logging是一個邊際效應(例如將訊息傳至電腦的event log),且明確的由Do函式執行。

什麼狀況下可忽略此指南

當邊際效應需要的來自函式的資料在Do函式中不可見時可忽略。

5.8 僅使用Synchronize函式來”修正”自訂的IObservable實作

遵循Rx原則的約定(章節4.1)及序列化(章節4.2)所建立的可觀察序列,不需要使用Synchronize,僅將其用在來自它處且沒遵守Rx同步合約的可觀察序列(章節4.2)。

Sample

    var result = from x in xs.Synchronize() 
        from y in ys 
        where x > y 
        select y; 

此範例中,只有建立自其它來源且沒遵守同步化合約的可觀察序列被同步化,其它的就不需要了。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

5.9 假定訊息會持續到來,直到完成取消訂閱

因為Rx的push模型,訊息可能來自不同的執行環境,正在取消訂閱時訊息也可能在半路上,而完成取消程序後,不會再有訊息抵達,但此時原先的程序也可能在其它的環境中執行到一半。

什麼狀況下可忽略此指南

一旦接收到OnCompletedOnError函式,依據Rx原則會保證此訂閱已經結束。

5.10 使用Publish函式共享邊際效應

因為很多可觀察序列是cold的(參考 cold vs. hot on Channel 9),每個獨立的訂閱會有各自的邊際效應集合,某些狀況下會要求這些邊際效應只應發生一次,Publish函式靠著將單一訂閱廣播至多個訂閱者的機制來共享訂閱。

Publisher函式提供了數個覆載,最方便的一個覆載提供了個包裝了分享邊際效應的可觀察序列參數的函式。

Sample

    var xs = Observable.Create<string>(observer => 
    { 
        Console.WriteLine("Side effect"); 
        observer.OnNext("Hi"); 
        observer.OnCompleted(); 
        return Disposable.Empty; 
    });

    xs.Publish(sharedXs =>
    { 
        sharedXs.Subscribe(Console.WriteLine); 
        sharedXs.Subscribe(Console.WriteLine); 
        return sharedXs; 
    }).Run(); 

此範例中,xs是一個有邊際效應(寫至console)的可觀察序列,正常來說每個獨立的訂閱會觸發這些邊際效應,而Publish函式使用一個對xs的訂閱供給所有的訂閱者給sharedXs。

什麼狀況下可忽略此指南

僅在需要共享邊際效應時使用Publish函式,大多情況下你可以安全的建立各自的訂閱,不管是此訂閱沒有邊際效應問題或者是邊際效應會被多次執行。

6. 函式實作

6.1 使用現有運算子合成新運算子

很多運算子可由現有運算子組合而成,因此可得到較小且易於維護的程式碼。Rx團隊致力讓基礎的運算子可用於各種情境下,重覆使用這些運算子讓你的工作再輕鬆不過。

Sample

    public static IObservable<TResult> SelectMany<TSource, TResult>( 
        this IObservable<TSource> source,  
        Func<TSource, IObservable<TResult>> selector) 
    { 
        return source.Select(selector).Merge(); 
    } 

此範例中,SelectMany運算子使用了兩個現有的運算子組合:SelectMergeSelect運算子處理了任何選擇器函式會丟出的例外問題,Merge函式處理了多個可觀察序列同時發出訊息的同步問題。

什麼狀況下可忽略此指南

  • 沒有現成的運算子可使用
  • 經由效能分析得知現有的運算子無法擁有足夠的效能

6.2 使用Observable.Create(WithDisposable)建立自訂運算子

當無法遵循指南6.1時,使用Observable.Create(WithDisposable)函式來建立可觀察序列,因為它提供了在遵循Rx原則下建立的可觀察序列。

  • 當可觀察序列結束(OnErrorOnCompleted被觸發),任何已訂閱的都會被取消
  • 任何已訂閱的觀察者實體僅會看到一個OnErrorOnCompleted訊息,不會再有訊息抵達,這確保了Rx的OnNext*(OnCompleted|OnError)原則。

Sample

   public static IObservable<TResult> Select<TSource, TResult>( 
        this IObservable<TSource> source, Func<TSource, TResult> selector) 
    { 
        return Observable.CreateWithDisposable<TResult>( 
            observer => source.Subscribe( 
                x => 
                { 
                    TResult result; 
                    try 
                    { 
                        result = selector(x); 
                    } 
                    catch (Exception exception) 
                    { 
                        observer.OnError(exception); 
                        return; 
                    } 
                    observer.OnNext(result); 
                }, 
                observer.OnError, 
                observer.OnCompleted)); 
    } 

此範例中,Select使用Observable.CreateWithDisposable函式以回傳一個IObservable的介面實體。這保證不管你來原可觀察序列如何實作,輸出的可觀察序列也會遵循Rx協定,也確保了訂閱的生命週期會儘可能的短。

什麼狀況下可忽略此指南

  • 函式不需要回傳一個遵守Rx協定的可觀察序列,通常這應該避免(除非你在協定有問題時寫測試以釐清程式的行為)
  • 回傳的物件不只是要實作IObservable介面(例如:ISubject,或自定型別)

6.3 Implement operators for existing observable sequences as generic extension methods

一個函式可用在不同地方時會顯得特別有用,將它實作成擴充函式,可在現存的可觀察序列的自動完成列表中顯示,若再用成泛型,不管序列中的資料是什麼型態都可以應用。

Sample

    public static IObservable<TResult> Select<TSource, TResult>( 
        this IObservable<TSource> source, Func<TSource, TResult> selector) 
    { 
        return Observable.CreateWithDisposable<TResult>( 
            observer => source.Subscribe( 
                x => 
                { 
                    TResult result; 
                    try 
                    { 
                        result = selector(x); 
                    } 
                    catch (Exception exception) 
                    { 
                        observer.OnError(exception); 
                        return; 
                    } 
                    observer.OnNext(result); 
                }, 
                observer.OnError, 
                observer.OnCompleted)); 
    } 

此範例中,Select的定義為一擴充函式。因此,它可見於任何可觀察序列,且它的工作也可應用在序列中,因此定義成泛型型態。

什麼狀況下可忽略此指南

  • 此運算子不在可觀察序列中應用
  • 此運算子用在特定的無法泛型化的資料型態上

6.4 Protect calls to user code from within an operator

當使用者的程式在函式中被呼叫,它可能會執行在外部的一個execution context上(非同步地),其中出現的任何例外可能會導致程式無預期的中斷,因此它應被委託給已訂閱的觀察者來處理。

這些類型的程式碼應該被保護:
* 傳入函式中的選擇器函式
* Comparers passed into the operator
* 喚起字典、串列或hashsets等使用來自使用者提供的比較器

注意:此指南不考慮呼叫IScheduler的實作,因為它有僅會產生一小部份的事項(only a small set of issues would be caught)而大多數的排程器都會處理非同步的呼叫,也就是說對每一個排程器的實作它會確保傳入參數的使用。

Sample

    public static IObservable<TResult> Select<TSource, TResult>( 
        this IObservable<TSource> source, Func<TSource, TResult> selector) 
    { 
        return Observable.CreateWithDisposable<TResult>( 
            observer => source.Subscribe( 
                x => 
                { 
                    TResult result; 
                    try 
                    { 
                        result = selector(x); 
                    } 
                    catch (Exception exception) 
                    { 
                        observer.OnError(exception); 
                        return; 
                    } 
                    observer.OnNext(result); 
                }, 
                observer.OnError, 
                observer.OnCompleted)); 
    } 

此範例中會喚起一個由使用者提供的選擇器函式,過程中會補捉任何產生的例外,並通過對OnError函式的呼叫將例外轉換為對使用者的通知。

什麼狀況下可忽略此指南

當使用者程式已在建立可觀察序列前建立(在Observable.Create函式呼叫外面)可呼略此指南,這些呼叫會在目前的執行環境下且允許正常的控制流程。

注意:不要在呼叫Subscribe、Dispose、OnNext、OnError及OnCompleted函式時加上保護,在這些地呼叫OnError函式可能會導致未預期的行為。

6.5 Subscribe implementations should not throw

當數個可觀察序列被合成時,當使用者呼叫Subscribe時,想訂閱特定的某個序列時是不太可能的,(例:在Concat函式中想對第二個參數訂閱,僅能在第一個完成後),此時丟出例外會導致程式中斷。應通過OnError而不是例外來處理。

Sample

    public IObservable<byte[]> ReadSocket(Socket socket) 
    { 
        return Observable.CreateWithDisposable<byte[]>(observer => 
        { 
            if (!socket.Connected) 
            { 
                observer.OnError(new InvalidOperationException( 
                    "the socket is no longer connected")); 
                return Disposable.Empty; 
            } 
            ... 
        }); 
    } 

此範例中,一個錯誤的條件在訂閱函式中被偵測到,此時靠著OnError函式發出錯誤,而不是使用例外。這讓我們能妥當的處理例外,如果使用者一開始呼叫訂閱時,此訂閱是在外部執行環境中被喚起。(This allows for proper handling of the
exception if Subscribe is called outside of the execution context of the original call to Subscribe by the
user. )

什麼狀況下可忽略此指南

當發生災難性的錯誤時,應該要關掉整個程式時。

6.6 OnError訊息應帶有放棄的語意

如同一般在.NET中的控制流程,我們使用例外來表示放棄(the stack is unwound,目前執行路徑中斷),Rx也有這個行為。為了確保這個行為,在函式中當來源發生錯誤或例外被丟出,不應該用任何函式丟出訊息。( To ensure this behavior, no messages should be sent out
by an operator once one of it sources has an error message or an exception is thrown within the
operator. )

Sample

 public static IObservable<byte[]> MinimumBuffer( 
        this IObservable<byte[]> source, int bufferSize) 
{ 
    return Observable.CreateWithDisposable<byte[]>( 
    observer => 
    { 
        var data = new List<byte>(); 

        return source.Subscribe(value => 
        {                            
            data.AddRange(value); 

            if (data.Count > bufferSize) 
            { 
                observer.OnNext(data.ToArray()); 
                 data.Clear(); 
            } 
        }, 
        observer.OnError, 
        () => 
        { 
            if (data.Count > 0) 
                observer.OnNext(data.ToArray()); 

            observer.OnCompleted(); 
        }); 
    }); 
} 

此範例中,緩衝函式在訂閱來源發生錯誤時會儘快的放棄可觀察序列,當前的緩衝區資料不會送出至任何訂閱者手上,以確保放棄的語意狀態。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.7 在可觀察序列中循序呼叫IObserver函式

Rx是一組可合成的API,很多函式可一起使用。如果所有的函式都需要處理併發,那會導致各別的函式變得非常複雜,因此,最好在第一個有併發性需求的地方處理。最後,如果每個使用到Rx的地方都需要處理併發的話,那Rx API會更難使用。

Sample

 public static IObservable<TResult> ZipEx<TLeft, TRight, TResult>( 
        this IObservable<TLeft> left,  
        IObservable<TRight> right,  
        Func<TLeft, TRight, TResult> resultSelector) 
    { 
        return Observable.CreateWithDisposable<TResult>(observer => 
        { 
            var group = new CompositeDisposable(); 
            var gate = new object(); 

            var leftQ = new Queue<TLeft>(); 
            var rightQ = new Queue<TRight>(); 

            group.Add(left.Subscribe( 
                value => 
                {                         
                    lock(gate) 
                    { 
                        if (rightQ.Count > 0) 
                         { 
                            var rightValue = rightQ.Dequeue(); 
                            var result = default(TResult); 
                            try 
                            { 
                                result = resultSelector(value, rightValue); 
                            } 
                            catch(Exception e) 
                            { 
                                observer.OnError(e); 
                                return; 
                            } 
                            observer.OnNext(result); 
                        } 
                        else 
                        { 
                            leftQ.Enqueue(value); 
                        } 
                    } 
                }, 
                observer.OnError, 
                observer.OnCompleted)); 

            group.Add(right.Subscribe( 
                value => 
                { 
                    lock (gate) 
                    { 
                        if (leftQ.Count > 0) 
                        { 
                            var leftValue = leftQ.Dequeue(); 
                            var result = default(TResult); 
                            try 
                            { 
                                result = resultSelector(leftValue, value); 
                            } 
                            catch (Exception e) 
                            { 
                                observer.OnError(e); 
                                return; 
                            } 
                            observer.OnNext(result); 
                        } 
                        else 
                        { 
                            rightQ.Enqueue(value); 
                        } 
                    } 
                }, 
                observer.OnError,
                 observer.OnCompleted)); 

            return group;                                                                 
        }); 
    } 

這個範例中,兩個序列被zipped在一起,由於從左邊或右邊來的訊息可能同時發生,函式必須確定訊息被排序,因此它使用一個鎖來確認其內部狀態不會錯誤。

什麼狀況下可忽略此指南

  • 函式僅處理單一可觀察序列
  • 函式不會導致併發性
  • 有其餘的限制保證不會有併發性

NOTE:如果一來源可觀察序列不遵守Rx合約,開發者可在將此可觀察序列傳入至函式前呼叫Synchronize函式來修正它。

6.8 Avoid serializing operators

當所有Rx的函式都遵循指南6.7,函式可以安全的假設它們的輸入都是序列式地。使用太多的同步化會導致程式碼的混亂及影響到效能。

如果一個可觀察序列沒有遵守Rx合約,則由開發者決定撰寫終端用戶應用程式,並在最開始取得可觀察序列的地方以Synchronize函式來解決問題。這種方式中附加的同步範圍會限制在需要的地方。

Sample

public static IObservable<TResult> Select<TSource, TResult>( 
        this IObservable<TSource> source, Func<TSource, TResult> selector) 
{ 
    return Observable.CreateWithDisposable<TResult>( 
        observer => source.Subscribe( 
            x => 
            { 
                TResult result; 
                try 
                { 
                    result = selector(x); 
                } 
                catch (Exception exception) 
                { 
                    observer.OnError(exception); 
                    return; 
                } 
                observer.OnNext(result); 
            }, 
            observer.OnError, 
            observer.OnCompleted)); 
} 

此範例中,Select假定來源可觀察序列遵循序列化指南6.7且不需要額外的鎖定。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.9 提供一個scheduler參數來參數化併發性

由於存在許多的併發性概念,但沒有一體適用的解法,因此最好是在將函式引入的併發性參數化。在Rx中可透過抽象化的IScheduler介面來引入參數化的併發性。

Sample

public static IObservable<TValue> Return<TValue>(TValue value, 
        IScheduler scheduler) 
{ 
    return Observable.CreateWithDisposable<TValue>( 
        observer =>  
            scheduler.Schedule(() => 
            { 
                observer.OnNext(value); 
                observer.OnCompleted(); 
            })); 
} 

此範例中,Return函式依靠提供scheduler參數來參數化函式本身擁有的併發性,並使用scheduler來排程OnNextOnCompleted的觸發。

什麼狀況下可忽略此指南

  • 函式並不控制併發性的產生時(例:在一個將事件轉為可觀察序列的函式中,來源事件控制訊息的發送,而不是函式本身)。
  • 函式控制,但需要特定的scheduler以引進併發性。

6.10 提供預設scheduler

大多數情況下對函式來說都有可選擇的可引入參數化併發性的預設scheduler存在。這會讓使用此函式的程式碼更簡潔。

Note:依據指南6.12,當你選擇使用預設scheduler時,儘可能的使用immediate scheduler,且僅在需要更多的併發性控制時使用。

Sample

public static IObservable<TValue> Return<TValue>(TValue value)
{
    return Return(value, Scheduler.Immediate);
}

此範例中,我們提供了一個Return函式的不需scheduler的覆載,實作中使用其它的覆載,並且使用immediate scheduler。

什麼狀況下可忽略此指南

在沒有好的預設排程器時可忽略此指南。

6.11 排程器應是函式的最後一個參數

讓排程器處於函式的最後一個參數會讓自動提示更會流暢,指南6.10確保覆載會帶有預設排程器,不管是增加或拿掉排程器都會更簡單。

Sample

public static IObservable<TValue> Return<TValue>(TValue value) 
{ 
    return Return(value, Scheduler.Immediate); 
} 

public static IObservable<TValue> Return<TValue>(TValue value, 
IScheduler scheduler) 
{ 
    return Observable.CreateWithDisposable<TValue>( 
        observer =>  
            scheduler.Schedule(() => 
            { 
                observer.OnNext(value); 
                observer.OnCompleted(); 
            })); 
} 

此範例中,Return函式有兩個覆載,一個使用預設排程器而不需代入,一個需要帶入一排程器。當排程器處於參數的最後一個,增加或刪除此參數在自動提示中可以很明顯的看到,而不用去調整參數的順序。

什麼狀況下可忽略此指南

C#和VB支援params語法,使用時,params要處於參數的最後面,此時排程器參數要在其前面。

6.12 避免引入併發性

加入併發性時,我們改變了可觀察序列的時間軸,訊息會被排程以在稍後抵達。傳遞訊息所需的時間是數據本身,籍由增加併發性我們可能讓資料發生錯誤。

此指南所指的包含了不可將控制轉移至其它的context中,如UI的context。

Sample

public static IObservable<TResult> Select<TSource, TResult>( 
    this IObservable<TSource> source, Func<TSource, TResult> selector) 
{ 
    return Observable.CreateWithDisposable<TResult>( 
        observer => source.Subscribe( 
            x => 
            { 
                TResult result; 
                try 
                { 
                    result = selector(x); 
                } 
                catch (Exception exception) 
                { 
                    observer.OnError(exception); 
                    return; 
                } 
                observer.OnNext(result); 
            }, 
            observer.OnError, 
            observer.OnCompleted)); 
} 

此範例中,select函式不使用排程器送出以送出OnNext訊息,反之以來源序列呼叫OnNext來動作,因此保持了相同的time-window。

Sample 2

public static IObservable<TValue> Return<TValue>(TValue value) 
{ 
    return Return(value, Scheduler.Immediate); 
} 

此範例中,Return函式的預設排程器是immediator,此排程器不會引入併發性問題。

什麼狀況下可忽略此指南

如果函式本身引入的併發性是必須的,可忽略此指南。

Note:當我們使用Immediate排程器或在Subscribe函式中直接呼叫observer時,會使得Subscribe呼叫變為阻塞式的,此狀況下任何耗時的處理可能會變成引入併發性的可能。

6.13 交出所有建立在函式中的disposables實體給消費者

Disposable實體控制了訂閱的生命週期和已排程動作的取消操作,Rx提供使用者使用disposable實體來取消已訂閱的可觀察序列。

當一個訂閱結束,不再允許更多的訊息產生,此時,若在可觀察序列中留下任何仍生存著的狀態(變數等)是沒有效率的且可能產生未預期的語義。

為了輔助對多個disposable實體的合成,Rx提供了一組實作IDisposable的類別,其命名空間位於System.Disposable,例如:

Name Description
CompositeDisposable Composes and disposes a group of disposable instances together.
MutableDisposable A place holder for changing instances of disposable instances. Once new disposable instance is placed, the old disposable instance is disposed.
BooleanDisposable Mantains state on whether disposing has occurred.
CancellationDisposable Wraps the CancellationToken patterns into the disposable pattern.
ContextDisposable Disposes an underlying disposable instance in the specified (SynchronizationContext* instance.
ScheduledDisposable Uses a scheduler to dispose an underlying disposable instance.

Sample

    public static IObservable<TResult> ZipEx<TLeft, TRight, TResult>( 
        this IObservable<TLeft> left,  
        IObservable<TRight> right,  
        Func<TLeft, TRight, TResult> resultSelector) 
    { 
        return Observable.CreateWithDisposable<TResult>(observer => 
        { 
            var group = new CompositeDisposable(); 
            var gate = new object(); 

            var leftQ = new Queue<TLeft>(); 
            var rightQ = new Queue<TRight>(); 

            group.Add(left.Subscribe( 
                value => 
                {                         
                    lock(gate) 
                    { 
                        if (rightQ.Count > 0) 
                        { 
                            var rightValue = rightQ.Dequeue(); 
                            var result = default(TResult); 
                            try 
                            { 
                                result = resultSelector(value, rightValue); 
                            } 
                            catch(Exception e) 
                            { 
                                observer.OnError(e); 
                                return; 
                            } 
                            observer.OnNext(result); 
                        } 
                        else 
                        { 
                            leftQ.Enqueue(value); 
                        } 
                    } 
                }, 
                observer.OnError, 
                observer.OnCompleted)); 

            group.Add(right.Subscribe( 
                value => 
                { 
                    lock (gate) 
                    { 
                        if (leftQ.Count > 0) 
                        { 
                            var leftValue = leftQ.Dequeue(); 
                            var result = default(TResult); 
                            try 
                            { 
                                result = resultSelector(leftValue, value); 
  } 
                            catch (Exception e) 
                            { 
                                observer.OnError(e); 
                                return; 
                            } 
                            observer.OnNext(result); 
                        } 
                        else 
                        { 
                            rightQ.Enqueue(value); 
                        } 
                    } 
                }, 
                observer.OnError, 
                observer.OnCompleted)); 

            return group; 

        }); 
    }

此範例中,函式中將所有不同的訂閱的disposable實體群組起來,並回傳給外界的可觀察序列。當函式的使用者訂閱了此可觀察序列,他/她將會取得其內部所有的disposable實體。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.14 函式不應該是阻塞式的

Rx是一個使用可觀察序列來合成非同步及事件為基楚的程式的函式庫。

若函式為阻塞式的,我們會失去這些非同步的及合成的特性。(例如:回傳一個型別為T的值而不是IObservable<T>)

Sample

    public static IObservable<int> Sum(this IObservable<int> source) 
    { 
        return source.Aggregate(0, (prev, curr) => checked(prev + curr)); 
    } 

此範例中,Sum函式回傳了一個IObservable<int>而不是int的值,這種方式讓函式不會阻塞,也讓結果值可在未來被組合。

如果使用這些函式的開發者想離開可觀察式的世界,他/她可以使用已提供的First*Last*Single*等函式來達成。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.15 Avoid deep stacks caused by recursion in operators

Rx函式中的程式可能在不同的情境或execution context中被呼叫,幾 乎不可能確認被呼叫時的堆疊深度。如果函式本身已經有一定深度的堆疊(例如因為遞迴的關係),函式可能會比預期的更快導致stack overflow。

有兩個建議的方法可避免這個問題:
* 使用IScheduler介面中的遞迴Schedule擴充函式
* 使用yield iterator pattern實作一個無窮迴圈的`IEnumerable

Sample 1

    public static IObservable<TSource> Repeat<TSource>( 
        TSource value, IScheduler scheduler) 
    { 
        return Observable.CreateWithDisposable<TSource>( 
            observer =>  
                    scheduler.Schedule(self => 
                    { 
                        observer.OnNext(value); 
                        self(); 
                    }));                                                                                                         
    } 

此範例中,遞迴的Schedule擴充函式用來讓排程器排程下一個遞迴函式的走訪,Schedulers such as the current thread scheduler do not rely on
stack semantics,使用這種型式的排程器可避免堆疊溢出的問題。

Sample 2

    public static IObservable<TSource> Repeat<TSource>(TSource value) 
    { 
        return RepeatHelper(value).Concat(); 
    } 

    private static IEnumerable<IObservable<TSource>> 
         RepeatHelper<TSource>(TSource value) 
    { 
        while(true) 
            yield return Observable.Return(value); 
    } 

yield迭代模式確保堆疊深度不會急速的增加。而靠著回傳一無窮的`IEnumerable

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.16 參數驗證應放在Observable.Create外面

如同指南6.5指出的,Observable.Create函式不應丟出例外,任何可能導致丟出例外的參數應該放在Observable.Create函式外面。

Sample

    public static IObservable<TResult> Select<TSource, TResult>( 
        this IObservable<TSource> source, Func<TSource, TResult> selector) 
    { 
        if (source == null) 
            throw new ArgumentNullException("source"); 
        if (selector == null) 
            throw new ArgumentNullException("selector"); 

        return new Observable.Create<TResult>(observer => source.Subscribe( 
                    x => 
                    { 
                        TResult result; 
                        try 
                        { 
                            result = selector(x); 
                        } 
                        catch (Exception exception) 
                        { 
                            observer.OnError(exception); 
                            return; 
                        } 
                        observer.OnNext(result); 
                    }, 
                    observer.OnError, 
                    observer.OnCompleted)); 
    } 

此範例中,參數在Observable.Create函式被呼叫前檢查是否為空。

什麼狀況下可忽略此指南

在某些參數只能等到被訂閱後才檢查的狀況時可乎略此指南。

6.17 Unsubscription should be idempotent

可觀察Subscribe函式會回傳一IDisposable實體,可用以取消訂閱。從IDisposable實體無法得知關於訂閱的任何狀態的資訊,由於消費者無從得知訂閱的狀態,因此多次呼叫Dispose函式應該是允許的。且應該只有在第一次呼叫時才執行清除訂閱的動作。

Sample

    var subscription = xs.Subscribe(Console.WriteLine); 
    subscription.Dispose(); 
    subscription.Dispose(); 

此範例中,訂閱被清除了兩次,第一次時訂閱被清除,而第二次不會做任何事。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.18 取消訂閱不應丟出例外

因為Rx的合成功能讓訂閱可被鏈接,所以取消訂閱也是。因此,任何函式都可隨時呼叫取消訂閱的動作。所以,丟出例外會導致程式未預期的失敗。當觀察者實體已經被取消訂閱時,它也無法被用來接收例外,因此,取消訂閱時不應產生例外。

什麼狀況下可忽略此指南

目前無已知應忽略此指南的狀況。

6.19 自訂的IObservable實作應遵循Rx原則

當無法遵循指南6.2時,自訂的IObservable介面實作仍應遵循Rx合約以從Rx函式裡得到正確的行為。

什麼狀況下可忽略此指南

Only ignore this guideline when writing observable sequences that need to break the contract on
purpose (e.g. for testing).

6.20 Operator implementations should follow guidelines for Rx usage

Rx是一組可合成的API函式,其實作通常使用其它函式。當實作這些函式時應堅決的遵守前面描述的Rx使用指南。

什麼狀況下可忽略此指南

As described in the introduction, only follow a guideline if it makes sense in that specific situation.


Written with StackEdit.


  1. Context,或稱情境、上下文

3.16.2017

Stateless狀態機函式庫和Rx的應用

狀態機不只適用在自動控制,程序式的動作也是很有用!

在之前介紹了個小型框架後,我覺得,在一個多核系統中,如果想要有效的利用此架構,不管是在每個processor中使用command queue,或是使用parallel command,狀態機絕對是不可或缺的。

正好在看狀態機時看到一個使用Stateless加上Rx的應用範例,如下圖:

(此圖來自原專案網頁)

詳細說明可參閱原文,這裡僅簡單介紹Rx的部份。

Rfq工作流程

基本上,這是一個RFQ(詢價)工作流程,從上圖可以看到,狀態機提供的功能被包在一個介面中 – IRfq:

public interface IRfq : IDisposable
{
    void RequestQuote(IQuoteRequest quoteRequest);
    void Cancel(long rfqId);
    void Execute(IExecutionRequest quote);

    IObservable<RfqUpdate> Updates { get; } 
}

而RFQ狀態機的的實作則由stateless完成,裡面定義了如上圖所需的State和Trigger(Event):

State

public enum RfqState
{
    Input,
    Requesting,
    Cancelling,
    Cancelled,
    Quoted,
    Executing,
    Error,
    Done
}

Event

public enum RfqEvent
{
    UserRequests,
    UserCancels,
    UserExecutes,

    ServerNewQuote,
    ServerQuoteError,
    ServerQuoteStreamComplete,
    ServerSendsExecutionReport,
    ServerExecutionError,
    ServerCancelled,
    ServerCancellationError,

    InternalError,
}

狀態更新

上面圖中可看到一藍色箭頭,由狀態機至View Model端,其負責更新(告知)控制端此狀態的變化過程,靠著IRfq介面定義中的

IObservable<RfqUpdate> Updates { get; }

實作,因此,在各個狀態中藉由Rx的OnNext通知各種狀態的變化:

_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Quoted, quote, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Requesting, null, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Cancelling, null, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Executing, null, null));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Done, null, executionReport));
_rfqUpdateSubject.OnNext(new RfqUpdate(RfqState.Cancelled, null, null));

Data Access

Sample中的Data Access由IRfqService介面實作:

public interface IRfqService
{
    IObservable<IQuote> RequestQuoteStream(IQuoteRequest quoteRequest);
    IObservable<IExecutionReport> Execute(IExecutionRequest executionRequest);
    IObservable<Unit> Cancel(long rfqId);
}

此介面在Rfq類別的建構式中注入:

public Rfq(IRfqService rfqService, IConcurrencyService concurrencyService)
{
    _rfqService = rfqService;
    ...
}

並且在各實作中被訂閱,如下所示在詢價的動作中,要求_rfqService執行RequestQuoteStream,若是超過5秒則逾時,並觸發_rfqEventServerQuoteError詢價失敗事件(基本上事件皆用RfqEvent列舉定義,否則為代有參數的strongly typed triggers (events)),若完成則觸發_rfqEventServerSendsQuote事件。

private void OnEntryRequesting(IQuoteRequest quoteRequest)
{
    _requestSubscription.Disposable = _rfqService.RequestQuoteStream(quoteRequest)
        .Timeout(TimeSpan.FromSeconds(5))
        .ObserveOn(_concurrencyService.Dispatcher)
        .SubscribeOn(_concurrencyService.TaskPool)
        .Subscribe(
        // /!\ we are only allowed to transition the state machine here, no other code! 
        // This applies to all server callbacks
            quote => _stateMachine.Fire(_rfqEventServerSendsQuote, quote), 
            ex => _stateMachine.Fire(_rfqEventServerQuoteError, ex),
            () => _stateMachine.Fire(RfqEvent.ServerQuoteStreamComplete));
}

另外在OnEntryCancelling和OnEntryExecuting也是類似的處理方式。

Unit Test

此範例沒有提供實際的專案,不過倒是有單元測試,所以可以從中概略的看到使用方法,如:

public void HappyPathScenario()
{
    // user request quote
    _sut.RequestQuote(null);
    // server sends quote
    _rfqServiceDouble.RequestQuoteSubject.OnNext(null);
    // user executes
    _sut.Execute(null);
    // server sends execution report
    _rfqServiceDouble.ExecuteSubject.OnNext(null);

    Assert.AreEqual(5, _updates.Count);
    Assert.AreEqual(RfqState.Input, _updates[0].RfqState);
    Assert.AreEqual(RfqState.Requesting, _updates[1].RfqState);
    Assert.AreEqual(RfqState.Quoted, _updates[2].RfqState);
    Assert.AreEqual(RfqState.Executing, _updates[3].RfqState);
    Assert.AreEqual(RfqState.Done, _updates[4].RfqState);
}

果然,寫文章是學習的好方法,在過程中才發現原先理解錯誤的點:),為自己突破盲點,cheers!

Written with StackEdit.