4.05.2017

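Rx Applications

This post draws on http://rehansaeed.com/reactive-extensions-part1-replacing-events/ and the rest of that series.

While learning Rx, the original author had the same question I did: where does it actually fit?

For the event part, see also: http://mark-dot-net.blogspot.tw/2014/04/reactive-extensions-observables-versus.html
PS: ReactiveUI is another interesting framework built on Rx that is worth a look.

Replacing C# Events

Exposing an Event

Typical event usage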

public class JetFighter
{
    public event EventHandler<JetFighterEventArgs> PlaneSpotted;

    public void SpotPlane(JetFighter jetFighter)
    {
        EventHandler<JetFighterEventArgs> eventHandler = this.PlaneSpotted;
        if (eventHandler != null)
        {
            eventHandler(this, new JetFighterEventArgs(jetFighter));
        }
    }
}

Switching to Rx

public class JetFighter
{
    private Subject<JetFighter> planeSpotted = new Subject<JetFighter>();

    public IObservable<JetFighter> PlaneSpotted
    {
        get { return this.planeSpotted.AsObservable(); }
    }

    public void SpotPlane(JetFighter jetFighter)
    {
        this.planeSpotted.OnNext(jetFighter);
    }
}

This line

return this.planeSpotted.AsObservable();

prevents consumers from casting the returned interface back to Subject and pushing notifications themselves.
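
To make the effect of the wrapper concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The class and method names (`AsObservableDemo`, `CanCastBackToSubject`) are hypothetical and do not come from the original article; the method only checks whether a consumer could cast the exposed sequence back to a subject.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original article.
public static class AsObservableDemo
{
    // Returns true when a consumer could cast the exposed sequence
    // back to ISubject<int> and push values into it.
    public static bool CanCastBackToSubject(bool hideIdentity)
    {
        var subject = new Subject<int>();

        // Without AsObservable() the caller receives the Subject itself.
        IObservable<int> exposed = hideIdentity
            ? subject.AsObservable()
            : subject;

        return exposed is ISubject<int>;
    }
}
```

With `hideIdentity` set to true the cast fails, so only the owning class can call OnNext; with false the consumer gets the raw Subject and the cast succeeds.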

Of course, Rx can also deliver error and completion notifications:

public class JetFighter
{
    private Subject<JetFighter> planeSpotted = new Subject<JetFighter>();

    public IObservable<JetFighter> PlaneSpotted
    {
        get { return this.planeSpotted.AsObservable(); }
    }

    public void AllPlanesSpotted()
    {
        this.planeSpotted.OnCompleted();
    }

    public void SpotPlane(JetFighter jetFighter)
    {
        try
        {
            if (string.Equals(jetFighter.Name, "UFO"))
            {
                throw new Exception("UFO Found");
            }

            this.planeSpotted.OnNext(jetFighter);
        }
        catch (Exception exception)
        {
            this.planeSpotted.OnError(exception);
        }
    }
}

Consuming the Event

Standard event usage

public class BomberControl : IDisposable
{
    private JetFighter jetfighter;

    public BomberControl(JetFighter jetFighter)
    {
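        // Store the instance so Dispose can unsubscribe from the same object.
        this.jetfighter = jetFighter;
        this.jetfighter.PlaneSpotted += this.OnPlaneSpotted;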
    }

    public void Dispose()
    {
        jetfighter.PlaneSpotted -= this.OnPlaneSpotted;
    }

    private void OnPlaneSpotted(object sender, JetFighterEventArgs e)
    {
        JetFighter spottedPlane = e.SpottedPlane;
    }
}

Rx usage

public class BomberControl : IDisposable
{
    private IDisposable planeSpottedSubscription;

    public BomberControl(JetFighter jetFighter)
    {
        this.planeSpottedSubscription = jetFighter.PlaneSpotted.Subscribe(this.OnPlaneSpotted);
    }

    public void Dispose()
    {
        this.planeSpottedSubscription.Dispose();
    }

    private void OnPlaneSpotted(JetFighter jetFighter)
    {
        JetFighter spottedPlane = jetFighter;
    }
}

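Rx can also filter the sequence before subscribing:

jetFighter.PlaneSpotted.Where(x => string.Equals(x.Name, "Eurofighter")).Subscribe(this.OnPlaneSpotted);

Conclusion

Events are also an implementation of the observer pattern, but as shown above they lack the error and completion notifications that Rx provides. If you don't need those, there is no difference. Beyond those two notifications, Rx also lets the subscriber apply operators to the source, such as the Where shown above.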
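
The examples above raise OnError and OnCompleted on the producer side but never show a subscriber receiving them. The following is a minimal sketch of the consuming side, assuming the System.Reactive package; `NotificationDemo` and its log strings are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the original article.
public static class NotificationDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<string>();

        // Subscribe with one callback per notification kind.
        using (subject.Subscribe(
            name => log.Add("next: " + name),
            error => log.Add("error: " + error.Message),
            () => log.Add("completed")))
        {
            subject.OnNext("Eurofighter");
            subject.OnCompleted();

            // A completed subject ignores further notifications.
            subject.OnNext("ignored");
        }

        return log;
    }
}
```

Calling `NotificationDemo.Run()` yields two entries, "next: Eurofighter" and "completed"; a plain C# event has no built-in equivalent of the last two callbacks.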

Wrapping Events

Wrapping an EventHandler

public event EventHandler BunnyRabbitsAttack;

public IObservable<object> WhenBunnyRabbitsAttack
{
    get
    {
        return Observable
            .FromEventPattern(
                h => this.BunnyRabbitsAttack += h,
                h => this.BunnyRabbitsAttack -= h);
    }
}

Wrapping an EventHandler with arguments

public event EventHandler<BunnRabbitsEventArgs> BunnyRabbitsAttack;
public IObservable<BunnRabbits> WhenBunnyRabbitsAttack
{
    get
    {
        return Observable
            .FromEventPattern<BunnRabbitsEventArgs>(
                h => this.BunnyRabbitsAttack += h,
                h => this.BunnyRabbitsAttack -= h)
            .Select(x => x.EventArgs.BunnRabbits);
    }
}

Wrapping a custom EventHandler

public event BunnRabbitsEventHandler BunnyRabbitsAttack;

public IObservable<BunnRabbits> WhenBunnyRabbitsAttack
{
    get
    {
        return Observable
            .FromEventPattern<BunnRabbitsEventHandler, BunnRabbitsEventArgs>(
                h => this.BunnyRabbitsAttack += h,
                h => this.BunnyRabbitsAttack -= h)
            .Select(x => x.EventArgs.BunnRabbits);
    }
}

Using explicit interface implementation to hide an existing event

public abstract class NotifyPropertyChanges : INotifyPropertyChanged
{
    event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged
    {
        add { this.propertyChanged += value; }
        remove { this.propertyChanged -= value; }
    }

    private event PropertyChangedEventHandler propertyChanged;

    public IObservable<string> WhenPropertyChanged
    {
        get
        {
            return Observable
                .FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                    h => this.propertyChanged += h,
                    h => this.propertyChanged -= h)
                .Select(x => x.EventArgs.PropertyName);
        }
    }

    protected void OnPropertyChanged(string propertyName)
    {
        PropertyChangedEventHandler eventHandler = this.propertyChanged;
        if (eventHandler != null)
        {
            eventHandler(this, new PropertyChangedEventArgs(propertyName));
        }
    }
}

Replacing Timers

Existing .NET Timers

Typical Timer usage

public void StartTimer()
{
    Timer timer = new Timer(5000);
    timer.Elapsed += this.OnTimerElapsed;
    timer.Start();
}

private void OnTimerElapsed(object sender, ElapsedEventArgs e)
{
    // Do Stuff Here
    Console.WriteLine(e.SignalTime);
    // Console WriteLine Prints
    // 11/03/2014 10:58:35
    // 11/03/2014 10:58:40
    // 11/03/2014 10:58:45
    // ...
}

Rx Timers

public void StartTimer()
{
    Observable
        .Interval(TimeSpan.FromSeconds(5))
        .Subscribe(
            x =>
            {
                // Do Stuff Here
                Console.WriteLine(x);
                // Console WriteLine Prints
                // 0
                // 1
                // 2
                // ...
            });
}

Run once after 5 seconds

public void StartTimerAndFireOnce()
{
    Observable
        .Timer(TimeSpan.FromSeconds(5))
        .Subscribe(
            x =>
            {
                // Do Stuff Here
                Console.WriteLine(x);
                // Console WriteLine Prints
                // 0
            });
}

After one minute, run every 5 seconds

public void StartTimerInOneMinute()
{
    Observable
        .Timer(TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(5))
        .Subscribe(
            x =>
            {
                // Do Stuff Here
                Console.WriteLine(x);
                // Console WriteLine Prints
                // 0
                // 1
                // 2
                // ...
            });
}

You can choose which scheduler the code runs on. The following runs on the UI thread, so there is no need to call Invoke.

public void StartTimerOnUIThread()
{
    Observable
        .Interval(TimeSpan.FromSeconds(5), DispatcherScheduler.Current)
        .Subscribe(
            x =>
            {
                // Do UI Stuff Here
            });
}

Task ToObservable

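Converting Tasks to observable sequences

The ToObservable extension method converts a Task or Task<T> into an IObservable<T>. Calling ToObservable on a non-generic Task returns an IObservable<Unit>. Unit is an empty type that carries no information; it exists because there is no IObservable interface without a value type.

IObservable<Unit> observable = Task.Run(() => Console.WriteLine("Working")).ToObservable();

IObservable<string> observableT = Task.Run(() => "Working").ToObservable();

If you subscribe to either observable sequence above, it produces a single value and then completes.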
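
One way to observe the "single value, then completion" behaviour is to collect everything the sequence emits. This is a minimal sketch, assuming the System.Reactive package; `TaskObservableDemo` and `CollectAsync` are hypothetical names. Because ToList only emits once its source completes, the await returning at all shows that the sequence completed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original article.
public static class TaskObservableDemo
{
    public static async Task<IList<string>> CollectAsync()
    {
        IObservable<string> source = Task.Run(() => "Working").ToObservable();

        // ToList buffers every value and emits the list when the source completes.
        return await source.ToList();
    }
}
```

The returned list contains exactly one element, the string "Working".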

Putting It All Together

Example

public Task<string> GetHelloString()
{
    return Task.Run(
        async () =>
        {
            await Task.Delay(500);
            return "Hello";
        });
}

public Task<string> GetWorldString()
{
    return Task.Run(
        async () =>
        {
            await Task.Delay(1000);
            return "World";
        });
}

To get whichever result comes back first:
TPL way

public async Task<string> WaitForFirstResultAndReturn()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();

    // The first await yields the task that finished first; the second yields its result.
    return await await Task.WhenAny(task1, task2);
}

Rx way

public async Task<string> WaitForFirstResultAndReturn()
{
    IObservable<string> observable1 = this.GetHelloString().ToObservable();
    IObservable<string> observable2 = this.GetWorldString().ToObservable();

    return await observable1.Merge(observable2).FirstAsync();
}

The two approaches look similar, but TPL is a little simpler.

Next, we wait for both Tasks to complete and combine their results.
TPL way

public async Task<string> WaitForAllResultsAndReturnCombinedResult()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();

    return string.Join(" ", await Task.WhenAll(task1, task2));
}

Rx way

public async Task<string> WaitForAllResultsAndReturnCombinedResult()
{
    IObservable<string> observable1 = this.GetHelloString().ToObservable();
    IObservable<string> observable2 = this.GetWorldString().ToObservable();

    return await observable1.Zip(observable2, (x1, x2) => string.Join(" ", x1, x2));
}

The two approaches look similar, but TPL is still simpler.

Next, we wait for the first result again, but with a timeout.

TPL way

public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();
    Task timeoutTask = Task.Delay(100);

    Task completedTask = await Task.WhenAny(task1, task2, timeoutTask);
    if (completedTask == timeoutTask)
    {
        throw new TimeoutException("The operation has timed out");
    }

    return ((Task<string>)completedTask).Result;
}

Rx way

public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut()
{
    IObservable<string> observable1 = this.GetHelloString().ToObservable();
    IObservable<string> observable2 = this.GetWorldString().ToObservable();

    return await observable1.Merge(observable2).Timeout(TimeSpan.FromMilliseconds(100)).FirstAsync();
}

In this case, Rx wins.

We can also combine the two approaches:

public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut2()
{
    Task<string> task1 = this.GetHelloString();
    Task<string> task2 = this.GetWorldString();

    return await Task
        .WhenAny(task1, task2)
        .Unwrap() // flatten Task<Task<string>> into Task<string>
        .ToObservable()
        .Timeout(TimeSpan.FromMilliseconds(1000))
        .FirstAsync();
}

Sampling Events

Sometimes an event handler fires so often that it freezes the UI.

this.TextBox.TextChanged += this.OnTextBoxTextChanged;

private void OnTextBoxTextChanged(object sender, TextChangedEventArgs e)
{
    // Heavy User Interface updates that can cause the application to lock up.
}

With Rx it is easy to sample an event and reduce how often the handler fires.

public IObservable<TextChangedEventArgs> WhenTextChanged
{
    get
    {
        return Observable
            .FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
                h => this.TextBox.TextChanged += h,
                h => this.TextBox.TextChanged -= h)
            .Select(x => x.EventArgs);
    }
}

this.WhenTextChanged
    .Sample(TimeSpan.FromSeconds(3))
    .Subscribe(x => Debug.WriteLine(DateTime.Now + " Text Changed"));

Timeout

public async Task<string> WaitForFirstResultWithTimeOut()
{
    Task<string> task = this.DownloadTheInternet();

    return await task
        .ToObservable()
        .Timeout(TimeSpan.FromMilliseconds(1000))
        .FirstAsync();
}
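
When the limit is exceeded, the Timeout operator used above signals a TimeoutException, which the await rethrows. The following is a minimal sketch of handling that case, assuming the System.Reactive package; `TimeoutDemo`, `FirstOrFallbackAsync`, and the fallback parameter are hypothetical and not part of the original article.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original article.
public static class TimeoutDemo
{
    // Returns the first value from the source, or the fallback
    // when no value arrives within the given limit.
    public static async Task<string> FirstOrFallbackAsync(
        IObservable<string> source, TimeSpan limit, string fallback)
    {
        try
        {
            return await source.Timeout(limit).FirstAsync();
        }
        catch (TimeoutException)
        {
            return fallback;
        }
    }
}
```

For example, passing `Observable.Never<string>()` with a 50 ms limit returns the fallback, while `Observable.Return("Hello")` returns "Hello".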
