12.31.2016

Rx介紹 Part 3 - Advanced error handling(譯...)

嗯,正在學Rx,學了一輪後實際應用時發現還有不懂的地方,再重讀一次,順便簡單的翻譯下…翻譯不出來的或是覺得不重要的就以”…”符號替換,或顯示原文。
當然,辭不達義的地方也會有,請包含…
話說,本章中的swallow我翻譯成”吃掉或吞掉”…或者有更文雅的名詞?
另Exception我基本上翻成例外。

Advanced error handling

例外總是會發生。例外無所謂好或壞,不管我們如何喚起或是捕捉它們。一些例外是可預測的,且通常都是因為粗心的程式碼導致,例如 DivideByZeroException。而一些例外無法被防禦式編程捕獲,例如I/O例外(FileNotFoundException、TimeoutException)。在這些例子中,我們要小心的處理例外,提供使用者錯誤訊息、記錄錯誤或者是重試都是我們處理例外的方法。

The IObserver interface and Subscribe extension methods provide the ability to cater for sequences that terminate in error, however they leave the sequence terminated. They also do not offer a composable way to cater for different Exception types. A functional approach that enables composition of error handlers, allowing us to remain in the monad, would be more useful. Again, Rx delivers.
IObserver<T>介面及訂閱的擴充函式提供我們處理序列因錯誤而中斷的方法,然而它們會導致序列中斷,也沒有提供一個可組合的方式以處理不同的例外類型。一個函數式的目標會更有用,它讓我們可以組合錯誤處理函式,讓我們保持在monad的狀態。再次的說,Rx提供了這類處理。

Control flow constructs

我們將使用marble圖來研究以不同的方式處理不同的控制流程,與正常的.NET程式一樣,我們有如同 try/catch/finally 的流程控制結構;在本章中,我們會看到它們如何用於可觀察序列中。

Catch

就像SEH(結構式例外處理)一樣,用Rx你可以選擇把例外吞掉、包成另一個例外或執行其它的處理。

我們已經知道可觀察序列可以用OnError架構來處理錯誤狀況。在Rx中,一個稱為 Catch 的擴充函式可以用來處理OnError的訊息推送,它讓你可以攔截一個特定的例外型別並接續其它的序列。

下面是catch覆載的定義:

public static IObservable<TSource> Catch<TSource>(
    this IObservable<TSource> first, 
    IObservable<TSource> second)
{
    ...
}

Swallowing exceptions

With Rx, you can catch and swallow exceptions in a similar way to SEH. It is quite simple; we use the Catch extension method and provide an empty sequence as the second value.
用Rx,你可以像SEH一樣捕捉並吞掉例外,很簡單 – 我們使用 Catch 擴充函式並提供一個空序列當做第二個值。

我們用marble圖呈現一個被吃掉的例外:

S1--1--2--3--X
S2            -|
R --1--2--3----|

S1代表第一個序列,它以一個錯誤(X)結束,S2是一個空的接續序列,R是以S1開始,在S1中斷後接續S2的結果序列。

var source = new Subject<int>();
var result = source.Catch(Observable.Empty<int>());
result.Dump("Catch");
source.OnNext(1);
source.OnNext(2);
source.OnError(new Exception("Fail!"));

輸出:

Catch-->1
Catch-->2
Catch completed

上述範例會捕捉並吃掉所有類型的例外,這跟下列SEH相同:

try
{
    DoSomeWork();
}
catch
{
}

就如同一般在SEH中會做的,在Rx中你也會想指定要吃掉的例外型別。你也許想處理一個特定的型別,正好Catch有一個可讓你指定特定型別的覆載,如同下列範例中,你想捕捉一個TimeoutException:

try
{
    //
}
catch (TimeoutException tx)
{
    //
}

Rx提供一個Catch的覆載以處理這種情況:

public static IObservable<TSource> Catch<TSource, TException>(
    this IObservable<TSource> source, 
    Func<TException, IObservable<TSource>> handler) 
    where TException : Exception
{
    //...
}

以下Rx程式讓你可以捕捉TimeoutException例外。我們提供了一個接受例外並回傳序列的函式,讓你不用提供第二個序列當做參數,且可用工廠方法來建立你的continuation。下列範例中,我們在錯誤序列中加入一個-1的值並結束它。

var source = new Subject<int>();
var result = source.Catch<int, TimeoutException>(tx=>Observable.Return(-1));
result.Dump("Catch");
source.OnNext(1);
source.OnNext(2);
source.OnError(new TimeoutException());

輸出:

Catch-->1
Catch-->2
Catch-->-1
Catch completed

如果序列以無法被轉換為TimeoutException的例外結束,則錯誤不會被捕捉,並且將被推送至訂閱者。

var source = new Subject<int>();
var result = source.Catch<int, TimeoutException>(
    tx => Observable.Return(-1));
result.Dump("Catch");
source.OnNext(1);
source.OnNext(2);
source.OnError(new ArgumentException("Fail!"));

輸出:

Catch-->1
Catch-->2
Catch failed-->Fail!
Finally

類似SEH中的finally語法,Rx提供了在序列結束時執行程式碼的功能(不管它如何結束)。Finally擴充函式接受一個Action作為參數。不管序列是正常或錯誤地結束,或者訂閱被disposed,都會呼叫此Action。

public static IObservable<TSource> Finally<TSource>(
    this IObservable<TSource> source, 
    Action finallyAction)
{
    ...
}

在這個範例中,我們有一個正常完成的序列。我們提供一個Action,並看到它在我們的OnCompleted後被執行。

var source = new Subject<int>();
var result = source.Finally(() => 
    Console.WriteLine("Finally action ran"));
result.Dump("Finally");
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
source.OnCompleted();

輸出:

Finally-->1
Finally-->2
Finally-->3
Finally completed
Finally action ran

相同地,來源序列可能被一個例外終止。這種情況下,例外會被推送至console,而我們提供的委託會被執行。

或者,我們可以取消我們的訂閱。在下一個範例中,我們看到即使序列未完成,Finally函式也會被呼叫。

var source = new Subject<int>();
var result = source.Finally(() => 
    Console.WriteLine("Finally"));
var subscription = result.Subscribe(
    Console.WriteLine,
    Console.WriteLine,
    () => Console.WriteLine("Completed"));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
subscription.Dispose();

輸出:

1
2
3
Finally

注意,在當前的實作中有一個不正常的地方,如果沒有提供OnError處理程序,錯誤將被示為例外並拋出。這將在Finally函式被呼叫之前完成。我們可以通過從上面的示例中刪除OnError處理程序來輕鬆重現這種行為。

var source = new Subject<int>();
var result = source.Finally(() => Console.WriteLine("Finally"));
result.Subscribe(
Console.WriteLine,
//Console.WriteLine,
() => Console.WriteLine("Completed"));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
//Brings the app down. Finally action is not called.
source.OnError(new Exception("Fail"));

希望這被標識為一個bug,並在你閱讀下一個版本的Rx時被修復。出於學術興趣,這裡有一個Finally擴充函式的示範,它將按預期工作。(譯者注:此錯誤已被修正)

public static IObservable<T> MyFinally<T>(
    this IObservable<T> source, 
    Action finallyAction)
{
    return Observable.Create<T>(o =>
    {
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(
            o.OnNext,
            ex =>
            {
                try { o.OnError(ex); }
                finally { finallyOnce.Dispose(); }
            },
            () =>
            {
                try { o.OnCompleted(); }
                finally { finallyOnce.Dispose(); }
            });
        return new CompositeDisposable(subscription, finallyOnce);
        });
    }

Using

Using工廠方法允許你將資源的生命週期綁定到可觀察序列的生命週期。函式定義本身需要兩個工廠方法;一個提供資源,一個提供序列。這允許一切都被lazily evaluated。

public static IObservable<TSource> Using<TSource, TResource>(
    Func<TResource> resourceFactory, 
    Func<TResource, IObservable<TSource>> observableFactory) 
    where TResource : IDisposable
{
    ...
}

當你訂閱序列時,Using函式將呼叫這兩個工廠函式。當序列正常結束、被錯誤結束或訂閱被取消時,資源也會被disposed。

為了提供範例,我們要重新介紹第三章講過的TimeIt類別,可以使用這個方便的小類別去計算訂閱的經過時間。下個範例中,我們建立一個使用Using工廠函式的可觀察序列,並提供一個工廠函式給TimeIt資源及一個回傳序列的函式。

var source = Observable.Interval(TimeSpan.FromSeconds(1));
var result = Observable.Using(
    () => new TimeIt("Subscription Timer"),
    timeIt => source);
result.Take(5).Dump("Using");

輸出:

Using-->0
Using-->1
Using-->2
Using-->3
Using-->4
Using completed
Subscription Timer took 00:00:05.0138199

由於Take(5) decorator,序列在5個元素後完成,訂閱因此被取消;與此同時,TimeIt資源會被disposed,因此記錄經過時間的函式也被喚起。

這種機制可以在有想像力的開發者手中找到各種實際應用。資源做為IDisposable型別是很方便的;事實上,它使得許多類型的資源可以被綁定,例如其他訂閱、串流讀取器/寫入器、資料庫連接及使用者控制等,且再與Disposable.Create(Action)合用幾乎可以做到任何事。

OnErrorResumeNext

Just the title of this section will send a shudder down the spines of old VB developers! In Rx, there is an extension method called OnErrorResumeNext that has similar semantics to the VB keywords/statement that share the same name. This extension method allows the continuation of a sequence with another sequence regardless of whether the first sequence completes gracefully or due to an error. Under normal use, the two sequences would merge as below:

S1--0--0--|
S2        --0--|
R --0--0----0--|

即使第一個序列中發生錯誤,結果序列也會被合成:

S1--0--0--X
S2        --0--|
R --0--0----0--|

OnErrorResumeNext的覆載如下:

public static IObservable<TSource> OnErrorResumeNext<TSource>(
    this IObservable<TSource> first, 
    IObservable<TSource> second)
{
    ...
}
public static IObservable<TSource> OnErrorResumeNext<TSource>(
    params IObservable<TSource>[] sources)
{
    ...
}
public static IObservable<TSource> OnErrorResumeNext<TSource>(
    this IEnumerable<IObservable<TSource>> sources)
{
    ...
}

使用上很簡單;你可以用這些覆載傳入你想要的任何數量的序列,但當然要有個數限制,如同在VB中對OnErrorResumeNext這個關鍵字的警示說明,所以在Rx中要謹慎使用它。它悄悄地將例外吃掉,這可能會導致你的軟體處在未知的狀態中。一般來說這會讓你的軟體更難維護及除錯。

Retry

如果你知道你的序列將遇到已知的可能問題時,你可能只想讓動作重試。這樣的例子就如同在執行一個I/O(例如web request或磁碟存取)動作時,你會想再試一次,因I/O常可能會有這樣的間歇性故障發生。Retry擴充函式讓你可以在錯誤發生時以指定的次數重試或重試到成功為止。

//Repeats the source observable sequence until it successfully terminates.
public static IObservable<TSource> Retry<TSource>(
    this IObservable<TSource> source)
{
    ...
}
// Repeats the source observable sequence the specified number of times or until it 
//  successfully terminates.
public static IObservable<TSource> Retry<TSource>(
    this IObservable<TSource> source, int retryCount)
{
    ...
}

下列marble圖中,來源序列(S)產生值然後錯誤,再產生再出錯;總共兩次,結果序列(R)是對來源序列(S)訂閱後所有成功的值的組合。

S --1--2--X
            --1--2--3--X
                         --1
R --1--2------1--2--3------1

下個範例中,我們使用當任何例外發生時永遠重試的覆載。

public static void RetrySample<T>(IObservable<T> source)
{
    source.Retry().Subscribe(t=>Console.WriteLine(t)); //Will always retry
    Console.ReadKey();
}

給予來源[0,1,2,X],輸出為:

0
1
2
0
1
2
0
1
2

This output would continue forever, as we throw away the token from the subscribe method. As a marble diagram it would look like this:

S--0--1--2--x
             --0--1--2--x
                         --0--
R--0--1--2-----0--1--2-----0--

另外,我們可以指定最大重試次數。在這個範例中,我們只重試一次,然後第二個訂閱推送的錯誤會被傳至最後的訂閱中。注意雖然只重試一次但你傳入的值是2,也許這個函式應該被命名為”Try”?

source.Retry(2).Dump("Retry(2)"); 

輸出:

Retry(2)-->0
Retry(2)-->1
Retry(2)-->2
Retry(2)-->0
Retry(2)-->1
Retry(2)-->2
Retry(2) failed-->Test Exception

Marble圖看起來會像:

S--0--1--2--x
             --0--1--2--x
R--0--1--2-----0--1--2--x

在使用永遠重覆的覆載時要小心。很顯然地如果在你的序列中有一個一直存在的錯誤,你會發現自己卡在無窮迴圈中。另外,注意目前沒有一個可以讓你指定特定例外型別的Retry覆載函式。

一個有用的可讓你加入自己的函式庫的擴充函式可能是”Back off and Retry”函式。我的團隊發現這樣的功能在執行I/O,特別是網路請求時很有用。概念是執行,並且在失敗後等待一段時間,然後再執行。你自己的版本可能要考慮需重試的例外類型及可重試的最大次數,你可能甚至想在每次重試時延長等待時間。

例外管理的需求比一般的OnError處理程序還多是司空見慣的。Rx提供了基本的例外處理運算子,你可以使用這些來組合更複雜且強健的查詢。本章中我也介紹了Rx中進階的例外處理,及一些資源管理功能。我們瞭解了Catch、Finally和Using等函式,及其它如OnErrorResumeNext和Retry函式,這些讓你可以玩一下’fast and loose’。我們還重新使用marble圖來幫助我們對多個序列的組合的視覺化。這將幫助我們學習在下一章要看的組合和聚合可觀察序列的其它函式。

Written with StackEdit.

12.30.2016

Rx介紹 Part 3 - Leaving the monad(譯ing...)

嗯,正在學Rx,學了一輪後實際應用時發現還有不懂的地方,再重讀一次,順便簡單的翻譯下…翻譯不出來的或是覺得不重要的就以”…”符號替換,或顯示原文。
當然,辭不達義的地方也會有,請包含…

Leaving the monad

可觀察序列是一個很有用的概念,特別是我們應用LINQ來組合複雜的查詢時。但即使我們認知可觀察序列的優點,有時會需要離開IObservable<T>的應用來使用其它的方式,也許是為了讓你能夠與現有的API(如event和Task<T>)合作,也許是你發現會更好測試,或更簡單,於是在你學習Rx時你需要在可觀察範式和原先熟悉的範式切換使用。

What is a monad

我們在本書前面有稍微的提及了monad,它其實是一個外來的名詞。我會避免以過度複雜的方式來解釋monad,謹提供足夠的說明以幫助我們瞭解下一個要說明的函式群組。Monad的完整定義是非常抽象的。許多人試著用太空人到愛麗絲夢遊仙境等方式提供它的定義,而另一些monadic的教學則使用Haskell的範例程式,這更增加了它的混亂。對我們來說,monad其實是一個表示計算的程式結構。將此與其它編程架構來比較:

Data structure
- 純綷的狀態,例如List、Tree或Tuple
Contract
- 契約定義或抽象化函式,例如介面或抽像類別
Object-Orientated structure
- 狀態和行為一起

通常一個monadic架構讓你將運算子串接在一起以形成一個pipeline,就像我們在擴充函式中做的一樣。

Monads是一種抽象資料型別建構器,它在領域模型中封裝了程式邏輯而不是資料。

這個簡潔的monad的定義來自於維基百科,它讓我們可將序列當為一個monad;這種狀況下的抽象資料型別就是IObservable<T>。當我們使用可觀察序列時,我們將函式組合進抽象資料型別(IObservable<T>)中以建立查詢,這個查詢本身變成了被封裝的程式邏輯。

使用monad來定義控制流程在處理典型的麻煩的程式領域(如IO、同步和異常)等非常有用,這恰好是Rx的優勢之一!

Why leave the monad?

有很多的原因導致你想在不同的範式間使用可觀察序列。需要公開特定函式的函式庫可能要求以事件或是Task實體來呈現,在示範和範例中你可能偏好使用同步函式來限制非同步部份的程式碼數量,這會讓Rx的學習曲線不那麼陡峭。

在產品程式碼中,很少會建議你’break the monad’,特別是從可觀察序列移到阻塞式的函式。在切換非同步和同步範式時要很謹慎,因為這是例如死鎖和可伸縮性問題的主要的常見原因。

In this chapter, we will look at the methods in Rx which allow you to leave the IObservable monad.

ForEach

ForEach函式提供你一個在資料到達時處理它們的方式。ForEach和Subscribe主要的分別是ForEach會阻塞當前執行緒直到序列完成。

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(5);
source.ForEach(i => Console.WriteLine("received {0} @   {1}", i, DateTime.Now));
Console.WriteLine("completed @ {0}", DateTime.Now);

輸出:

received 0 @ 01/01/2012 12:00:01 a.m.
received 1 @ 01/01/2012 12:00:02 a.m.
received 2 @ 01/01/2012 12:00:03 a.m.
received 3 @ 01/01/2012 12:00:04 a.m.
received 4 @ 01/01/2012 12:00:05 a.m.
completed @ 01/01/2012 12:00:05 a.m.

注意,如你預期的,完成行是最後一行。更清楚的說,你可以用Subscribe擴充函式得到相似的結果,但Subscribe函式不會阻塞程式,所以如果你用Subscribe替代範例中的ForEach,我們會先看到完成行先顯示。

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(5);
source.Subscribe(i => Console.WriteLine("received {0} @ {1}", i, DateTime.Now));
Console.WriteLine("completed @ {0}", DateTime.Now);

輸出:

completed @ 01/01/2012 12:00:00 a.m.
received 0 @ 01/01/2012 12:00:01 a.m.
received 1 @ 01/01/2012 12:00:02 a.m.
received 2 @ 01/01/2012 12:00:03 a.m.
received 3 @ 01/01/2012 12:00:04 a.m.
received 4 @ 01/01/2012 12:00:05 a.m.

不像Subscribe擴充函式,ForEach只有一個覆載;需要代入一個Action<T>參數。相較之下,之前(預覽)版本的Rx時,ForEach有和Subscribe大部份相同的覆載,但目前已被棄置,我也認為是正確的決定,因在非同步呼叫中不需要OnCompleted處理程序。你可以在ForEach完成後在處理就可,如同我們上述範例的方式。此外,OnError處理程序現構可以替換為標準的try/catch結構化異常處理區塊,就像你在其它的同步程式中所做的一樣。這也給出了在List<T>型別上的ForEach實例函式的對稱性。

var source = Observable.Throw<int>(new Exception("Fail"));
try
{
    source.ForEach(Console.WriteLine);
}
catch (Exception ex)
{
    Console.WriteLine("error @ {0} with {1}", DateTime.Now, ex.Message);
}
finally
{
    Console.WriteLine("completed @ {0}", DateTime.Now);    
}

輸出:

error @ 01/01/2012 12:00:00 a.m. with Fail
completed @ 01/01/2012 12:00:00 a.m.

ForEach函式,如同其它阻塞式的函式(First或Last等),應該謹慎使用。我謹呈現ForEach函式的測試及範例。在後續介紹concurrency時會和阻塞式呼叫的引入一起討論。

ToEnumerable

另一個切換出IObservable<T>的方式是呼叫ToEnumerable擴充函式。一個簡單的範例:

var period = TimeSpan.FromMilliseconds(200);
var source = Observable.Timer(TimeSpan.Zero, period) 
    .Take(5); 
var result = source.ToEnumerable();
foreach (var value in result) 
{ 
    Console.WriteLine(value); 
} 
Console.WriteLine("done");

輸出:

0
1
2
3
4
done

當你開始列舉序列時(i.e. lazily),來源可觀察序列會被實際訂閱。和ForEach相反的是,使用ToEnumerable函式代表你只有在試著移到下一個元素或不存在時才會被阻塞。此外,如果來源序列的產生比你使用的速度還快,值會被快取起來。

為了處理錯誤,你可以對如同其它可列舉序列處理的一樣,將foreach包在一個try/catch中:

try 
{ 
    foreach (var value in result)
    { 
        Console.WriteLine(value); 
    } 
} 
catch (Exception e) 
{ 
    Console.WriteLine(e.Message);
}

As you are moving from a push to a pull model (non-blocking to blocking), the standard warning applies.
當你從push轉到pull模式(非阻塞式到阻塞式)時,標準警告要被加上。

To a single collection

為了避免在push和pull之間反覆,可以使用以下四種方法之一以在單次通知中回傳整個串列。它們都有相同的語義,但產生不同格式的資料。它們類似於對應的IEnumerable <T>運算子,但回傳值不同,以保留非同步行為。

ToArray and ToList

ToArray和ToList都採用可觀察序列,並將其分別打包進List <T>的陣列或實體中。一旦可觀察序列完成,陣列或串列將作為結果序列的單一值被推送。

var period = TimeSpan.FromMilliseconds(200); 
var source = Observable.Timer(TimeSpan.Zero, period).Take(5); 
var result = source.ToArray(); 
result.Subscribe(arr => 
    { 
        Console.WriteLine("Received array"); 
        foreach (var value in arr) 
        { 
            Console.WriteLine(value); 
        } 
    }, 
    () => Console.WriteLine("Completed")); 
Console.WriteLine("Subscribed"); 

輸出:

Subscribed
Received array
0
1
2
3
4
Completed

因為這些函式仍然回傳可觀察序列,所以我們可以使用我們的OnError處理這些錯誤。注意來源序列被打包成單一的推送;你不是得到整個序列就是得到錯誤。如果來源序列產生值,然後產生錯誤,你僅能得到錯誤。這四個運算子(ToArray, ToList, ToDictionary and ToLookup)都以相同的方式處理。

ToDictionary and ToLookup

做為陣列和串列的替代,Rx也可以用ToDictionary和ToLookup將可觀察序列打包進一個dictionary或lookup中。兩個函式都具有和ToArray及ToList函式相同的語義,他們也回傳僅有單一值的序列及相同的錯誤處理方式。

ToDictionary擴充函式的覆載:

// Creates a dictionary from an observable sequence according to a specified key selector 
// function, a comparer, and an element selector function.
public static IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>(
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector, 
    Func<TSource, TElement> elementSelector, 
    IEqualityComparer<TKey> comparer) 
{...} 
// Creates a dictionary from an observable sequence according to a specified key selector 
// function, and an element selector function. 
public static IObservable<IDictionary<TKey, TElement>> ToDictionary<TSource, TKey, TElement>( 
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector, 
    Func<TSource, TElement> elementSelector) 
{...} 
// Creates a dictionary from an observable sequence according to a specified key selector 
// function, and a comparer. 
public static IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>( 
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> comparer) 
{...} 
// Creates a dictionary from an observable sequence according to a specified key selector 
// function. 
public static IObservable<IDictionary<TKey, TSource>> ToDictionary<TSource, TKey>( 
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector) 
{...} 

ToLookup擴充函式的覆載:

// Creates a lookup from an observable sequence according to a specified key selector 
// function, a comparer, and an element selector function. 
public static IObservable<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>( 
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector, 
    Func<TSource, TElement> elementSelector,
    IEqualityComparer<TKey> comparer) 
{...} 
// Creates a lookup from an observable sequence according to a specified key selector 
// function, and a comparer. 
public static IObservable<ILookup<TKey, TSource>> ToLookup<TSource, TKey>(
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector, 
    IEqualityComparer<TKey> comparer) 
{...} 
// Creates a lookup from an observable sequence according to a specified key selector 
// function, and an element selector function. 
public static IObservable<ILookup<TKey, TElement>> ToLookup<TSource, TKey, TElement>( 
    this IObservable<TSource> source, 
    Func<TSource, TKey> keySelector, 
    Func<TSource, TElement> elementSelector)
{...} 
// Creates a lookup from an observable sequence according to a specified key selector 
// function. 
public static IObservable<ILookup<TKey, TSource>> ToLookup<TSource, TKey>( 
    this IObservable<TSource> source, 
    Func<TSource,
    TKey> keySelector) 
{...} 

ToDictionary和ToLookup都需要一個函式可以套用在每個元素中來獲取它的鍵值。此外,ToDictionary方法的覆載確認所有鍵應該是唯一的。如果找到重複的鍵值,它使用DuplicateKeyException終止序列。另一方面,ILookup <TKey,TElement>被設計為具有由鍵值分組的多個值。如果每個鍵值有多個值,則ToLookup可能是更好的選項。

ToTask

我們已經將AsyncSubject <T>Task <T>進行了比較,甚至展示了如何從一個Task轉換成一個可觀察序列。ToTask擴充函式將允許你將可觀察序列轉換為Task<T>。像AsyncSubject <T>,此函式將忽略多個值,只返回最後一個值。

// Returns a task that contains the last value of the observable sequence. 
public static Task<TResult> ToTask<TResult>(
    this IObservable<TResult> observable) 
{...} 
// Returns a task that contains the last value of the observable sequence, with state to 
//  use as the underlying task's AsyncState. 
public static Task<TResult> ToTask<TResult>(
    this IObservable<TResult> observable,
    object state) 
{...} 
// Returns a task that contains the last value of the observable sequence. Requires a 
//  cancellation token that can be used to cancel the task, causing unsubscription from 
//  the observable sequence. 
public static Task<TResult> ToTask<TResult>(
    this IObservable<TResult> observable, 
    CancellationToken cancellationToken) 
{...} 
// Returns a task that contains the last value of the observable sequence, with state to 
//  use as the underlying task's AsyncState. Requires a cancellation token that can be used
//  to cancel the task, causing unsubscription from the observable sequence. 
public static Task<TResult> ToTask<TResult>(
    this IObservable<TResult> observable, 
    CancellationToken cancellationToken, 
    object state) 
{...}

這是一個簡單的範例,展示ToTask如何被使用。注意,ToTask屬於System.Reactive.Threading.Tasks的namespace。

var source = Observable.Interval(TimeSpan.FromSeconds(1)) 
    .Take(5);
var result = source.ToTask(); //Will arrive in 5 seconds. 
Console.WriteLine(result.Result);

輸出:

4

如果來源序列要顯示錯誤,task會延續它本來的錯誤處理語義。

var source = Observable.Throw<long>(new Exception("Fail!")); 
var result = source.ToTask(); 
try 
{ 
    Console.WriteLine(result.Result);
} 
catch (AggregateException e) 
{ 
    Console.WriteLine(e.InnerException.Message); 
}

輸出:

Fail!

一旦你有了task,理所當然的你可以使用所有TPL的功能,例如continuations等。

ToEvent<T>

就如同你可以用FromEventPattern來將事件轉為可觀察序列的來源,你也可以用ToEvent擴充函式讓你的可觀察序列看起來就像標準的.Net事件一樣。

// Exposes an observable sequence as an object with a .NET event. 
public static IEventSource<unit> ToEvent(this       IObservable<Unit> source)
{...} 
// Exposes an observable sequence as an object with a .NET event. 
public static IEventSource<TSource> ToEvent<TSource>(
    this IObservable<TSource> source) 
{...} 
// Exposes an observable sequence as an object with a .NET event. 
public static IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(
    this IObservable<EventPattern<TEventArgs>> source) 
    where TEventArgs : EventArgs 
{...} 

ToEvent函式回傳一個IEventSource<T>型別,它有一個事件成員:OnNext。

public interface IEventSource<T> 
{ 
    event Action<T> OnNext; 
} 

當我們將可觀察序列用ToEvent函式轉換成事件,我們可以提供一個Action<T>來訂閱它,這邊我們用lambda表達式:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(5); 
var result = source.ToEvent(); 
result.OnNext += val => Console.WriteLine(val);

輸出:

0
1
2
3
4

ToEventPattern

注意這和標準的事件模式並不一樣,正常來說,當你訂閱了事件,你需要處理sender和EventArgs參數。而上面的範例,我們僅僅取值。如果你想讓你的序列轉成標準的事件模式,你要用ToEventPattern。

ToEventPattern將接受一個IObservable <EventPattern <TEventArgs >>並將其轉換為IEventPatternSource <TEventArgs>。這些型別的公開介面非常簡單。

public class EventPattern<TEventArgs> :     IEquatable<EventPattern<TEventArgs>>
    where TEventArgs : EventArgs 
{ 
    public EventPattern(object sender, TEventArgs e)
    { 
        this.Sender = sender; 
        this.EventArgs = e; 
    } 
    public object Sender { get; private set; } 
    public TEventArgs EventArgs { get; private set; } 
    //...equality overloads
} 
public interface IEventPatternSource<TEventArgs> where TEventArgs : EventArgs
{ 
    event EventHandler<TEventArgs> OnNext; 
} 

這些看起來很容易應用。因此,如果我們建立一個EventArgs型別,然後用Select來做一些簡單的轉換,我們可以讓一個標準的序列適用於此模式。

EventArgs 型別:

public class MyEventArgs : EventArgs 
{ 
    private readonly long _value; 
    public MyEventArgs(long value) 
    { 
        _value = value; 
    } 
    public long Value 
    { 
        get { return _value; } 
    } 
} 

轉換:

var source = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => new EventPattern<MyEventArgs>(this, new MyEventArgs(i)));

Now that we have a sequence that is compatible, we can use the ToEventPattern, and in turn, a standard event handler.
現在我們有一個相容的序列,我們可以使用ToEventPattern,且為標準的事件處理函式。

var result = source.ToEventPattern(); 
result.OnNext += (sender, eventArgs) => Console.WriteLine(eventArgs.Value);

現在我們知道如何轉成.NET的事件了,讓我們暫停一下,且記住為什麼Rx在這裡會是比較好的模式。

  • 在C#中,事件有一個很奇怪的介面,一些人覺得+=跟-=是很不直覺的註測回呼函式的方法。
  • 事件很難被組合
  • 事件並沒有提供在時間上可被查詢的方法
  • 事件常常導致記憶體leak
  • 事件並沒有一個標準的通知完成的模式
  • 事件在concurrency或多執行緒應用上幾乎幫不到什麼忙,例如,在另一個執行緒上喚起一個事件需要你做一堆事。

The set of methods we have looked at in this chapter complete the circle started in the Creating a Sequence chapter. We now have the means to enter and leave the observable sequence monad. Take care when opting in and out of the IObservable monad. Doing so excessively can quickly make a mess of your code base, and may indicate a design flaw.

Written with StackEdit.

12.29.2016

Rx介紹 Part 3 - Side efforts(譯ing...)

嗯,正在學Rx,學了一輪後實際應用時發現還有不懂的地方,再重讀一次,順便簡單的翻譯下…翻譯不出來的或是覺得不重要的就以”…”符號替換,或顯示原文。
當然,辭不達義的地方也會有,請包含…

Side efforts

生產系統的非功能需求通常需要高可用性,質量監測特性和低缺陷辨視率的前置時間。日誌記錄,除錯,儀器和日誌記錄是開發人員為生產就緒系統需要考慮的常見的非功能需求。這些成品可以被認為是主要業務工作流的邊際效應。邊際效應是一個現實生活中的問題,程式範例和使用導覽經常忽略,但Rx提供了工具來幫助。

在本章中,我們將討論在使用可觀察序列時引入邊際效應的後果。如果除了任何回傳值,它還具有一些其他observable effort,則函式被認為具有邊際效應。一般來說,“可觀察效應”是對狀態的修改。 這個可觀察的效果可以

  • 修改具有比函式更大範圍的變數(即全域,靜態或參數)
  • I/O,例如從文件或網路讀/寫
  • 更新顯示

邊際效應的問題

函數式編程通常盡量避免產生任何邊際效應。具有邊際效應的函式,特別是會修改狀態的函式,要求programmer不僅須了解函式的輸入和輸出。它們需要理解的地方現在需要擴展到被修改的狀態的歷史和context。這可能大大增加函式的複雜性,從而使得更難正確地理解和維護。

邊際效應並不一定是意外,也不總是故意的。減少意外的邊際效應的簡單方法是減少變更的表面積。我們可以採取的簡單動作是減少狀態的可視性或範圍,且讓其immutable。你可以通過將變數範圍限定為類似函式的程式碼區塊來降低變數的可視性。 您可以通過將類別成員設為private或protected來降低其可視性。通過定義不可變的資料不能被修改,所以不會有邊際效應。這些是明智的封裝規則,將顯著提高你的Rx代碼的可維護性。

為了提供具有邊際效用的查詢的範例,我們將試著通過更新變數(在select子句中)來輸出接收到的元素的索引和值。

var letters = Observable.Range(0, 3)
    .Select(i => (char)(i + 65));
var index = -1;
var result = letters.Select(c =>
    {
        index++;
        return c;
    });
result.Subscribe(c => 
    Console.WriteLine("Received {0} at index {1}", c, index),
    () => Console.WriteLine("completed"));

輸出:

Received A at index 0
Received B at index 1
Received C at index 2
completed

雖然這似乎是無害的,想像一下若另一個人看了這段程式,且理解為這個團隊使用類似的模式來寫程式。他們也採用這種風格。為了示範,我們在前面的例子中再加上一個訂閱。

var letters = Observable.Range(0, 3)
.Select(i => (char)(i + 65));
var index = -1;
var result = letters.Select(c =>
    {
        index++;
        return c;
    });
result.Subscribe(
    c => Console.WriteLine("Received {0} at index {1}", c, index),
    () => Console.WriteLine("completed"));
result.Subscribe(
    c => Console.WriteLine("Also received {0} at index {1}", c, index),
    () => Console.WriteLine("2nd completed"));

Output

Received A at index 0
Received B at index 1
Received C at index 2
completed
Also received A at index 3
Also received B at index 4
Also received C at index 5
2nd completed

現在第二個訂閱的輸出顯然是無效的。原其預期的索引值是0、1和2,但是卻得到3、4和5。我曾經在程式庫中見過更誇張的邊際效應。更不好的是修改了布林值的狀態,如hasValues、isStreaming等。我們在後續章節將學到在可觀察序列中比共享狀態來控制工作流程的更好的方式。

除了在現在軟體中建立潛在的無法預測的結果外,帶有邊際效應的程式更難以被測試和維護。且對於未來的重構、增強或是其它的維護性都更加不穩固,在非同步或是併行軟體中更是如此。

Composing data in a pipeline

取得狀態的較佳方式是將其引入管道中,理想上,我們想讓管道上的每一個部份都是獨立且可確認的。也就是說,組成管道中的每個功能都有自己的輸入和輸出做為它的唯一狀態。為了修正範例,我們增強管道中的資料以讓其不再共享狀態,這將是一個很好的示範以select覆載輸出索引的方式。

var source = Observable.Range(0, 3);
var result = source.Select(
    (idx, value) => new
    {
        Index = idx,
        Letter = (char) (value + 65)
    });
result.Subscribe(
    x => Console.WriteLine("Received {0} at index {1}", x.Letter, x.Index),
    () => Console.WriteLine("completed"));
result.Subscribe(
    x => Console.WriteLine("Also received {0} at index {1}", x.Letter, x.Index),
    () => Console.WriteLine("2nd completed"));

輸出:

Received A at index 0
Received B at index 1
Received C at index 2
completed
Also received A at index 0
Also received B at index 1
Also received C at index 2
2nd completed

跳脫一下思考的範圍,我們也可以使用其它例如Scan函式來建構相似的結果,下面是範例:

var result = source.Scan(
    new
    {
        Index = -1,
        Letter = new char()
    },
    (acc, value) => new
    {
        Index = acc.Index + 1,
        Letter = (char)(value + 65)
    });

這裡的關鍵是隔離狀態,並減少或移掉任何會修改狀態的邊際效應。

Do

We should aim to avoid side effects, but in some cases it is unavoidable. The Do extension method allows you to inject side effect behavior. The signature of the Do extension method looks very much like the Select method;
我們應該努力避免邊際效應,但在某些狀況下無法避免的。而Do擴充函式可讓你注入邊際效應的行為。它的函式定義很像Select函式:

他們都有各種覆載,以適應OnNext,OnError和OnCompleted處理程序的組合:

They both return and take an observable sequence
// Invokes an action with side effecting behavior for each element in the observable 
//  sequence.
public static IObservable<TSource> Do<TSource>(
this IObservable<TSource> source, 
Action<TSource> onNext)
{...}
// Invokes an action with side effecting behavior for each element in the observable 
//  sequence and invokes an action with side effecting behavior upon graceful termination
//  of the observable sequence.
public static IObservable<TSource> Do<TSource>(
this IObservable<TSource> source, 
Action<TSource> onNext, 
Action onCompleted)
{...}
// Invokes an action with side effecting behavior for each element in the observable
//  sequence and invokes an action with side effecting behavior upon exceptional 
//  termination of the observable sequence.
public static IObservable<TSource> Do<TSource>(
this IObservable<TSource> source, 
Action<TSource> onNext, 
Action<Exception> onError)
{...}
// Invokes an action with side effecting behavior for each element in the observable
//  sequence and invokes an action with side effecting behavior upon graceful or
//  exceptional termination of the observable sequence.
public static IObservable<TSource> Do<TSource>(
this IObservable<TSource> source, 
Action<TSource> onNext, 
Action<Exception> onError, 
Action onCompleted)
{...}
// Invokes the observer's methods for their side effects.
public static IObservable<TSource> Do<TSource>(
this IObservable<TSource> source, 
IObserver<TSource> observer)
{...}

Select覆載為它們的OnNext處理程序帶來Func參數,並且還提供回傳與來源不同類型的可觀察序列的能力。相反,Do方法僅對OnNext處理程序採用Action <T>,因此只能回傳與來源類型相同的序列。因為可以傳遞給Do覆載的每個參數都是actions,所以它們隱含地引起邊際效應。

對於下一個範例,我們首先定義以下記錄:

private static void Log(object onNextValue)
{
    Console.WriteLine("Logging OnNext({0}) @ {1}", onNextValue, DateTime.Now);
}
private static void Log(Exception onErrorValue)
{
    Console.WriteLine("Logging OnError({0}) @ {1}", onErrorValue, DateTime.Now);
}
private static void Log()
{
    Console.WriteLine("Logging OnCompleted()@ {0}", DateTime.Now);
}

這段程式可以讓Do使用上面的方法來介紹一些日誌記錄。

var source = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(3);
var result = source.Do(
    i => Log(i),
    ex => Log(ex),
    () => Log());
result.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));

輸出:

Logging OnNext(0) @ 01/01/2012 12:00:00
0
Logging OnNext(1) @ 01/01/2012 12:00:01
1
Logging OnNext(2) @ 01/01/2012 12:00:02
2
Logging OnCompleted() @ 01/01/2012 12:00:02
completed

注意範例中因為Do在查詢中比Subscribe早執行,所以它將先接收到值,因此先寫入Console中。我喜歡把Do方法看作是連到一個序列的接線,它讓你接收序列的值,而無法修改它。

在Rx中看到的最常見的可接受的邊際效應是記錄的需求。Do的定義允許你將它注入到查詢鏈中。這允許我們將日誌記錄添加到我們的序列中並保持封裝性。當存儲庫,服務代理或提供者公開可觀察序列時,它們可以在序列被公開前先加入想要的邊際效應(例如日誌記錄)。然後,消費者可以在查詢中增加運算子(例如,Where、SelectMany),這不會影響來源程序的日誌記錄。

考慮下面的方法。它產生數字,但也記錄它產生的(簡化起見輸出至Console中)。對於使用者的程式來說,日誌記錄是看不見的。

private static IObservable<long> GetNumbers()
{
    return Observable.Interval(TimeSpan.FromMilliseconds(250))
        .Do(i => Console.WriteLine("pushing {0} from GetNumbers", i));
}

使用如下程式呼叫,

var source = GetNumbers();
var result = source.Where(i => i%3 == 0)
    .Take(3)
    .Select(i => (char) (i + 65));
result.Subscribe(
    Console.WriteLine,
    () => Console.WriteLine("completed"));

輸出:

pushing 0 from GetNumbers
A
pushing 1 from GetNumbers
pushing 2 from GetNumbers
pushing 3 from GetNumbers
D
pushing 4 from GetNumbers
pushing 5 from GetNumbers
pushing 6 from GetNumbers
G
completed

這個範例顯示生產者或中介者可以在序列中增加Log,無論最終使用者要做什麼。

另一個Do的覆載允許你傳入一個IObserver<T>做為參數。在這個覆載中,每一個OnNext, OnError and OnCompleted函式會被傳至另一個Do的覆載做為要執行的動作。

使用邊際效應會增加查詢的複雜度。如果邊際效應是必要的惡,那明確的展示可幫助你的同事瞭解你的意圖。使用Do函式是最受歡迎的方式。這也許沒什麼,但考慮到當商業領域在非同步和併行下的固有複雜性,開發者不需要隱藏在Subscribe或Select後的額外增加的邊際效應。

Encapsulating with AsObservable

Poor encapsulation is a way developers can leave the door open for unintended side effects. Here is a handful of scenarios where carelessness leads to leaky abstractions. Our first example may seem harmless at a glance, but has numerous problems.
不完善的封裝性,會讓開發者留下加入非預期邊際效應的空間。下列是一個因粗心導致抽象上設計不佳而引起的狀況,第一眼看到可能覺得沒什麼,但有很多問題。

public class UltraLeakyLetterRepo
{
    public ReplaySubject<string> Letters { get; set; }
    public UltraLeakyLetterRepo()
    {
        Letters = new ReplaySubject<string>();
        Letters.OnNext("A");
        Letters.OnNext("B");
        Letters.OnNext("C");
    }
}

此範例中我們公開了一個可觀察序列的屬性。第一個問題就是它的值可以被外界設定,使用者可能在他們想要的狀況下變更。對於此類別的其它使用者是個很不好的體驗。我們加上一些簡單的改變讓它看起來較安全。

public class LeakyLetterRepo
{
    private readonly ReplaySubject<string> _letters;
    public LeakyLetterRepo()
    {
        _letters = new ReplaySubject<string>();
        _letters.OnNext("A");
        _letters.OnNext("B");
        _letters.OnNext("C");
    }
    public ReplaySubject<string> Letters
    {
        get { return _letters; }
    }
}

現在,Letters屬性只有一個getter並且由一個read only支持。這好多了,敏銳的讀者會注意到Letters屬性返回一個ReplaySubject <string>。這在封裝上不好,使用者可以呼叫OnNext / OnError / OnCompleted。要關閉這個漏洞,我們可以簡單地讓回傳型別為IObservable <string>

public IObservable<string> Letters
{
    get { return _letters; }
}

這個類別看起來好多了,然而,改進的只是美觀上。沒有東西可以防止使用者將結果轉型為ISubject<string>,然後呼叫想要的任何函式。此範例中,我們看到外部的程式推送它們的數值進入序列中。

var repo = new ObscuredLeakinessLetterRepo();
var good = repo.GetLetters();
var evil = repo.GetLetters();
good.Subscribe(
    Console.WriteLine);
//Be naughty
var asSubject = evil as ISubject<string>;
if (asSubject != null)
{
    //So naughty, 1 is not a letter!
    asSubject.OnNext("1");
}
else
{
    Console.WriteLine("could not sabotage");
}

輸出:

A
B
C
1

這問題的解法相當簡單,使用AsObservable擴充函式,_letters欄位會被包裝成僅實作IObservable<T>的型別。

public IObservable<string> GetLetters()
{
    return _letters.AsObservable();
}

輸出:

A
B
C
could not sabotage

雖然我在這些例子中使用了像“邪惡”和“破壞”這樣的字眼,但它往往是一種疏忽,而不是固意導致問題的發生。首先敗在設計類別封裝的設計師。設計介面是很困難的,但我們應該盡最大努力幫助我們的程式碼的使用者成功,給予他們易於發現和強固的類別。如果我們減少它們外觀,只公開我們想要讓使用者使用的功能,類別變得更容易被使用。在這個例子中,我們減少了類別的介面屬性。通過移除屬性setter並透過AsObservable函式返回一個更簡單的型別。

Mutable elements cannot be protected

雖然AsObservable方法可以封裝你的序列,但你仍然應該意識到它沒有給mutable元素提供任何保護。考慮這個類別序列的使用者可以做什麼:

public class Account
{
    public int Id { get; set; }
    public string Name { get; set; }
}

這是個簡短的範例,以展示如果我們選擇在序列中修改它的元素,可能造成的混亂:

var source = new Subject<Account>();
//Evil code. It modifies the Account object.
source.Subscribe(account => account.Name = "Garbage");
//unassuming well behaved code
source.Subscribe(
    account=>Console.WriteLine("{0} {1}", account.Id, account.Name),
    ()=>Console.WriteLine("completed"));
source.OnNext(new Account {Id = 1, Name = "Microsoft"});
source.OnNext(new Account {Id = 2, Name = "Google"});
source.OnNext(new Account {Id = 3, Name = "IBM"});
source.OnCompleted();

輸出:

1 Garbage
2 Garbage
3 Garbage
completed

第二個消費者期待的是得到是”Microsoft”、”Google”及”IBM”,但只得到”Garbage”。

可觀察序列將被認為是一系列已解決的事件:事件作為發生了什麼的事實的陳述。這意味著兩件事:首先,每個元素表示在發佈時的狀態的快照,其次,信息從可靠的來源發出。我們想要消除篡改的可能性。理想情況下,類別T將是不可變的,解決這兩個問題。 這樣,序列的消費者可以確信他們獲得的資料是來源產生的資料。不能改變元素似乎是對消費者的限制,但是這些需求最好通過提供更好封裝性的轉換運算子來滿足。

儘可能的避免邊際效應。任何併行和共享狀態的組合通常需要復雜的鎖定,對CPU架構的深刻瞭解,以及他們如何用你的語言和鎖定及最佳化功能合作。較簡單及較佳的方法是避免資料共用,偏好使用不可變資料型別,並利用查詢的合成及轉換。在Where和Select中隱藏邊際效應可能造成程式碼的混亂。如果真的會產生邊際效應,那使用Do函式以明確的表式可能會造成邊際效應。

Written with StackEdit.

12.28.2016

Rx介紹 Part 2 - Transformation of sequences(譯ing...)

嗯,正在學Rx,學了一輪後實際應用時發現還有不懂的地方,再重讀一次,順便簡單的翻譯下…翻譯不出來的或是覺得不重要的就以”…”符號替換,或顯示原文。
當然,辭不達義的地方也會有,請包含…

Transformation of sequences

我們消費的序列中的值並不總是我們需要的格式。有時在數據中有太多的雜訊,所以我們要將雜訊去掉。有時,每個值都需要擴展進更豐富的物件中或更多的值。通過組合運算子,Rx允許您控制所消耗的可觀察序列中的值的質量以及數量。

到目前為止,我們已經學習了序列的建立,將值轉移至序列,以及通過過濾,聚合或fold來縮減序列。在本章中,我們將討論序列的轉換。這讓我們得以介紹我們的第三種函數式方法,bind。Rx中的bind函數對序列中的每個元素應用一組轉換以產生新序列。回顧一下:
Ana(morphism) T –> IObservable<T>
Cata(morphism) IObservable<T> –> T
Bind IObservable<T1> –> IObservable<T2>

現在我們已經介紹了所有的三個higher order函數,你可能會發現你已經知道他們。Bind和Cata(morphism)由來自Google的MapReduce框架而著名。這裡Google通過他們或許更常見的別名來引用Bind和Cata;Map 及 Reduce。

記住higher order functions的ABCs代表的意思,它是很好的助憶碼。
A na 進入序列,T –> IObservable<T>
B ind 修改序列,IObservable<T1> –> IObservable<T2>
C ata 離開序列, IObservable<T> –> T

Select

經典的轉換函式是Select。它允許你提供一個接受TSource的值並返回TResult的值的函式。Select的函式定義良好且簡單,並且建議其最常見的用法是從一種類型轉換到另一種類型,即IObservable <TSource>IObservable <TResult>

IObservable<TResult> Select<TSource, TResult>(
    this IObservable<TSource> source, 
    Func<TSource, TResult> selector)

注意,並沒有任何限制說TSource和TResult不能相同。 所以對於我們的第一個範例,我們將用一個整數序列,並對每個值用加3的方式來轉換,以產生另一個整數序列。

var source = Observable.Range(0, 5);
source.Select(i=>i+3)
    .Dump("+3")

輸出:

+3-->3
+3-->4
+3-->5
+3-->6
+3-->7
+3 completed

雖然這可能很有用,但更常見的用途是將值從一種型別轉換為另一種型別。在這個例子中,我們將整數值轉換為字元。

Observable.Range(1, 5)
    .Select(i =>(char)(i + 64))
    .Dump("char");

輸出:

char-->A
char-->B
char-->C
char-->D
char-->E
char completed

如果我們真的想利用LINQ,我們可以將我們的整數序列轉換為一個匿名型別序列。

Observable.Range(1, 5)
    .Select(i => 
        new 
        { 
            Number = i, 
            Character = (char)(i + 64) 
        })
    .Dump("anon");

輸出:

anon-->{ Number = 1, Character = A }
anon-->{ Number = 2, Character = B }
anon-->{ Number = 3, Character = C }
anon-->{ Number = 4, Character = D }
anon-->{ Number = 5, Character = E }
anon completed

為了更進一步利用LINQ,我們可以使用query comprehension syntax編寫上述查詢。

var query = 
        from i in Observable.Range(1, 5)
        select new 
        {
            Number = i, 
            Character = (char) (i + 64)
        };
query.Dump("anon");

在Rx中,Select有另一個覆載。第二個覆載為選擇器函數提供兩個參數。附加參數是序列中元素的索引。如果序列中的元素的索引對於selector function很重要,請使用此方法。

Cast and OfType

如果你取得的是一個object序列,例如IObservable <object>,你可能會發現它不太有用。有一個專門針對IObservable <object>的方法,它將每個元素轉換為給定的型別,並在邏輯上將其稱為Cast <T>()

var objects = new Subject<object>();
objects.Cast<int>().Dump("cast");
objects.OnNext(1);
objects.OnNext(2);
objects.OnNext(3);
objects.OnCompleted();

輸出:

cast-->1
cast-->2
cast-->3
cast completed

然而,如果我們要添加一個不能被轉換成序列的值(型別不符),那麼我們會得到錯誤。

var objects = new Subject<object>();
objects.Cast<int>().Dump("cast");
objects.OnNext(1);
objects.OnNext(2);
objects.OnNext("3");//Fail

輸出:

cast-->1
cast-->2
cast failed -->Specified cast is not valid.

幸運的是,如果這不是我們想要的,我們可以使用替代的擴充函式OfType <T>()

var objects = new Subject<object>();
objects.OfType<int>().Dump("OfType");
objects.OnNext(1);
objects.OnNext(2);
objects.OnNext("3");//Ignored
objects.OnNext(4);
objects.OnCompleted();

輸出:

OfType-->1
OfType-->2
OfType-->4
OfType completed

公平地說,雖然這些函式很方便,但我們可以使用我們已知的運算子來建立它們。

//source.Cast<int>(); is equivalent to
source.Select(i=>(int)i);
//source.OfType<int>();
source.Where(i=>i is int).Select(i=>(int)i);

Timestamp and TimeInterval

由於可觀察序列是非同步的,因此可以很方便地知道接收到元素的時間。Timestamp擴充函式是一種方便的函式,它將序列的元素包裹在輕量Timestamp結構T中。 Timestampe<T>型別是一個結構,它公開了它所包裝的元素的值,以及使用DateTimeOffset建立的timestamp。

在這個例子中,我們每隔一秒建立一個包含三個值的序列,然後將其轉換為帶Timestamp的序列。 ToString()在Timestampe<T>上的實做給了我們一個易讀的輸出。

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    .Timestamp()
    .Dump("TimeStamp");

輸出:

TimeStamp-->0@01/01/2012 12:00:01 a.m. +00:00
TimeStamp-->1@01/01/2012 12:00:02 a.m. +00:00
TimeStamp-->2@01/01/2012 12:00:03 a.m. +00:00
TimeStamp completed

我們可以看到,值0、1和2每隔一秒產生。取得絕對timestamp的另一種方法是僅獲取自最後一個元素以來的間隔。TimeInterval擴充函式提供了這個。根據timestamp函式,元素被包裹在輕量結構中。 這個時候的結構是型別TimeInterval <T>

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    .TimeInterval()
    .Dump("TimeInterval");

Output:

TimeInterval-->0@00:00:01.0180000
TimeInterval-->1@00:00:01.0010000
TimeInterval-->2@00:00:00.9980000
TimeInterval completed

正如你可以從輸出中看到的,間隔不是正好一秒鐘,但是非常接近。

Materialize and Dematerialize

Timestamp和TimeInterval變換運算子可以證明對記錄和除錯序列有用,Materialize運算子也是如此。Materialize將序列轉換為序列的metadata 代表,將IObservable<T>轉換為IObservable<Notification<T>>。通知型別提供序列事件的metadata。

如果我們對一個序列做materialize,我們可以看到回傳的包裝的值。

Observable.Range(1, 3)
    .Materialize()
    .Dump("Materialize");

輸出:

Materialize-->OnNext(1)
Materialize-->OnNext(2)
Materialize-->OnNext(3)
Materialize-->OnCompleted()
Materialize completed

注意,當來源序列完成時,materialized的序列產生一個“OnCompleted”推送值,然後完成。Notification<T>是一個具有三個實做的抽像類:

  • OnNextNotification
  • OnErrorNotification
  • OnCompletedNotification

Notification<T>公開了四個公開屬性,以讓你知道它:Kind、HasValue、Value和Exception。顯然只有OnNextNotification會為HasValue返回true,並且有一個有用的Value實做。還應當明顯的是,OnErrorNotification是唯一的具有Exception值的實做。Kind屬性回傳一個列舉,應該夠你知道有哪些方法適合使用。

public enum NotificationKind
{
    OnNext,
    OnError,
    OnCompleted,
}

在下一個例子中,我們產生一個有錯誤的序列。注意,序列的最終值是OnErrorNotification。此外,這一個materialized序列沒有錯誤,它成功完成。

var source = new Subject<int>();
source.Materialize()
    .Dump("Materialize");
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
source.OnError(new Exception("Fail?"));

輸出:

Materialize-->OnNext(1)
Materialize-->OnNext(2)
Materialize-->OnNext(3)
Materialize-->OnError(System.Exception)
Materialize completed

對序列Materializing的實做對於執行序列的分析或記錄非常方便。你可以通過應用Dematerialize擴充函式以解開被materialized的序列。Dematerialize只在IObservable<Notification<TSource>>上有用。

SelectMany

在上面的轉換運算子中,我們可以看到Select是最有用的。它在其變換輸出中非常的有彈性,甚至可以用於再現一些其他的變換運算子。然而SelectMany函式甚至更強大。在LINQ以及Rx中,bind函式是SelectMany。大多數其他轉換運算子可以使用SelectMany建立。考慮到這一點,可以認為SelectMany可能是LINQ中最被誤解的方法之一。

在我個人理解的Rx,我很艱難的學習SelectMany擴充函式。我的一個同事建議我把它想成“from one, select many”,這幫我更好的理解SelectMany。一個更好的定義是,“From one, select zero or more”。如果我們看SelectMany的函式定義,可看到它需要一個來源序列和一個函數作為其參數。

IObservable<TResult> SelectMany<TSource, TResult>(
    this IObservable<TSource> source, 
    Func<TSource, IObservable<TResult>> selector)

選擇器參數是一個取單一T值並返回一個序列的函式。請注意,選擇器回傳的序列不必與來源型別相同。最後,SelectMany回傳型別與選擇器回傳型別相同。

如果你希望有效地使用Rx,瞭解這個函式是非常重要的,所以讓我們慢慢地來,同樣重要的是注意它與IEnumerable<T>的SelectMany運算子的些微差別,我們很快就會看到。

我們的第一個範例將採用單一值’3’的序列。我們提供的選擇器函數將產生另一個帶有數字的序列。該結果序列將是從1到所提供的值即3的範圍。因此,我們取序列[3]並從我們的選擇器函數返回序列[1,2,3]。

Observable.Return(3)
    .SelectMany(i => Observable.Range(1, i))
    .Dump("SelectMany");

Output:

SelectMany-->1
SelectMany-->2
SelectMany-->3
SelectMany completed

如果我們將源碼修改為[1,2,3]的序列,就像這樣…

Observable.Range(1,3)
    .SelectMany(i => Observable.Range(1, i))
    .Dump("SelectMany");

…我們現在得到一個輸出,每個序列([1],[1,2]和[1,2,3])的結果被平展以產生[1,1,2,1,2,3] 。

SelectMany-->1
SelectMany-->1
SelectMany-->2
SelectMany-->1
SelectMany-->2
SelectMany-->3
SelectMany completed

最後一個範例更好地說明了SelectMany如何獲取單個值並將其擴展為多個值。當我們將其應用於值序列時,結果是每個子序列被組合以產生最終序列。 在這兩個範例中,我們返回了一個與來源類型相同的序列。這不是一個限制,所以在下一個範例中,我們回傳一個不同的類型。我們將重用將整數轉換為ASCII的Select範例。為此,選擇器函數僅回傳具有單一值的char序列。

Func<int, char> letter = i => (char)(i + 64);
Observable.Return(1)
    .SelectMany(
        i => Observable.Return(letter(i)))
    .Dump("SelectMany");

因此,輸入[1],我們回傳序列[A]。

SelectMany-->A
SelectMany completed

擴展來源序列以具有許多值,將給我們帶有許多值的結果。

Func<int, char> letter = i => (char)(i + 64);
Observable.Range(1,3)
    .SelectMany(i => Observable.Return(letter(i)))
    .Dump("SelectMany");

現在,[1,2,3]的輸入產生[[A],[B],[C]],其被平展為僅存[A,B,C]。

SelectMany-->A
SelectMany-->B
SelectMany-->C

注意,我們有效地重新建立了Select運算子。

最後一個範例將數字對映至字元。由於只有26個字母,忽略大於26的值是很好的且很容易做到。雖然我們必須為來源的每個元素返回一個序列,但沒有任何規則阻止它是一個空序列。在這種情況下,如果元素值是在範圍1-26之外的數字,我們返回一個空序列。

Func<int, char> letter = i => (char)(i + 64);
Observable.Range(1, 30)
    .SelectMany(i =>
        {
            if (0 < i && i < 27)
            {
                return Observable.Return(letter(i));
            }
            else
            {
                return Observable.Empty<char>();
            }
        })
    .Dump("SelectMany");

輸出:

A
B
C
...
X
Y
Z
Completed

要清楚,對於來源序列[1..30],值1產生序列[A],值2產生序列[B],依此類推,直到值26產生序列[Z]。當來源產生值27時,選擇器函數返回空序列[]。值28,29和30也產生空序列。一旦來自對選擇器的呼叫的所有序列已經被延展以產生最終結果,我們最終得到序列[A..Z]。

現在我們已經瞭解了我們三個higher order函數中的第三個函數,讓我們花時間來思考我們已經學習的一些函式。首先我們可以考慮Where擴充函式。我們首先在縮減序列的章節中看到這個方法。雖然這種方法確實縮減了一個序列,但它不適合functional fold,因為結果仍然是一個序列。考慮到這一點,我們發現Where實際上是bind的一個fit。作為練習,嘗試使用SelectMany運算符寫自己的擴充函式版本。查看最後一個範例以獲得一些幫助…

使用SelectMany寫的Where的擴充函式範例:

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predicate)
{
return source.SelectMany(
    item =>
    {
        if (predicate(item))
        {
            return Observable.Return(item);
        }
        else
        {
            return Observable.Empty<T>();
        }
    });
}

現在我們知道我們可以使用SelectMany來建立Where函式,它應該是一個很自然的過程,我們可以擴充這個以重現其他過濾器,如Skip和Take。

作為另一個練習,嘗試使用SelectMany編寫您自己的Select擴充函式版本。如果你需要一些幫助,參考我們使用SelectMany將int值轉換為char值的範例…

使用SelectMany編寫的Select擴充函式的範例:

public static IObservable<TResult> MySelect<TSource, TResult>(
    this IObservable<TSource> source, 
    Func<TSource, TResult> selector)
{
    return source.SelectMany(
        value => Observable.Return(selector(value)));
}

IEnumerable vs. IObservable SelectMany

值得注意的是,IEnumerable<T> SelectMany和IObservable <T> SelectMany的實做之間的區別。考慮IEnumerable <T>序列是基於pull和blocking的。這意味著當使用SelectMany處理IEnumerable <T>時,它會一次向選擇器函數傳遞一個項目,並等待它在從來源請求(pull)下一個值之前處理來自選擇器的所有值 。

考慮一個[1,2,3]的IEnumerable <T>來源序列。如果我們使用返回[x * 10,(x * 10)+1,(x * 10)+2]序列的SelectMany運算子來處理,我們將得到[[10,11,12],[20 ,21,22],[30,31,32]]。

private IEnumerable<int> GetSubValues(int offset)
{
    yield return offset * 10;
    yield return (offset * 10) + 1;
    yield return (offset * 10) + 2;
}

然後我們使用以下程式應用GetSubValues函式:

var enumerableSource = new [] {1, 2, 3};
var enumerableResult =
    enumerableSource.SelectMany(GetSubValues);
foreach (var value in enumerableResult)
{
    Console.WriteLine(value);
}

所得到的子序列被展開[10,11,12,20,21,22,30,31,32]:

10
11
12
20
21
22
30
31
32

IObservable <T>序列的區別在於,對SelectMany的選擇器函數的呼叫不會阻塞,結果序列可以隨時間產生值。這意味著後續的“子”序列可以重疊。 讓我們再次考慮一個[1,2,3]的序列,但是這個時間值是相隔三秒產生的。 選擇器函數也將按照上面的例子產生[x * 10,(x * 10)+1,(x * 10)+2]的序列,然而這些值將相隔4秒。

為了可視化這種非同步數據,我們需要空間和時間的表示。

Visualizing sequences

Let’s divert quickly and talk about a technique we will use to help communicate the concepts relating to sequences. Marble diagrams are a way of visualizing sequences. Marble diagrams are great for sharing Rx concepts and describing composition of sequences. When using marble diagrams there are only a few things you need to know

a sequence is represented by a horizontal line
time moves to the right (i.e. things on the left happened before things on the right)
notifications are represented by symbols:
讓我們快速轉向並談談我們用來幫助傳達與序列相關的概念的技術。Marble 圖是一種可視化序列的方法。Marble 圖是共享Rx概念和描述序列的組成的好方法。當使用Marble 圖時,只有幾個事情你需要知道:

  1. 序列由水平線表示
  2. 時間向右移動(即左邊的事情發生在右邊的事情之前)
  3. 通知由符號表示:
    1. ’0’ for OnNext
    2. ‘X’ for an OnError
    3. ‘|’ for OnCompleted
  4. 許多同步序列可以通過建立序列行來可視化

這是完成的三個值的序列的樣本:

--0--0--0-|

這是一個四個值序列的樣本,然後是一個錯誤:

--0--0--0--0--X

現在回到我們的SelectMany範例,我們可以通過使用值而不是0標記來可視化我們的輸入序列。這是間隔三秒的序列[1,2,3]的Marble 圖表示(注意每個字元表示一秒鐘)。

--1--2--3|

現在我們可以通過引入時間和空間的概念來利用Marble 圖的效用。這裡我們看到由第一個值1產生的序列的可視化,它給出了序列[10,11,12] (請由上往下看)。這些值間隔四秒,但是初始值立即產生。

1---1---1|
0   1   2|

由於值是兩位數字,它們占用兩行,因此值10不會與值1緊接著的值0混淆。我們為選擇器函數產生的每個序列添加一行。

--1--2--3|
  1---1---1|
  0   1   2|
     2---2---2|
     0   1   2|
        3---3---3|
        0   1   2|

現在我們可以可視化來源序列及其子序列,我們應該能夠推導出SelectMany運算子的預期輸出。為了為Marble 圖創建一個結果的行,我們簡單的允許每個子序列的值“落入”新的結果行。

--1--2--3|
  1---1---1|
  0   1   2|
     2---2---2|
     0   1   2|
        3---3---3|
        0   1   2|
--1--21-321-32--3|
  0  01 012 12  2|

如果我們進行這個練習,現在將其應用於程式中,我們可以驗證我們的Marble 圖。首先我們的函式將產生我們的子序列:

private IObservable<long> GetSubValues(long offset)
{
//Produce values [x*10, (x*10)+1, (x*10)+2] 4 seconds apart, but starting immediately.
    return Observable.Timer(
        TimeSpan.Zero, 
        TimeSpan.FromSeconds(4))
        .Select(x => (offset*10) + x)
        .Take(3);
}

這是接收來源序列以產生最終輸出的程式:

// Values [1,2,3] 3 seconds apart.
Observable.Interval(TimeSpan.FromSeconds(3))
    .Select(i => i + 1) //Values start at 0, so add 1.
    .Take(3)            //We only want 3 values
    .SelectMany(GetSubValues) //project into child sequences
    .Dump("SelectMany");

產生的輸出符合我們對Marble 圖的期望。

SelectMany-->10
SelectMany-->20
SelectMany-->11
SelectMany-->30
SelectMany-->21
SelectMany-->12
SelectMany-->31
SelectMany-->22
SelectMany-->32
SelectMany completed

我們之前已經看過在查詢語法中使用Select運算子,因而值得注意的是如何使用SelectMany運算子。Select擴充函式很明顯地映射到query comprehension syntax,而SelectMany不是那麼明顯。正如我們在前面的例子中所看到的,只使用Select的簡單實做如下:

var query = from i in Observable.Range(1, 5)
select i;

如果我們想添加一個簡單的where子句,我們可以這樣做:

var query = from i in Observable.Range(1, 5)
    where i%2==0
    select i;

要在查詢中增加SelectMany,我們實際上加了一個額外的from子句。

var query = from i in Observable.Range(1, 5)
    where i%2==0
    from j in GetSubValues(i)
    select j;
//Equivalent to 
var query = Observable.Range(1, 5)
    .Where(i=>i%2==0)
    .SelectMany(GetSubValues);

使用query comprehension syntax的優點是,您可以輕鬆地存取查詢範圍內的其他變數。在這個例子中,我們將來源的值及子值轉換至一匿名型別中。

var query = from i in Observable.Range(1, 5)
    where i%2==0
    from j in GetSubValues(i)
    select new {i, j};
query.Dump("SelectMany");

輸出:

SelectMany-->{ i = 2, j = 20 }
SelectMany-->{ i = 4, j = 40 }
SelectMany-->{ i = 2, j = 21 }
SelectMany-->{ i = 4, j = 41 }
SelectMany-->{ i = 2, j = 22 }
SelectMany-->{ i = 4, j = 42 }
SelectMany completed

我們準備結束第2章了,這裡的關鍵是讓讀者能夠理解Rx的一個關鍵原則:函數式合成。當我們學會本章後,範例會變得越來越複雜。我們利用LINQ的力量將擴充函式鏈接在一起組成複雜的查詢。

我們沒有一次嘗試學習所有的運算子,我們將它們分組以學習。

  • Creation
  • Reduction
  • Inspection
  • Aggregation
  • Transformation

對運算子的更深入分析,我們發現大多數運算子實際上是higher order function 概念的特殊化。我們將它們命名為所謂ABC的函數式編程:

  • Anamorphism, aka:
    • Ana
    • Unfold
    • Generate
  • Bind, aka:
    • Map
    • SelectMany
    • Projection
    • Transform
  • Catamorphism, aka:
    • Cata
    • Fold
    • Reduce
    • Accumulate
    • Inject

現在你應該覺得你對如何操作序列有很深的理解。然而,到目前為止我們所學到的東西也都大部分可以應用在IEnumerable序列上。Rx可能比許多人在IEnumerable世界中處理的複雜得多,正如我們在SelectMany函式中看到的。在本書的下一部分中,我們將瞭解Rx天然非同步的特性。憑著我們已經建立的基礎,我們應該能夠解決Rx中更具挑戰性和有趣的功能。

Written with StackEdit.