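This post is based on http://rehansaeed.com/reactive-extensions-part1-replacing-events/ and the rest of that series.
While learning Rx, the original author had the same question I did: where does it actually fit?
For more on events, see also: http://mark-dot-net.blogspot.tw/2014/04/reactive-extensions-observables-versus.html
PS: ReactiveUI is an interesting framework built on Rx that is also worth a look.
Replacing C# Events
Exposing an Event
The usual way to expose an event: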
public class JetFighter
{
public event EventHandler<JetFighterEventArgs> PlaneSpotted;
public void SpotPlane(JetFighter jetFighter)
{
EventHandler<JetFighterEventArgs> eventHandler = this.PlaneSpotted;
if (eventHandler != null)
{
eventHandler(this, new JetFighterEventArgs(jetFighter));
}
}
}
The same thing with Rx:
public class JetFighter
{
private Subject<JetFighter> planeSpotted = new Subject<JetFighter>();
public IObservable<JetFighter> PlaneSpotted
{
get { return this.planeSpotted.AsObservable(); }
}
public void SpotPlane(JetFighter jetFighter)
{
this.planeSpotted.OnNext(jetFighter);
}
}
The line
return this.planeSpotted.AsObservable();
prevents consumers from casting the returned object back to the Subject and publishing notifications themselves.
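As a minimal sketch of why the wrapper matters, the following compares a subject exposed directly with one exposed through AsObservable. It assumes the System.Reactive NuGet package is referenced; the class and method names (AsObservableDemo, CanCastToSubject) are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AsObservableDemo
{
    // Reports whether a consumer could cast each exposed sequence back to a subject.
    public static (bool Raw, bool Wrapped) CanCastToSubject()
    {
        var subject = new Subject<string>();

        IObservable<string> raw = subject;                    // exposed directly
        IObservable<string> wrapped = subject.AsObservable(); // identity hidden

        return (raw is ISubject<string>, wrapped is ISubject<string>);
    }

    public static void Main()
    {
        var result = CanCastToSubject();
        Console.WriteLine(result.Raw);     // True: the caller can reach OnNext
        Console.WriteLine(result.Wrapped); // False: only Subscribe is available
    }
}
```

When the subject is returned as-is, any consumer can cast it and call OnNext, OnError, or OnCompleted, which is the same kind of leak as letting outside code raise your event.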
Rx can also deliver error and completion notifications:
public class JetFighter
{
private Subject<JetFighter> planeSpotted = new Subject<JetFighter>();
public IObservable<JetFighter> PlaneSpotted
{
get { return this.planeSpotted.AsObservable(); }
}
public void AllPlanesSpotted()
{
this.planeSpotted.OnCompleted();
}
public void SpotPlane(JetFighter jetFighter)
{
try
{
if (string.Equals(jetFighter.Name, "UFO"))
{
throw new Exception("UFO Found");
}
this.planeSpotted.OnNext(jetFighter);
}
catch (Exception exception)
{
this.planeSpotted.OnError(exception);
}
}
}
Consuming an Event
The standard way to subscribe to an event:
public class BomberControl : IDisposable
{
private JetFighter jetfighter;
public BomberControl(JetFighter jetFighter)
{
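// Store the instance so Dispose can unsubscribe from the same object.
this.jetfighter = jetFighter;
this.jetfighter.PlaneSpotted += this.OnPlaneSpotted;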
}
public void Dispose()
{
jetfighter.PlaneSpotted -= this.OnPlaneSpotted;
}
private void OnPlaneSpotted(object sender, JetFighterEventArgs e)
{
JetFighter spottedPlane = e.SpottedPlane;
}
}
The Rx equivalent:
public class BomberControl : IDisposable
{
private IDisposable planeSpottedSubscription;
public BomberControl(JetFighter jetFighter)
{
this.planeSpottedSubscription = jetFighter.PlaneSpotted.Subscribe(this.OnPlaneSpotted);
}
public void Dispose()
{
this.planeSpottedSubscription.Dispose();
}
private void OnPlaneSpotted(JetFighter jetFighter)
{
JetFighter spottedPlane = jetFighter;
}
}
Rx can also filter the source before subscribing:
jetFighter.PlaneSpotted.Where(x => string.Equals(x.Name, "Eurofighter")).Subscribe(this.OnPlaneSpotted);
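Conclusion
Events are also an implementation of the observer pattern, but as shown above they lack the error and completion notifications that Rx provides. If you don't need those, that makes no difference. Beyond the two extra notifications, Rx also lets the subscriber apply operators to the source, such as the Where filter above.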
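To make the three notification kinds concrete, here is a small sketch in which the subscriber supplies a handler for each one and filters the source with Where. It assumes the System.Reactive NuGet package; plane names are plain strings and NotificationDemo is a hypothetical class, neither taken from the original series.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class NotificationDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var planes = new Subject<string>();

        using (planes
            .Where(name => name == "Eurofighter")
            .Subscribe(
                name => log.Add("next: " + name),            // OnNext
                error => log.Add("error: " + error.Message), // OnError
                () => log.Add("completed")))                 // OnCompleted
        {
            planes.OnNext("Tornado");     // filtered out by Where
            planes.OnNext("Eurofighter"); // reaches the subscriber
            planes.OnCompleted();         // passes through Where unchanged
        }

        return log;
    }

    public static void Main()
    {
        // Prints "next: Eurofighter, completed"
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

A plain event can only express the first of these three callbacks; the subscriber has no standard way to learn that the source failed or finished.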
Wrapping Events
Wrapping an EventHandler
public event EventHandler BunnyRabbitsAttack;
public IObservable<object> WhenBunnyRabbitsAttack
{
get
{
return Observable
.FromEventPattern(
h => this.BunnyRabbitsAttack += h,
h => this.BunnyRabbitsAttack -= h);
}
}
Wrapping an EventHandler that carries event arguments
public event EventHandler<BunnRabbitsEventArgs> BunnyRabbitsAttack;
public IObservable<BunnRabbits> WhenBunnyRabbitsAttack
{
get
{
return Observable
.FromEventPattern<BunnRabbitsEventArgs>(
h => this.BunnyRabbitsAttack += h,
h => this.BunnyRabbitsAttack -= h)
.Select(x => x.EventArgs.BunnRabbits);
}
}
Wrapping a custom event handler delegate
public event BunnRabbitsEventHandler BunnyRabbitsAttack;
public IObservable<BunnRabbits> WhenBunnyRabbitsAttack
{
get
{
return Observable
.FromEventPattern<BunnRabbitsEventHandler, BunnRabbitsEventArgs>(
h => this.BunnyRabbitsAttack += h,
h => this.BunnyRabbitsAttack -= h)
.Select(x => x.EventArgs.BunnRabbits);
}
}
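The wrappers above can be exercised end to end with a sketch like the one below, which shows that disposing the subscription runs the remove-handler lambda. It assumes the System.Reactive NuGet package and a console application (no SynchronizationContext); Alarm and FromEventPatternDemo are hypothetical names used only for this illustration.

```csharp
using System;
using System.Reactive.Linq;

public class Alarm
{
    public event EventHandler Rang;

    public void Ring()
    {
        this.Rang?.Invoke(this, EventArgs.Empty);
    }
}

public static class FromEventPatternDemo
{
    public static int CountRingsWhileSubscribed()
    {
        var alarm = new Alarm();
        int count = 0;

        IDisposable subscription = Observable
            .FromEventPattern(
                h => alarm.Rang += h,   // called on Subscribe
                h => alarm.Rang -= h)   // called on Dispose
            .Subscribe(_ => count++);

        alarm.Ring();            // observed
        subscription.Dispose();  // detaches the handler from the event
        alarm.Ring();            // no longer observed

        return count;
    }

    public static void Main()
    {
        Console.WriteLine(CountRingsWhileSubscribed()); // 1
    }
}
```

Because unsubscription is just Dispose, the consumer never needs to keep a reference to the handler method in order to detach it later.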
Using an explicit interface implementation to hide the existing event
public abstract class NotifyPropertyChanges : INotifyPropertyChanged
{
event PropertyChangedEventHandler INotifyPropertyChanged.PropertyChanged
{
add { this.propertyChanged += value; }
remove { this.propertyChanged -= value; }
}
private event PropertyChangedEventHandler propertyChanged;
public IObservable<string> WhenPropertyChanged
{
get
{
return Observable
.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
h => this.propertyChanged += h,
h => this.propertyChanged -= h)
.Select(x => x.EventArgs.PropertyName);
}
}
protected void OnPropertyChanged(string propertyName)
{
PropertyChangedEventHandler eventHandler = this.propertyChanged;
if (eventHandler != null)
{
eventHandler(this, new PropertyChangedEventArgs(propertyName));
}
}
}
Replacing Timers
The existing .NET Timers
Typical Timer usage:
public void StartTimer()
{
Timer timer = new Timer(5000);
timer.Elapsed += this.OnTimerElapsed;
timer.Start();
}
private void OnTimerElapsed(object sender, ElapsedEventArgs e)
{
// Do Stuff Here
Console.WriteLine(e.SignalTime);
// Console WriteLine Prints
// 11/03/2014 10:58:35
// 11/03/2014 10:58:40
// 11/03/2014 10:58:45
// ...
}
Rx Timers
public void StartTimer()
{
Observable
.Interval(TimeSpan.FromSeconds(5))
.Subscribe(
x =>
{
// Do Stuff Here
Console.WriteLine(x);
// Console WriteLine Prints
// 0
// 1
// 2
// ...
});
}
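The method above discards the value returned by Subscribe, so the timer can never be stopped. As a hedged sketch, the following keeps the IDisposable and disposes it to stop the ticks. To make the behavior deterministic it passes a HistoricalScheduler (a virtual clock from System.Reactive.Concurrency) instead of waiting in real time; that substitution and the IntervalDemo name are assumptions made for this illustration, and the System.Reactive NuGet package is required.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class IntervalDemo
{
    public static List<long> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var ticks = new List<long>();

        IDisposable subscription = Observable
            .Interval(TimeSpan.FromSeconds(5), scheduler)
            .Subscribe(tick => ticks.Add(tick));

        scheduler.AdvanceBy(TimeSpan.FromSeconds(16)); // ticks fire at 5s, 10s, 15s
        subscription.Dispose();                        // stops the timer
        scheduler.AdvanceBy(TimeSpan.FromSeconds(10)); // no further ticks arrive

        return ticks;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // 0, 1, 2
    }
}
```

In production code you would drop the scheduler argument and hold on to the subscription for as long as the timer should run.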
Fire once after 5 seconds
public void StartTimerAndFireOnce()
{
Observable
.Timer(TimeSpan.FromSeconds(5))
.Subscribe(
x =>
{
// Do Stuff Here
Console.WriteLine(x);
// Console WriteLine Prints
// 0
});
}
Wait one minute, then fire every 5 seconds
public void StartTimerInOneMinute()
{
Observable
.Timer(TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(5))
.Subscribe(
x =>
{
// Do Stuff Here
Console.WriteLine(x);
// Console WriteLine Prints
// 0
// 1
// 2
// ...
});
}
You can also choose which scheduler the code runs on. The example below runs on the UI thread, so there is no need to marshal the call with Invoke.
public void StartTimerOnUIThread()
{
Observable
.Interval(TimeSpan.FromSeconds(5), DispatcherScheduler.Current)
.Subscribe(
x =>
{
// Do UI Stuff Here
});
}
Task ToObservable
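Converting Tasks into observable sequences
The ToObservable extension method converts a Task or Task<T> into an observable sequence. A Task<T> becomes an IObservable<T>, while calling ToObservable on a non-generic Task returns an IObservable<Unit>. Unit is an empty type that carries no information; it exists because there is no non-generic IObservable interface for sequences that produce no value.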
IObservable<Unit> observable = Task.Run(() => Console.WriteLine("Working")).ToObservable();
IObservable<string> observableT = Task.Run(() => "Working").ToObservable();
If you subscribe to either of the observables above, it produces a single value and then completes.
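The "one value, then completion" behavior can be checked with a short sketch. It collects everything the sequence emits with ToList, which only produces its list once the source completes, and blocks for the result with Wait. It assumes the System.Reactive NuGet package; TaskToObservableDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TaskToObservableDemo
{
    public static IList<string> Collect()
    {
        IObservable<string> observable = Task.Run(() => "Working").ToObservable();

        // ToList emits a single list when the source completes;
        // Wait blocks the calling thread until that happens.
        return observable.ToList().Wait();
    }

    public static void Main()
    {
        IList<string> values = Collect();
        Console.WriteLine(values.Count); // 1
        Console.WriteLine(values[0]);    // Working
    }
}
```

Blocking with Wait is only for demonstration; inside an async method you would await the observable instead.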
Putting It All Together
The examples below use these two methods:
public Task<string> GetHelloString()
{
return Task.Run(
async () =>
{
await Task.Delay(500);
return "Hello";
});
}
public Task<string> GetWorldString()
{
return Task.Run(
async () =>
{
await Task.Delay(1000);
return "World";
});
}
To return whichever result arrives first:
TPL way
public async Task<string> WaitForFirstResultAndReturn()
{
Task<string> task1 = this.GetHelloString();
Task<string> task2 = this.GetWorldString();
// WhenAny returns Task<Task<string>>; await twice instead of blocking on .Result.
return await await Task.WhenAny(task1, task2);
}
Rx way
public async Task<string> WaitForFirstResultAndReturn()
{
IObservable<string> observable1 = this.GetHelloString().ToObservable();
IObservable<string> observable2 = this.GetWorldString().ToObservable();
return await observable1.Merge(observable2).FirstAsync();
}
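The two approaches look alike, but the TPL version is slightly simpler.
Next, we wait for both tasks to complete and combine their results.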
TPL way
public async Task<string> WaitForAllResultsAndReturnCombinedResult()
{
Task<string> task1 = this.GetHelloString();
Task<string> task2 = this.GetWorldString();
return string.Join(" ", await Task.WhenAll(task1, task2));
}
Rx way
public async Task<string> WaitForAllResultsAndReturnCombinedResult()
{
IObservable<string> observable1 = this.GetHelloString().ToObservable();
IObservable<string> observable2 = this.GetWorldString().ToObservable();
return await observable1.Zip(observable2, (x1, x2) => string.Join(" ", x1, x2));
}
Again the two are similar, and the TPL version is still simpler.
Next, we wait for the first result again, this time with a timeout.
TPL way
public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut()
{
Task<string> task1 = this.GetHelloString();
Task<string> task2 = this.GetWorldString();
Task timeoutTask = Task.Delay(100);
Task completedTask = await Task.WhenAny(task1, task2, timeoutTask);
if (completedTask == timeoutTask)
{
throw new TimeoutException("The operation has timed out");
}
return ((Task<string>)completedTask).Result;
}
Rx way
public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut()
{
IObservable<string> observable1 = this.GetHelloString().ToObservable();
IObservable<string> observable2 = this.GetWorldString().ToObservable();
return await observable1.Merge(observable2).Timeout(TimeSpan.FromMilliseconds(100)).FirstAsync();
}
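In this case, Rx wins.
We can also combine the two, using Task.WhenAny to pick the first result and Rx to apply the timeout: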
public async Task<string> WaitForFirstResultAndReturnResultWithTimeOut2()
{
Task<string> task1 = this.GetHelloString();
Task<string> task2 = this.GetWorldString();
return await Task
.WhenAny(task1, task2)
.Unwrap() // flatten Task<Task<string>> so the awaited value is a string
.ToObservable()
.Timeout(TimeSpan.FromMilliseconds(1000))
.FirstAsync();
}
Sampling Events
Subscribing to an event that fires very often can freeze the application's UI.
this.TextBox.TextChanged += this.OnTextBoxTextChanged;
private void OnTextBoxTextChanged(object sender, TextChangedEventArgs e)
{
// Heavy User Interface updates that can cause the application to lock up.
}
With Rx it is easy to sample an event and reduce how often the handler runs.
public IObservable<TextChangedEventArgs> WhenTextChanged
{
get
{
return Observable
.FromEventPattern<TextChangedEventHandler, TextChangedEventArgs>(
h => this.TextBox.TextChanged += h,
h => this.TextBox.TextChanged -= h)
.Select(x => x.EventArgs);
}
}
this.WhenTextChanged
.Sample(TimeSpan.FromSeconds(3))
.Subscribe(x => Debug.WriteLine(DateTime.Now + " Text Changed"));
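To show what Sample actually lets through, here is a sketch that pushes several values between sampling ticks and records which ones survive. It uses a Subject<int> in place of the text box event and a HistoricalScheduler (virtual clock) in place of real time; both substitutions, and the SampleDemo name, are assumptions for this illustration. The System.Reactive NuGet package is required.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SampleDemo
{
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var source = new Subject<int>();
        var sampled = new List<int>();

        using (source
            .Sample(TimeSpan.FromSeconds(3), scheduler)
            .Subscribe(value => sampled.Add(value)))
        {
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5)); // tick at 3s emits the latest value, 3

            source.OnNext(4);
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));   // tick at 6s emits 4
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));   // tick at 9s: nothing new, nothing emitted
        }

        return sampled;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // 3, 4
    }
}
```

Only the most recent value in each window reaches the subscriber, and a window with no new values produces nothing, which is what keeps a heavy UI handler from running on every keystroke.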
Timeouts
public async Task<string> WaitForFirstResultWithTimeOut()
{
Task<string> task = this.DownloadTheInternet();
return await task
.ToObservable()
.Timeout(TimeSpan.FromMilliseconds(1000))
.FirstAsync();
}
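When the time limit passes, Timeout ends the sequence with a TimeoutException, so the caller usually wants to handle it. The sketch below shows one way to do that. It assumes the System.Reactive NuGet package; TimeoutDemo and FirstOrFallback are hypothetical names, and Observable.Never stands in for a download that never finishes.

```csharp
using System;
using System.Reactive.Linq;

public static class TimeoutDemo
{
    // Returns the first value from the source, or a fallback string if none arrives in time.
    public static string FirstOrFallback(IObservable<string> source, TimeSpan limit)
    {
        try
        {
            // Wait blocks until the sequence completes and rethrows any error it carries.
            return source.Timeout(limit).FirstAsync().Wait();
        }
        catch (TimeoutException)
        {
            return "timed out";
        }
    }

    public static void Main()
    {
        // A source that never produces a value hits the limit.
        Console.WriteLine(FirstOrFallback(Observable.Never<string>(), TimeSpan.FromMilliseconds(50)));

        // A source that answers immediately returns its value.
        Console.WriteLine(FirstOrFallback(Observable.Return("ok"), TimeSpan.FromSeconds(1)));
    }
}
```

Timeout also has an overload that takes a fallback observable to switch to instead of raising the exception, which can be tidier when a default value is acceptable.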
Written with StackEdit.