1.10.2017

NetMQ.ReactiveExtensions 介紹

之前有介紹 NetMQ+ RX (Streaming Data Demo App 2 of 2),作者用NetMQ配合Rx及類Actor實作了一個資料的推送系統,現在學NetMQ的過程中,發現了另一個有趣的專案 – NetMQ.ReactiveExtensions,另有提供NuGet套件。

基本上它目前僅以Rx的概念實作了NetMQPub/Sub模式(還沒確定是否支援XPub/XSub),用法如下所示(來自其GitHub的範例):

var publisher = new PublisherNetMq<int>("tcp://127.0.0.1:56001");
var subscriber = new SubscriberNetMq<int>("tcp://127.0.0.1:56001");
subscriber.Subscribe(message =>
{
    Console.Write(message); // Prints "42".
});
publisher.OnNext(42); // Sends 42.

它不只可以基本型別來傳送訊息,也以泛型支援自訂的型別,其內部還使用ProtoBuf做序列化(所以需要用一些annotation),以下是其GitHub的範例:

[ProtoContract]
public struct MyMessage
{
    [ProtoMember(1)]
    public int Num { get; set; }
    [ProtoMember(2)]
    public string Name { get; set; }

    // 原文少了此建構式
    public MyMessage(int num, string name)
    {
        Num = num;
        Name = name;
    }
}

var publisher = new PublisherNetMq<MyMessage>("tcp://127.0.0.1:56001");
var subscriber = new SubscriberNetMq<MyMessage>("tcp://127.0.0.1:56001");
subscriber.Subscribe(message =>
{
    Console.WriteLine(message.Num); // Prints "42".
    Console.WriteLine(message.Name); // Prints "Bill".
});
publisher.OnNext(new MyMessage(42, "Bill"); 

注意目前它還沒到1.0版,所以有點小問題 ー 型別的命名,如”PublisherNetMQ”,應該是”PublisherNetMq”,小寫的”q”!(我花了超過10分鐘找為什麼編譯有問題@@,還特別拉了原始碼下來看,有的命名是”MQ”,有的是”Mq”,已跟原作者提…)

雖然說是“僅以Rx的概念實作”,但是它的好處是隨之而來的Rx的特性,也就是其豐富的序列操作的運算子,如.Where(),.Select(), .Buffer(), .Throttle()等。

另外也可以參考一下它實作Rx的方式,以加深對Rx的印象。

Written with StackEdit.

沒有留言:

張貼留言