本文介紹來自:http://rehansaeed.com/reactive-extensions-part1-replacing-events/及其相關系列
原文作者在學習Rx時也有跟我一樣的疑問,它到底適用在那裡?
Event的部份也可參考:http://mark-dot-net.blogspot.tw/2014/04/reactive-extensions-observables-versus.html
PS:ReactiveUI也是使用Rx的一個有趣的可參考的框架
替換 C# Events
公開一個Event
一般的Event使用
public class JetFighter
{
    public event EventHandler<JetFighterEventArgs> PlaneSpotted;
    public void SpotPlane(JetFighter jetFighter)
    {
        EventHandler<JetFighterEventArgs> eventHandler = this.PlaneSpotted;
        if (eventHandler != null)
        {
            eventHandler(this, new JetFighterEventArgs(jetfighter));
        }
    }
}換用Rx
public class JetFighter
{
    private Subject<JetFighter> planeSpotted = new Subject<JetFighter>();
    public IObservable<JetFighter> PlaneSpotted
    {
        get { return this.planeSpotted.AsObservable(); }
    }
    public void SpotPlane(JetFighter jetFighter)
    {
        this.planeSpotted.OnNext(jetFighter);
    }
}此行
return this.planeSpotted.AsObservable();是為了讓使用者不能透過將Subject介面轉型來自己發送訊息。
當然,Rx還可以提供錯誤及完成的通知:
public class JetFighter
{
    private Subject<JetFighter> planeSpotted = new Subject<JetFighter>();
    public IObservable<JetFighter> PlaneSpotted
    {
        get { return this.planeSpotted; }
    }
    public void AllPlanesSpotted()
    {
        this.planeSpotted.OnCompleted();
    }
    public void SpotPlane(JetFighter jetFighter)
    {
        try
        {
            if (string.Equals(jetFighter.Name, "UFO"))
            {
                throw new Exception("UFO Found")
            }
            this.planeSpotted.OnNext(jetFighter);
        }
        catch (Exception exception)
        {
            this.planeSpotted.OnError(exception);
        }
    }
}使用Event
標準事件使用方式
public class BomberControl : IDisposable
{
    private JetFighter jetfighter;
    public BomberControl(JetFighter jetFighter)
    {
        jetfighter.PlaneSpotted += this.OnPlaneSpotted;
    }
    public void Dispose()
    {
        jetfighter.PlaneSpotted -= this.OnPlaneSpotted;
    }
    private void OnPlaneSpotted(object sender, JetFighterEventArgs e)
    {
        JetFighter spottedPlane = e.SpottedPlane;
    }
}Rx的使用方式
public class BomberControl : IDisposable
{
    private IDisposable planeSpottedSubscription;
    public BomberControl(JetFighter jetFighter)
    {
        this. planeSpottedSubscription = jetfighter.PlaneSpotted.Subscribe(this.OnPlaneSpotted);
    }
    public void Dispose()
    {
        this.planeSpottedSubscription.Dispose();
    }
    private void OnPlaneSpotted(JetFighter jetFighter)
    {
        JetFighter spottedPlane = jetfighter;
    }
}Rx還可以
jetfighter.PlaneSpotted.Where(x => string.Equals(x.Name, “Eurofighter”)).Subscribe(this.OnPlaneSpotted);結論
Event也是觀察者模式的實作,但由上可知,它較Rx少了錯誤和完成的通知,當然,若是不需要的話也沒有差別,而Rx除了這兩種通知,它讓訂閱端多了對來源操作的可能性,如上述的Where操作。
包裝Event
包裝EventHandler
public event EventHandler BunnyRabbitsAttack;
public IObservable<object> WhenBunnyRabbitsAttack
{
    get
    {
        return Observable
            .FromEventPattern(
                h => this.BunnyRabbitsAttack += h,
                h => this.BunnyRabbitsAttack -= h);
    }
}包裝帶參數的EventHandler
public event EventHandler<BunnRabbitsEventArgs> BunnyRabbitsAttack;
public IObservable<BunnRabbits> WhenBunnyRabbitsAttack
{
    get
    {
        return Observable
            .FromEventPattern<BunnRabbitsEventArgs>(
                h => this.BunnyRabbitsAttack += h,
                h => this.BunnyRabbitsAttack -= h)
            .Select(x => x.EventArgs.BunnRabbits);
    }
}包裝自訂的EventHandler
public event BunnRabbitsEventHandler BunnyRabbitsAttack;
public IObservable<BunnRabbits> WhenBunnyRabbitsAttack
{
    get
    {
        return Observable
            .FromEventPattern<BunnRabbitsEventHandler, BunnRabbitsEventArgs>(
                h => this.BunnyRabbitsAttack += h,
                h => this.BunnyRabbitsAttack -= h)
            .Select(x => x.EventArgs.BunnRabbits);
    }
}使用明確的介面實作以隱藏現存事件
public abstract class NotifyPropertyChanges : INotifyPropertyChanged
{
    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged
    {
        add { this.propertyChanged += value; }
        remove { this.propertyChanged -= value; }
    }
    private event PropertyChangedEventHandler propertyChanged;
    public IObservable<string> WhenPropertyChanged
    {
        get
        {
            return Observable
                .FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                    h => this.propertyChanged += h,
                    h => this.propertyChanged -= h)
                .Select(x => x.EventArgs.PropertyName);
        }
    }
    protected void OnPropertyChanged(string propertyName)
    {
        PropertyChangedEventHandler eventHandler = this.propertyChanged;
        if (eventHandler != null)
        {
            eventHandler(this, new PropertyChangedEventArgs(propertyName));
        }
    }
}替換Timers
現存的.NET Timers
一般Timer的使用
public void StartTimer()
{
    Timer timer = new Timer(5000);
    timer.Elapsed += this.OnTimerElapsed;
    timer.Start();
}
private void OnTimerElapsed(object sender, ElapsedEventArgs e)
{
    // Do Stuff Here
    Console.WriteLine(e.SignalTime);
    // Console WriteLine Prints
    // 11/03/2014 10:58:35
    // 11/03/2014 10:58:40
    // 11/03/2014 10:58:45
    // ...
}Rx的Timers
public void StartTimer()
{
    Observable
        .Interval(TimeSpan.FromSeconds(5))
        .Subscribe(
            x =>
            {
                // Do Stuff Here
                Console.WriteLine(x);
                    // Console WriteLine Prints
                    // 0
                    // 1
                    // 2
                    // ...
            });
}5秒後執行一次
public void StartTimerAndFireOnce()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5))
        .Subscribe(
            x =>
            {
                // Do Stuff Here
                Console.WriteLine(x);
                // Console WriteLine Prints
                // 0
            });
}一分鐘後,每5秒執行一次
public void StartTimerInOneMinute()
{
    Observable
        .Timer(TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(5))
        .Subscribe(
            x =>
            {
                // Do Stuff Here
                Console.WriteLine(x);
                // Console WriteLine Prints
                // 0
                // 1
                // 2
                // ...
            });
}選擇在那一個Schedule中執行程式,如下所示在UI執行緒中執行,因此不需要用Invoke的方式
public void StartTimerOnUIThread()
{
    Observable
        .Interval(TimeSpan.FromSeconds(5), DispatcherScheduler.Current)
        .Subscribe(
            x =>
            {
                // Do UI Stuff Here
            });
}Task ToObservable
將Tasks轉成可觀察序列
ToObservable擴充函式可讓你將Task或Task<T>轉成IObserverable<T>,在Task上呼叫ToObservable函式會回傳一IObservable<Unit>,Unit是一個不做任何事的空物件,存在的原因是因為不存在不需回傳值的介面。
IObservable<Unit> observable = Task.Run(() => Console.WriteLine("Working")).ToObservable();
IObservable<string> observableT = Task<string>.Run(() => "Working").ToObservable();如果你訂閱上述可觀察序列,它們只會回傳一個值然後結束。
Putting It All Together
範例
public Task<string> GetHelloString()
{
    return Task.Run(
        async () =>
        {
            await Task.Delay(500);
            return "Hello";
        });
}
public Task<string> GetWorldString()
{
    return Task.Run(
        async () =>
        {
            await Task.Delay(1000);
            return "World";
        });
}若是要取得第一個回傳的結果 
TPL way
public async Task<string> WaitForFirstResultAndReturn()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();
    return await Task.WhenAny(task1, task2).Result;
}Rx way
public async Task<string> WaitForFirstResultAndReturn()
{
    IObservable<string> observable1 = this.GetHelloString().ToObservable();
    IObservable<string> observable2 = this.GetWorldString().ToObservable();
    return await observable1.Merge(observable2).FirstAsync();
}兩個方式很像,但TPL更簡單些。
再來,我們等待兩個Task完成並組合。 
TPL way
public async Task<string> WaitForAllResultsAndReturnCombinedResult()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();
    return string.Join(" ", await Task.WhenAll(task1, task2));
}Rx way
public async Task<string> WaitForAllResultsAndReturnCombinedResult()
{
    IObservable<string> observable1 = this.GetHelloString().ToObservable();
    IObservable<string> observable2 = this.GetWorldString().ToObservable();
    return await observable1.Zip(observable2, (x1, x2) => string.Join(" ", x1, x2));
}兩個方式很像,但TPL仍然更簡單。
再來,我們等待第一個結果完成,但加上逾時限制。
TPL way
public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();
    Task timeoutTask = Task.Delay(100);
    Task completedTask = await Task.WhenAny(task1, task2, timeoutTask);
    if (completedTask == timeoutTask)
    {
        throw new TimeoutException("The operation has timed out");
    }
    return ((Task<string>)completedTask).Result;
}Rx way
public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut()
{
    IObservable<string> observable1 = this.GetHelloString().ToObservable();
    IObservable<string> observable2 = this.GetWorldString().ToObservable();
    return await observable1.Merge(observable2).Timeout(TimeSpan.FromMilliseconds(100)).FirstAsync();
}當前的狀況,Rx勝出
如果我們要結合兩個目的。
public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut2()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();
    return await Task
        .WhenAny(task1, task2)
        .ToObservable()
        .Timeout(TimeSpan.FromMilliseconds(1000))
        .FirstAsync();
}對事件取樣
有時對事件的註冊會導致程式UI凍結。
this.TextBox.TextChanged += this.OnTextBoxTextChanged;
private void OnTextBoxTextChanged(object sender, TextChangedEventArgs e)
{
    // Heavy User Interface updates that can cause the application to lock up.
}Rx可以簡單的對事件取樣以降低事件觸發的頻率。
public IObservable<TextChangedEventArgs> WhenTextChanged
{
    get
    {
        return Observable
            .FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
                h => this.TextBox.TextChanged += h,
                h => this.TextBox.TextChanged -= h)
            .Select(x => x.EventArgs);
    }
}
this.WhenTextChanged
    .Sample(TimeSpan.FromSeconds(3))
    .Subscribe(x => Debug.WriteLine(DateTime.Now + " Text Changed"));逾時
public async Task<string> WaitForFirstResultWithTimeOut()
{
    Task<string> task = this.DownloadTheInternet();
    return await task
        .ToObservable()
        .Timeout(TimeSpan.FromMilliseconds(1000))
        .FirstAsync();
}Written with StackEdit.
 
 
沒有留言:
張貼留言