之前有介紹 NetMQ+ RX (Streaming Data Demo App 2 of 2),作者用NetMQ配合Rx及類Actor實作了一個資料的推送系統,現在學NetMQ的過程中,發現了另一個有趣的專案 – NetMQ.ReactiveExtensions,另有提供NuGet套件。
基本上它目前僅以Rx
的概念實作了NetMQ
的 Pub/Sub
模式(還沒確定是否支援XPub/XSub),用法如下所示(來自其GitHub的範例):
var publisher = new PublisherNetMq<int>("tcp://127.0.0.1:56001");
var subscriber = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
subscriber.Subscribe(message =>
{
Console.Write(message); // Prints "42".
});
publisher.OnNext(42); // Sends 42.
它不只可以基本型別來傳送訊息,也以泛型支援自訂的型別,其內部還使用ProtoBuf做序列化(所以需要用一些annotation),以下是其GitHub的範例:
[ProtoContract]
public struct MyMessage
{
[ProtoMember(1)]
public int Num { get; set; }
[ProtoMember(2)]
public string Name { get; set; }
// 原文少了此建構式
public MyMessage(int num, string name)
{
Num = num;
Name = name;
}
}
var publisher = new PublisherNetMq<MyMessage>("tcp://127.0.0.1:56001");
var subscriber = new SubscriberNetMq<MyMessage>("tcp://127.0.0.1:56001");
subscriber.Subscribe(message =>
{
Console.WriteLine(message.Num); // Prints "42".
Console.WriteLine(message.Name); // Prints "Bill".
});
publisher.OnNext(new MyMessage(42, "Bill");
注意目前它還沒到1.0版,所以有點小問題 ー 型別的命名,如”PublisherNetMQ”,應該是”PublisherNetMq”,小寫的”q”!(我花了超過10分鐘找為什麼編譯有問題@@,還特別拉了原始碼下來看,有的命名是”MQ”,有的是”Mq”,已跟原作者提…)
雖然說是“僅以Rx
的概念實作”,但是它的好處是隨之而來的Rx的特性,也就是其豐富的序列操作的運算子,如.Where()
,.Select()
, .Buffer()
, .Throttle()
等。
另外也可以參考一下它實作Rx的方式,以加深對Rx的印象。
Written with StackEdit.
沒有留言:
張貼留言