正在學NetMQ,順便簡單的翻譯下文件…翻譯不出來的或是覺得不重要的就以”…”符號替換,或顯示原文。
當然,辭不達義的地方也會有,請包含…
原文來自NetMQ doc
依原文的建立方式,我也建了繁中版的NetMQ doc,可在那邊查閱,版面比較清楚(已詢問開發團隊,之後可能就在原本的文件網站選擇查看其它語系)
若你想幫忙修正翻譯,可在我fork的Github NetMQ zh_tw修改
介紹
所以你正在找訊息函式庫,也許你對WCF跟MSMQ感到沮喪(我們也是…),且聽說ZeroMQ很快,所以你找到這裡,NetMQ,一個Zero(或稱ØMQ)的.Net移植。
是的,NetMQ是一個訊息函式庫,而且很快,但需要一點學習時間,期望你能夠快速地掌握它。
從那裡開始
ZeroMQ
和NetMQ
不是那些你下載後,看一下範例就會的函式庫,它背後有一些原則,要先瞭解後才能順利應用,所以最佳的開始的地方是ZeroMQ guide,讀一或兩次後,再回來這篇文章。
ZeroMQ中的Zero
ZeroMQ
的哲理是從Zero開始。Zero是指Zero broker
(ZeroMQ
沒有中介者)、零延遲、零成本(免費的)及零管理。
更進一步說,“zero”代表滲透整個專案的極簡主義的文化。我們通過消除複雜性而不是增加新函式來提供功能。
取得函式庫
可以從NuGet取得函式庫。
傳送及接收
由於NetMQ
就是關於sockets
的,所以傳送及接收是很自然的預期。更由於這屬於NetMQ的一般區域,所以另有一個關於接收與傳送的介紹頁面。
第一個範例
讓我們開始第一個範例吧,(當然)是“Hello world”了:
Server
using (var server = new ResponseSocket())
{
server.Bind("tcp://*:5555");
while (true)
{
var message = server.ReceiveFrameString();
Console.WriteLine("Received {0}", message);
Thread.Sleep(100);
Console.WriteLine("Sending World");
server.SendFrame("World");
}
}
伺服端建立了一個response
的socket
型別(在request-response
章節有更多介紹),將它綁定到port 5555然後等待訊息。
你可以看到我們不用任何設定,只需要傳送字串。NetMQ
不只可以傳送字串,雖然它沒有實作序列化的功能(你需要自己實作),不過你可以在後續學到一些很酷的技巧(Multipart messages)。
Client
using (var client = new RequestSocket())
{
client.Connect("tcp://localhost:5555");
for (int i = 0; i < 10; i++)
{
Console.WriteLine("Sending Hello");
client.SendFrame("Hello");
var message = client.ReceiveFrameString();
Console.WriteLine("Received {0}", message);
}
}
Client端
建立了一個request
的socket
型別,連線並開始傳送訊息。
傳送及接收函式預設是阻塞式的。對接收來說很簡單:如果沒有收到訊息它會阻塞;而傳送較複雜一點,且跟它的socket型別有關。對request sockets來說,如果到達high watermark,且沒有另一端的連線,函式會阻塞。
然而你可以呼叫TrySend
和TryReceive
以避免等待,如果需要等待,它會回傳false。
string message;
if (client.TryReceiveFrameString(out message))
Console.WriteLine("Received {0}", message);
else
Console.WriteLine("No message received");
Bind vs Connect
上述範例中你可能會注意到server端使用Bind而client端使用Connect,為什麼?有什麼不同嗎?
ZeroMQ
為每個潛在的連線建立佇列。如果你的socket連線到三個socket端點,背後實際有三個佇列存在。
使用Bind
,可以讓其它端點和你建立連接,因為你不知道未來會有多少端點且無法先建立佇列,相反,佇列會在每個端點bound後建立。
使用Connect
,ZeroMQ
知道至少會有一個端點,因此它會馬上建立佇列,除了ROUTE型別外的所有型別都是如此,而ROUTE型別只會在我們連線的每個端點有了回應後才建立佇列。
因此,當傳送訊息至沒有綁定的端點的socket或至沒有連線的ROUTE時,將沒有可以儲存訊息的佇列存在。
何時使用bind或connect?
作為一般規則,在架構中最穩定的端點上使用bind,在動態的、易變的端點上使用connect。對request/reply型來說,伺服端使用bind,而client端使用connect,如同傳統的TCP一樣。
If you can’t figure out which parts are more stable (i.e. peer-to-peer), consider a stable device in the middle, which all sides can connect to.
如果你無法確認那一部份會比較穩定(例如點對點連線),可以考慮在中間放一個穩定的可讓所有端點連線的裝置。
你可 進一步閱讀ZeroMQ FAQ中的”Why do I see different behavior when I bind a socket versus connect a socket?”部份。
多段訊息
ZeroMQ/NetMQ在frame的概念上工作,大多數的訊息都可以想成含有一或多個frame。NetMQ提供一些方便的函式讓你傳送字串訊息,然而你也應該瞭解多段frame的概念及如何應用。
在Message章節有更多說明。
模式
ZeroMQ
(和NetMQ
)都是關於模式和building blocks的。ZeroMQ指南講述了所有你需要知道的知識,以幫助你應用這些模式。在你開始用NetMQ前請先確定你已讀過下列章節。
NetMQ也提供了以NetMQ API撰寫的針對少數幾個模式的範例。你應該也能夠在看過ZeroMQ指南後很簡單的改用NetMQ實作。
這裡有一些已用NetMQ實作的範例程式:
其餘的範例,ZeroMQ指南應是你的第一選擇。
ZeroMQ的模式是以特定型別實作的sockets配對。換句話說,要瞭解ZeroMQ的模式,你要先知道有那些socket型別及它們如何配合。Mostly, this just takes study; there is little that is obvious at this level.
ZeroMQ內建的核心模式是:
- 請求-回應,將一組客戶端連線至一組服務端。這是一種遠端程序呼叫和task分佈模式。
- 發佈-訂閱,連結一組發佈者至一組訂閱者。這是一種資料分佈式模式。
- 管線,連結在一個有多步驟及迴圈的fan-out/fan-in模式中的節點,這是一種 parallel task distribution and collection。
- Exclusive pair,獨占式地連接兩個socket。這是一種在process中連接兩個執行緒的模式,不要和一般的socket配對混肴。
下列是有效的connect-bind的socket合併配對(雙邊都可以bind):
PublisherSocket
and SubscriberSocket
RequestSocket
and ResponseSocket
RequestSocket
and RouterSocket
DealerSocket
and ResponseSocket
DealerSocket
and RouterSocket
DealerSocket
and DealerSocket
RouterSocket
and RouterSocket
PushSocket
and PullSocket
PairSocket
and PairSocket
任何其它的配對方式會產生undocumented及不可靠的結果,ZeroMQ未來的版本可能會在你嘗試時告知錯誤。當然,你也可以使用程式橋接不同的配對,如從某種socket讀取並寫至另一種。
選項
NetMQ提供了數個會影響動作的選項。
根據你使用的socket型別或你嘗試建立的拓撲,你可能發現需要設定一些ZeroMQ的選項。在NetMQ中,可透過NetMQSocket.Options屬性完成。
下面是你可以在NetMQSocket.Options上設定的可用屬性的列表。很難說要設定那些值,那取決於你想要實現什麼。這邊能做的是列出選項,以讓你知道。如下所示:
Affinity
BackLog
CopyMessages
DelayAttachOnConnect
Endian
GetLastEndpoint
IPv4Only
Identity
Linger
MaxMsgSize
MulticastHops
MulticastRate
MulticastRecoveryInterval
ReceiveHighWaterMark
ReceiveMore
ReceiveBuffer
ReconnectInterval
ReconnectIntervalMax
SendHighWaterMark
SendTimeout
SendBuffer
TcpAcceptFilter
TcpKeepAlive
TcpKeepaliveIdle
TcpKeepaliveInterval
XPubVerbose
這裡不會講到所有選項,在用到時才會提。現在只要注意,如果你已經在ZeroMQ指南中讀過某些選項,那麼這應是你需要設置/讀取的地方。
概念
如果你前面有讀過,那你應該有看到範例中有使用到ReceiveString()和SendString()了,但NetMQ讓我們不只可以傳送字串。
實際上有不少可以使用的選項,讓我們來看其中的一部份吧!
傳送及接收
接收
IReceivingSocket(所有socket皆繼承自此介面)有兩個函式:
void Receive(ref Msg msg);
bool TryReceive(ref Msg msg, TimeSpan timeout);
第一個函式會永遠阻塞直到訊息到達,第二個讓我們提供一個逾時時間(可能是零)。
這些函式依靠Msg物件的重覆使用提供我們很高的效能。然而大多時間你會想要更方便的函式來幫你接收string,byte[]等型別,NetMQ在ReceivingSocketExtensions類別中提供了很多IReceivingSocket型別的方便函式:
byte[] ReceiveFrameBytes()
byte[] ReceiveFrameBytes(out bool more)
bool TryReceiveFrameBytes(out byte[] bytes)
bool TryReceiveFrameBytes(out byte[] bytes, out bool more)
bool TryReceiveFrameBytes(TimeSpan timeout, out byte[] bytes)
bool TryReceiveFrameBytes(TimeSpan timeout, out byte[] bytes, out bool more)
List<byte[]> ReceiveMultipartBytes()
void ReceiveMultipartBytes(ref List<byte[]> frames)
bool TryReceiveMultipartBytes(ref List<byte[]> frames)
bool TryReceiveMultipartBytes(TimeSpan timeout, ref List<byte[]> frames)
string ReceiveFrameString()
string ReceiveFrameString(out bool more)
string ReceiveFrameString(Encoding encoding)
string ReceiveFrameString(Encoding encoding, out bool more)
bool TryReceiveFrameString(out string frameString)
bool TryReceiveFrameString(out string frameString, out bool more)
bool TryReceiveFrameString(Encoding encoding, out string frameString)
bool TryReceiveFrameString(Encoding encoding, out string frameString, out bool more)
bool TryReceiveFrameString(TimeSpan timeout, out string frameString)
bool TryReceiveFrameString(TimeSpan timeout, out string frameString, out bool more)
bool TryReceiveFrameString(TimeSpan timeout, Encoding encoding, out string frameString)
bool TryReceiveFrameString(TimeSpan timeout, Encoding encoding, out string frameString, out bool more)
List<string> ReceiveMultipartStrings()
List<string> ReceiveMultipartStrings(Encoding encoding)
bool TryReceiveMultipartStrings(ref List<string> frames)
bool TryReceiveMultipartStrings(Encoding encoding, ref List<string> frames)
bool TryReceiveMultipartStrings(TimeSpan timeout, ref List<string> frames)
bool TryReceiveMultipartStrings(TimeSpan timeout, Encoding encoding, ref List<string> frames)
NetMQMessage ReceiveMultipartMessage()
bool TryReceiveMultipartMessage(ref NetMQMessage message)
bool TryReceiveMultipartMessage(TimeSpan timeout, ref NetMQMessage message)
bool ReceiveSignal()
bool TryReceiveSignal(out bool signal)
bool TryReceiveSignal(TimeSpan timeout, out bool signal)
void SkipFrame()
void SkipFrame(out bool more)
bool TrySkipFrame()
bool TrySkipFrame(out bool more)
bool TrySkipFrame(TimeSpan timeout)
bool TrySkipFrame(TimeSpan timeout, out bool more)
注意為了可讀性this IReceivingSocket socket
參數被省略掉了。
這些擴充函式應符合大多數的需求,如果沒有的話你也可以很簡單的建立自己需要的。
這裡是上述擴充函式之一實作的方式,可以幫助你建立自己的:
public static string ReceiveFrameString(this IReceivingSocket socket, Encoding encoding, out bool more)
{
var msg = new Msg();
msg.InitEmpty();
socket.Receive(ref msg);
more = msg.HasMore;
var str = msg.Size > 0
? encoding.GetString(msg.Data, 0, msg.Size)
: string.Empty;
msg.Close();
return str;
}
傳送
一個NetMQSocket
(所有socket皆繼承至此)有一個send函式。
public virtual void Send(ref Msg msg, SendReceiveOptions options)
如果你不想使用這個函式,也可以用為了IOutgoingSocket
建立的其它方便的擴充函式。
下面列出這些擴充函式,應該夠你使用,若是不足也可以自行建立。
public static class OutgoingSocketExtensions
{
public static void Send(this IOutgoingSocket socket, byte[] data);
public static void Send(this IOutgoingSocket socket, byte[] data, int length, SendReceiveOptions options);
public static void Send(this IOutgoingSocket socket, string message, bool dontWait = false, bool sendMore = false);
public static void Send(this IOutgoingSocket socket, string message, Encoding encoding, SendReceiveOptions options);
public static void Send(this IOutgoingSocket socket, byte[] data, int length, bool dontWait = false, bool sendMore = false);
public static void Send(this IOutgoingSocket socket, string message, Encoding encoding, bool dontWait = false, bool sendMore = false);
public static void SendMessage(this IOutgoingSocket socket, NetMQMessage message, bool dontWait = false);
public static IOutgoingSocket SendMore(this IOutgoingSocket socket, byte[] data, bool dontWait = false);
public static IOutgoingSocket SendMore(this IOutgoingSocket socket, string message, bool dontWait = false);
public static IOutgoingSocket SendMore(this IOutgoingSocket socket, byte[] data, int length, bool dontWait = false);
public static IOutgoingSocket SendMore(this IOutgoingSocket socket, string message, Encoding encoding, bool dontWait = false);
....
....
}
這裡是上述擴充函式之一實作的方式,可以幫助你建立自己的:
public static void Send(this IOutgoingSocket socket, string message,
Encoding encoding, SendReceiveOptions options)
{
var msg = new Msg();
msg.InitPool(encoding.GetByteCount(message));
encoding.GetBytes(message, 0, message.Length, msg.Data, 0);
socket.Send(ref msg, options);
msg.Close();
}
進一步閱讀
…
訊息
訊息結構
using (var server = new ResponseSocket("@tcp://127.0.0.1:5556"))
using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))
{
client.Send("Hello");
string fromClientMessage = server.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
server.SendFrame("Hi Back");
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
Console.ReadLine();
}
也許你有注意到(或沒有)NetMQ的socket有一個ReceiveFrameString()
函式,這是一個很好且有用的函式,但如果你認為只能用它那就不對了。
事實是ZeroMQ/NetMQ是基於frame的,意味著它們實現某種型式的協定。Some of you may balk at this prospect, and may curse, and think damm it, I am not a protocol designer I was not expecting to get my hands that dirty.
While it is true that if you wish to come up with some complex and elaborate architecture you would be best of coming up with a nice protocol, thankfully you will not need to do this all the time. This is largely down to ZeroMQ/NetMQ’s clever sockets that abstract away a lot of that from you, and the way in which you can treat the sockets as building blocks to build complex architecture (think lego).
一個例子是RouterSocket,它與眾不同且聰明地使用frame,它在傳送者訊息上加了一層代表回傳位址的資訊,所以當它接收到一個回傳訊息(從另一個工作的socket),它可以使用收到的frame訊息來獲得來源位址,並依此位址回傳訊息。
所以你應該注意的一個內建的frame的使用的例子,但frame並不限制在RouterSocket類型,你可以在所有的地方使用,如下列範例:
- 你也許想讓
frame[0]
表示接下來的frame的型態,這讓接收者可以去掉不感興趣的訊息,且不需要花費時間反序列化訊息,ZeroMQ/NetMQ在Pub-Sub sockets中使用這個想法,你可以替換或是擴充它。
- 你也許想讓
frame[0]
代表某種命令,frame[1]
代表參數,frame[2]
代表實際訊息內容(也許包含序列化的物件)。
這只是一些範例,實際上你可以用任何你想的方式來操作frame,雖然一些socket型別會期待或產生特定的frame結構。
當你使用多段訊息(frames)時你需要一次傳送/接收所有區段的訊息。
有一個內建的”more”的概念可以讓你整合使用,稍後會有更多例子。
建立多段訊息
建立多段訊息很簡單,有兩個方式可以達成。
建立訊息物件
你可以建立NetMQMessage
物件並透過Append(...)
覆載函式來加上frame資料,也有其它覆載可讓你加上Blob
, NetMQFrae
, byte[]
, int
, long
及string
等。
下列是一個加上兩個frame的訊息的範例,每個frame都包含一個字串值:
var message = new NetMQMessage();
message.Append("IAmFrame0");
message.Append("IAmFrame1");
server.SendMessage(message);
Sending frame by frame
另一個傳送多段訊息的方法是使用SendMoreFrame
擴充函式,這不像SendMessage
一樣有很多覆載,但是它讓可以讓你很簡單地傳送byte[]
,string
資料。這是一個和前述範例相像的範例:
server.SendMoreFrame("IAmFrame0")
.SendFrame("IAmFrame1")
要傳送超過兩個frame,可將多個SendMoreFrame
呼叫鏈結在一起,只要確定最後一個是SendFrame
!
讀取多段訊息
讀取多段訊息也有兩個方法。
接收各別 frames
你可以從socket中一次讀出一個frame。Out參數more
會告訴你目前是不是最後一個訊息。
你也可以使用方便的NetMQ函式ReceiveFrameString(out more)
多次,只需要知道是不是還有frame待讀取,所以要追蹤more
變數的狀態,如下範例:
server.SendMoreFrame("A")
.SendFrame("Hello");
bool more = true;
while (more)
{
string frame = client.ReceiveFrameString(out more);
Console.WriteLine("frame={0}", frame);
Console.WriteLine("more={0}", more);
}
這個迴圈將執行兩次。第一次,more
將被設為true。第二次,false。輸出將是:
frame=A
more=true
frame=Hello
more=false
讀取整段訊息
一個更簡單的方法是使用ReceiveMultipartMessage()
函式,它提供一個包含消息的所有frame的物件。
NetMQMessage message = client.ReceiveMultipartMessage()
Console.WriteLine("message.FrameCount={0}", message.FrameCount)
Console.WriteLine("message[0]={0}", message[0].ConvertToString())
Console.WriteLine("message[1]={0}", message[1].ConvertToString())
輸出會是:
message.FrameCount=2
message[0]=A
message[1]=Hello
也有其它功能,如:
IEnumerable<string> framesAsStrings = client.ReceiveMultipartStrings();
IEnumerable<byte[]> framesAsByteArrays = client.ReceiveMultipartBytes();
一個完整的範例
這裡有一個完整的範例,以加深至目前為止我們談論的印象:
using (var server = new ResponseSocket("@tcp://127.0.0.1:5556"))
using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))
{
Console.WriteLine("Client sending");
client.SendMoreFrame("A").SendFrame("Hello");
bool more = true;
while (more)
{
string frame = server.ReceiveFrameString(out more);
Console.WriteLine("Server received frame={0} more={1}",
frame, more);
}
Console.WriteLine("================================");
var msg = new NetMQMessage();
msg.Append("From");
msg.Append("Server");
Console.WriteLine("Server sending");
server.SendMultipartMessage(msg);
msg = client.ReceiveMultipartMessage();
Console.WriteLine("Client received {0} frames", msg.FrameCount);
foreach (var frame in msg)
Console.WriteLine("Frame={0}", frame.ConvertToString());
Console.ReadLine();
}
輸出如下:
Client sending
Server received frame=A more=true
Server received frame=Hello more=false
================================
Server sending
Client received 2 frames
Frame=From
Frame=Server
傳輸
傳輸協定
NetMQ支援三種主要的協定:
- TCP (tcp://)
- InProc (inproc://)
- PGM (pgm://) — requires MSMQ and running as administrator
下面會一一介紹。
TCP
TCP是最常用到的協定,因此,大部份的程式碼會使用TCP展示。
範例
又一個簡單的範例:
using (var server = new ResponseSocket())
using (var client = new RequestSocket())
{
server.Bind("tcp://*:5555");
client.Connect("tcp://localhost:5555");
Console.WriteLine("Sending Hello");
client.SendFrame("Hello");
var message = server.ReceiveFrameString();
Console.WriteLine("Received {0}", message);
Console.WriteLine("Sending World");
server.SendFrame("World");
message = client.ReceiveFrameString();
Console.WriteLine("Received {0}", message);
}
輸出:
Sending Hello
Received Hello
Sending World
Received World
位址格式
注意位址格式字串會傳送給Bind()
及Connect()
函式。
在TCP連線中,它會被組成:
這由三個部份構成:
- 協議(tcp)
- 主機(IP地址,主機名或匹配
"*"
的wildcard)
- 埠號(5555)
InProc 行程間(通訊)
InProc (in-process)讓你可以在同一個process中用sockets連線溝通,這很有用,有幾個理由:
- 取消共享狀態/鎖。當你傳送資料至socket時不需要擔心共享狀態。Socket的每一端都有自己的副本。
- 能夠在系統的不同的部分之間進行通信。
NetMQ提供了幾個使用InProc的組件,例如Actor模型和Devices,在相關文件中會再討論。
範例
現在讓我們通過在兩個執行緒之間傳送一個字串(為了簡單起見)展示一個簡單的InProc。
using (var end1 = new PairSocket())
using (var end2 = new PairSocket())
{
end1.Bind("inproc://inproc-demo");
end2.Connect("inproc://inproc-demo");
var end1Task = Task.Run(() =>
{
Console.WriteLine("ThreadId = {0}", Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("Sending hello down the inproc pipeline");
end1.SendFrame("Hello");
});
var end2Task = Task.Run(() =>
{
Console.WriteLine("ThreadId = {0}", Thread.CurrentThread.ManagedThreadId);
var message = end2.ReceiveFrameString();
Console.WriteLine(message);
});
Task.WaitAll(new[] { end1Task, end2Task });
}
輸出:
ThreadId = 12
ThreadId = 6
Sending hello down the inproc pipeline
Hello
位址格式
注意位址格式字串會傳送給Bind()
及Connect()
函式。
在InProc連線中,它會被組成:
* inproc://inproc-demo
這由兩個部份構成:
1. 協定(inproc)
2. 辨識名稱(inproc-demo可以是任何字串,在process範圍內是唯一的名稱)
PGM
Pragmatic General Multicast (PGM)是一種可靠的多播傳輸協定,用於需要有序、無序、不重覆等可從多個來源至多個接收者的多播數據。
PGM保證群組中的接收者可接收來自不管是傳送或修復,或可偵測無法復原的資料封包的遺失。PGM被設計為一個擁有基本的可靠度需求的解決方案。它的中心設計目標是操作的簡易性且保證其彈性及網路效率。
要使用NetMQ的PGM,我們不用做太多,只須遵循下列三點:
- Sockets型別現在是
PublisherSocket
and SubscriberSocket
,在pub-sub pattern會有更詳細的介紹。
- 確定你以”Administrator”等級執行軟體。
- 確定已打開”Multicastng Support”,可依下列方式:
Example
這裡是一個使用PGM的小範例,以及PublisherSocket
and SubscriberSocket
和幾個選項值。
const int MegaBit = 1024
const int MegaByte = 1024
using (var pub = new PublisherSocket())
using (var sub1 = new SubscriberSocket())
using (var sub2 = new SubscriberSocket())
{
pub.Options.MulticastHops = 2
pub.Options.MulticastRate = 40 * MegaBit
pub.Options.MulticastRecoveryInterval = TimeSpan.FromMinutes(10)
pub.Options.SendBuffer = MegaByte * 10
pub.Connect("pgm://224.0.0.1:5555")
sub1.Options.ReceiveBuffer = MegaByte * 10
sub1.Bind("pgm://224.0.0.1:5555")
sub1.Subscribe("")
sub2.Bind("pgm://224.0.0.1:5555")
sub2.Options.ReceiveBuffer = MegaByte * 10
sub2.Subscribe("")
Console.WriteLine("Server sending 'Hi'")
pub.Send("Hi")
bool more
Console.WriteLine("sub1 received = '{0}'", sub1.ReceiveString(out more))
Console.WriteLine("sub2 received = '{0}'", sub2.ReceiveString(out more))
}
執行後輸出如下:
Server sending 'Hi'
sub1 received = 'Hi'
sub2 received = 'Hi'
注意傳入Bind()
and Connect()
的字串位址格式,對InProc連線來說,會類似:
它以三個部份組成:
- 協定(
pgm
)
- 主機(如
244.0.0.1
之類的IP位址,主機名稱,或萬用字元*
的匹配)
- Port number(
5555
)
另一個不錯的PGM的資料是PGM unit tests。
清除
NetMQ第4版中我們拿掉了NetMQContext,現在我們可以用新的運算子建立sockets了,雖然這讓函式庫較簡單,但也增加了一些需要清除的複雜性。
為什麼NetMQ需要清除
NetMQ在背景建立了一些執行緒。因此,當你在Socket上呼叫Dispose時,這個處理是非同步的且發生在背景執行緒中。而因為NetMQ的執行緒是屬於背景執行緒,所以你實際上可以不正確清除並離開程式,但不建議。
當離開AppDomain時會更複雜,所以你需要清除NetMQ。
什麼是Linger?
Linger是socket在被dispose時傳送當下尚未傳送所有訊息的允許時間。所以當我們在一個Linger設為1秒的socket上呼叫Dispose時,它會最多花費一秒直到socket被disposed,此時函式庫會試著傳送所有等待中的訊息,如果它在linger時間到達前傳送完成,此socket會馬上被disposed。
正如所說,這一切發生在背景中,所以若linger有被設置,但我們沒有正確清除函式庫,linger會被略過。如果linger對你很重要,要確保你正確的清除函式庫。
第四版中預設的Linger值是零,表示函式庫不會在dispose前等待。你可以變更單一socket的linger值,也可以透過NetMQConfig.Linger設定所有linger的值。
如何清除?
關於cleanup最重要的是你要在呼叫Cleanup前呼叫所有socket的Dispose,也要確認NetMQ函式庫中的其它資源如NetMQPoller、NetMQQueue等被正確cleanup,如果socket沒有被disposed,那NetMQConfig.Cleanup會永遠阻塞。
最後你需要呼叫NetMQConfig.Cleanup,你可以如下所示的方式:
static void Main(string[] args)
{
try
{
}
finally
{
NetMQConfig.Cleanup();
}
}
如果你很懶惰,不關心清理函式庫,你也可以呼叫NetMQConfig.Cleanup並將block參數設為false。當設為false時,cleanup不會等待Sockets發送所有訊息,並且只會kill背景執行緒。
Tests
若你在你的測試中使用NetMQ,你也要確認你正確的對函式庫做cleanup。
這邊建議可加一個全域的tear down在你的測試中,並呼叫NetMQConfig.Cleanup。
示範若是在NUnit中可以:
[SetUpFixture]
public class Setup
{
[OneTimeTearDown]
public void TearDown()
{
NetMQConfig.Cleanup(false);
}
}
在測試中,呼叫Cleanup並代入false可讓你在測試失敗時不讓程式中斷。
Components 元件
Pollers
Motivation 1: Efficiency
NetMQPoller有很多範例。首先讓我們來看一個簡單的伺服器:
using (var rep = new ResponseSocket("@tcp://*:5002"))
{
while (true)
{
var msg = rep.ReceiveFrameString();
rep.Send("Response");
}
}
這個伺服器會很悞快且永遠處理回應。
如果我們想在同一個執行緒中處理兩個不同的response sockets中呢?
using (var rep1 = new ResponseSocket("@tcp://*:5001"))
using (var rep2 = new ResponseSocket("@tcp://*:5002"))
{
while (true)
{
}
}
我們要如何公平的處理兩個response sockets的服務?不能一次處理一個嗎?
/ blocks until a message is received
var msg1 = rep1.ReceiveString();
// might never reach this code!
var msg2 = rep2.ReceiveString();
一個等待接收的函式會阻塞直到有訊息抵達。如果我們在rep1等待接收,那傳送給rep2的所有訊息會被忽略,直到rep1收到訊息-也可能永遠收不到,所以這當然不是一個好方法。
相反的,我們可以在rep1和rep2上用非阻塞式的接收函式,但這可能會在沒有訊息的狀況下讓當前CPU的負載過高,所以,這也不是一個好方法…
我們可以引進使用非阻塞式函式中的timeout參數。然而,什麼值比較合適呢?如果我們用10ms,那如果rep1沒有收到訊息,那rep2最多只能取得每秒100個訊息(反之也成立),這嚴重限制了吞吐量,而且無法有效地利用資源。
所以我們需要一個較好的方式。
Motivation 2: Correctness
接續上面的範例,也許你會考慮每個socket放在不同的執行緒當中,並且採用阻塞式呼叫,雖然這在一些狀況下是個好方法,但是它有一些限制。
對ZeroMQ/NetMQ來說,為了發揮最大效能,所存在的限制是我們使用socket的方式。特別地說,NetMQSocket
不是執行緒安全的,在多個執行緒中同步使用同一個socket是無效的。
舉例來說,考慮我們在Thread A中有一個socket A的迴圈在服務,在Thread B中有一個socket B的迴圈在服務,若試著在socket A中接收訊息,並傳送至socket B,是無效的。Socket不是執行緒安全的,所以試著在執行緒A和B中同步使用可能會導致錯誤。
事實上,這裡描述的模式被稱為proxy,並且也被內置在NetMQ中。在這一點上,你可能不會訝異地發現它由NetMQPoller來實作。
範例:ReceiveReady
讓我們使用一個Poller
來從一個執行緒簡單地服務兩個sockets:
using (var rep1 = new ResponseSocket("@tcp://*:5001"))
using (var rep2 = new ResponseSocket("@tcp://*:5002"))
using (var poller = new NetMQPoller { rep1, rep2 })
{
rep1.ReceiveReady += (s, a) =>
{
string msg = a.Socket.ReceiveString();
a.Socket.Send("Response");
};
rep2.ReceiveReady += (s, a) =>
{
string msg = a.Socket.ReceiveString();
a.Socket.Send("Response");
};
poller.Run();
}
這段程式設置了兩個sockets,並綁定到不同的位址,並在一個NetMQPoller
中使用集合初始化加入這兩個sockets(也可以使用Add(NetMQSocket)
函式),並在各別socket的ReceiveReady
事件加上處理函式,最後poller由Run()
啟動,並開始阻塞直到Poller的Stop
函式被呼叫為止。
在內部,NetMQPoller
以最佳方式解決上述問題。
範例:SendReady
…(作者沒寫)
Timers
Pollers有一個額外的功能:Timer。
如果你需要在一個執行緒當中對一或多個sockets,執行一些週期性的操作,你可以在NetMQPoller
中加上一個NetMQTimer
。
這個範例會每秒推送一個訊息至所有已連線的端點。
var timer = new NetMQTimer(TimeSpan.FromSeconds(1));
using (var pub = new PublisherSocket("@tcp://*:5001"))
using (var poller = new NetMQPoller { pub, timer })
{
pub.ReceiveReady += (s, a) => { /* ... */ };
timer.Elapsed += (s, a) =>
{
pub.Send("Beep!");
};
poller.Run();
}
加入/移除 sockets/timers
Sockets和timers在執行時可以被安全的加入至或從Poller中移除。
注意NetMQSocket
,NetMQActor
and NetMQBeacon
都實作了ISocketPollable
,所以NetMQPoller
可以監示所有這些型別。
- AddSocket(ISocketPollable)
- RemoveSocket(ISocketPollable)
- AddTimer(NetMQTimer)
- RemoveTimer(NetMQTimer)
- AddPollInSocket(System.Net.Sockets.Socket, Action)
- RemovePollInSocket(System.Net.Sockets.Socket)
控制polling
到目前為止,我們學到了Run函式
。這讓執行緒用於輪詢活動,直到Poller
被從socket/timer
事件處理程序或從另一個執行緒中取消。
如果您希望繼續使被調用執行緒進行其他操作,可以呼叫RunAsync
,它會在新執行緒中呼叫Run
。
要停止Poller
,請使用Stop
或StopAsync
。後者會等待直到Poller
的迴圈在返回之前完全離開,這在軟體完整的離開前是必需的。
一個更複雜的例子
讓我們看一個較複雜的範例,使用我們目前為止看到的大部分工具。我們在接收到第一條訊息時將從NetMQPoller
中刪除一個ResponseSocket
,即使訊息是正確的,ReceiveReady
也不會被觸發。
using (var rep = new ResponseSocket("@tcp://127.0.0.1:5002"))
using (var req = new RequestSocket(">tcp://127.0.0.1:5002"))
using (var poller = new NetMQPoller { rep })
{
rep.ReceiveReady += (s, a) =>
{
bool more;
string messageIn = a.Socket.ReceiveFrameString(out more);
Console.WriteLine("messageIn = {0}", messageIn);
a.Socket.SendFrame("World");
poller.Remove(a.Socket);
};
poller.RunAsync();
req.SendFrame("Hello");
bool more2;
string messageBack = req.ReceiveFrameString(out more2);
Console.WriteLine("messageBack = {0}", messageBack);
req.SendFrame("Hello Again");
Thread.Sleep(1000);
}
輸出如下:
messageIn = Hello
messageBack = World
看到為什麼Hello Again
沒有收到嗎?這是因為在RecieiveReady
中處理第一條訊息時將ResponseSocket
從NetMQPoller
中移除。
效能
使用poller
接收消息比在socket上直接呼叫Receive
函式慢。當處理數千條訊息時,第二個或更多的poller
可能是瓶頸。但是解決方案很簡單,我們只需要使用Try *
函式獲取當前可用的socket的所有訊息。以下是一個範例:
rep1.ReceiveReady += (s, a) =>
{
string msg
// receiving all messages currently available in the socket before returning to the poller
while (a.Socket.TryReceiveFrameString(out msg))
{
// send a response
a.Socket.Send("Response")
}
}
如果socket載入了不會停止的訊息串流,則上述解決方案可能導致其他socket的Starving。要解決這個問題,你可以限制一個批次中可以提取的訊息數量。
rep1.ReceiveReady += (s, a) =>
{
string msg
// receiving 1000 messages or less if not available
for (int count = 0
{
// exit the for loop if failed to receive a message
if (!a.Socket.TryReceiveFrameString(out msg))
break
// send a response
a.Socket.Send("Response")
}
}
進階閱讀
…
Actor
NetMQ Actor Model
什麼是 Actor model?
From wiki: 略…
一個很好的思考Actors的方式是─他們是用來減輕一些在同步化時使用共享資料結構需要注意的地方。這是在你的程式中與actor通過訊息傳送/接收實作的。Actor本身可以將訊息傳送給其他actor,或者處理傳送的訊息本身。通過使用訊息傳送而不是使用共享資料結構,它有助於讓你認為actor(或其發送訊息的任何後續actor)實際上是在資料的拷貝上工作,而不是在相同的共享資料結構上工作。讓我們擺脫了多執行緒程式中需要擔心的可怕事情,如鎖和任何討厭的定時問題。如果actor使用自己的資料拷貝,那麼我們應該沒有其他的執行緒想要使用此actor所擁有的資料的問題,因為資料只在actor本身之內可見,unless we pass another message to a different actor。如果我們這樣做,新的訊息給另一個actor也只是另一個資料的拷貝,因此也是執行緒安全的。
在多執行緒當中共用資料
一個相當普遍的事情是用多個執行緒運行以加快速度,然後你發現到你的執行緒需要改變一些共享資料的狀態,那麼你會涉及到執行緒同步(最常見的 lock(..) statements, 以建立自己的 critical sections)。這有用,但現在你正引入人為的延遲,由於必須等待鎖被釋放,所以你可以執行執行緒X的程式。
更進一步,讓我們看看一些程式,可以說明這一點。想像一下,我們有一個資料結構代表非常簡單的銀行帳戶:
public class Account
{
public Account(int id, string name, string sortCode, decimal balance)
{
Id = id;
Name = name;
SortCode = sortCode;
Balance = balance;
}
public int Id { get; set; }
public string Name { get; set; }
public string SortCode { get; set; }
public decimal Balance { get; set; }
public override string ToString()
{
return string.Format("Id: {0}, Name: {1}, SortCode: {2}, Balance: {3}",
Id, Name, SortCode, Balance);
}
}
這裡沒有什麼特別的,只是一些欄位。讓我們來看一些執行緒程式,我選擇只顯示兩個執行緒共用Account實體的程式。
static void Main()
{
var account = new Account(1, "sacha barber", "112233", 0);
var syncLock = new object();
// start two asynchronous tasks that both mutate the account balance
var task1 = Task.Run(() =>
{
var threadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("Thread Id {0}, Account balance before: {1}",
threadId, account.Balance);
lock (syncLock)
{
Console.WriteLine("Thread Id {0}, Adding 10 to balance",
threadId);
account.Balance += 10;
Console.WriteLine("Thread Id {0}, Account balance after: {1}",
threadId, account.Balance);
}
});
var task2 = Task.Run(() =>
{
var threadId = Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("Thread Id {0}, Account balance before: {1}",
threadId, account.Balance);
lock (syncLock)
{
Console.WriteLine("Thread Id {0}, Subtracting 4 from balance",
threadId);
account.Balance -= 4;
Console.WriteLine("Thread Id {0}, Account balance after: {1}",
threadId, account.Balance);
}
});
// wait for all tasks to complete
task1.Wait();
task2.Wait();
}
你也許認為這個範例不會發生在現實生活中,誠實的說,這真的不會發生,誰會真的在一個執行緒當中存款,而在另一個執行緒中取款呢…我們都是聰明的開發者,不會這樣寫的,不是嗎?
老實說,不管這個範例是否會在現實生活中出現,要點出的問題仍然是相同的,因為我們有多個執行緒存取共用資料結構,存取時必須同步,且通常使用lock(.. )語法,如同程式所見。
現在不要誤會我,上面的程式碼可正常工作,如下面的輸出所示:
Thread Id 6, Account balance before: 0
Thread Id 6, Adding 10 to balance
Thread Id 6, Account balance after: 10
Thread Id 10, Account balance before: 10
Thread Id 10, Subtracting 4 to balance
Thread Id 10, Account balance after: 6
也許可能有一個更有趣的方式!
Actor model
Actor模型採用不同的方法,其中使用訊息傳遞的方式可能會涉及某種形式的序列化,因為訊息是向下傳遞的,保證沒有共享結構的競爭。我不是說所有的Actor框架都使用訊息傳遞(序列化),但本文中提供的程式碼是。
基本思想是每個執行緒都會與一個actor交談,並與actor傳送/接收訊息。
如果你想要得到更多的隔離性,你可以使用執行緒的local storage,每個執行緒可以有自己的actor的副本。
談的夠多了,讓我們來看程式碼吧…
Actor demo
我們會持續使用和傳統上的locking/sahred data相同類型的範例。
讓我們先介紹幾個helper類別:
AccountAction
public enum TransactionType { Debit = 1, Credit = 2 }
public class AccountAction
{
public AccountAction(TransactionType transactionType, decimal amount)
{
TransactionType = transactionType;
Amount = amount;
}
public TransactionType TransactionType { get; set; }
public decimal Amount { get; set; }
}
Account
和之前的一樣。
public class Account
{
public Account(int id, string name, string sortCode, decimal balance)
{
Id = id;
Name = name;
SortCode = sortCode;
Balance = balance;
}
public int Id { get; set; }
public string Name { get; set; }
public string SortCode { get; set; }
public decimal Balance { get; set; }
public override string ToString()
{
return string.Format("Id: {0}, Name: {1}, SortCode: {2}, Balance: {3}",
Id, Name, SortCode, Balance);
}
}
AccountActioner
以下是處理帳戶操作的Actor的完整程式。這個例子是故意簡單化的,我們只用一筆金額借/貸一個帳戶。你可以發送任何命令到Actor,而Actor只是一個一般化的處理訊息的系統。
程式碼如下:
public class AccountActioner
{
public class ShimHandler : IShimHandler
{
private PairSocket shim;
private NetMQPoller poller;
public void Initialise(object state)
{
}
public void Run(PairSocket shim)
{
this.shim = shim;
shim.ReceiveReady += OnShimReady;
shim.SignalOK();
poller = new NetMQPoller { shim };
poller.Run();
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveFrameString();
switch (command)
{
case NetMQActor.EndShimMessage:
Console.WriteLine("Actor received EndShimMessage");
poller.Stop();
break;
case "AmmendAccount":
Console.WriteLine("Actor received AmmendAccount message");
string accountJson = e.Socket.ReceiveFrameString();
Account account
= JsonConvert.DeserializeObject<Account>(accountJson);
string accountActionJson = e.Socket.ReceiveFrameString();
AccountAction accountAction
= JsonConvert.DeserializeObject<AccountAction>(
accountActionJson);
Console.WriteLine("Incoming Account details are");
Console.WriteLine(account);
AmmendAccount(account, accountAction);
shim.SendFrame(JsonConvert.SerializeObject(account));
break;
}
}
private void AmmendAccount(Account account, AccountAction accountAction)
{
switch (accountAction.TransactionType)
{
case TransactionType.Credit:
account.Balance += accountAction.Amount;
break;
case TransactionType.Debit:
account.Balance -= accountAction.Amount;
break;
}
}
}
private NetMQActor actor;
public void Start()
{
if (actor != null)
return;
actor = NetMQActor.Create(new ShimHandler());
}
public void Stop()
{
if (actor != null)
{
actor.Dispose();
actor = null;
}
}
public void SendPayload(Account account, AccountAction accountAction)
{
if (actor == null)
return;
Console.WriteLine("About to send person to Actor");
var message = new NetMQMessage();
message.Append("AmmendAccount");
message.Append(JsonConvert.SerializeObject(account));
message.Append(JsonConvert.SerializeObject(accountAction));
actor.SendMultipartMessage(message);
}
public Account GetPayLoad()
{
return JsonConvert.DeserializeObject<Account>(actor.ReceiveFrameString());
}
}
Tying it all together
你可以使用下列程式碼和Actor溝通,再次地說,你可以使用任何命令,這個範例只顯示對一個帳戶的借/貸。
class Program
{
static void Main(string[] args)
{
// CommandActioner uses an NetMq.Actor internally
var accountActioner = new AccountActioner()
var account = new Account(1, "Doron Somech", "112233", 0)
PrintAccount(account)
accountActioner.Start()
Console.WriteLine("Sending account to AccountActioner/Actor")
accountActioner.SendPayload(account,
new AccountAction(TransactionType.Credit, 15))
account = accountActioner.GetPayLoad()
PrintAccount(account)
accountActioner.Stop()
Console.WriteLine()
Console.WriteLine("Sending account to AccountActioner/Actor")
accountActioner.SendPayload(account,
new AccountAction(TransactionType.Credit, 15))
PrintAccount(account)
Console.ReadLine()
}
static void PrintAccount(Account account)
{
Console.WriteLine("Account now")
Console.WriteLine(account)
Console.WriteLine()
}
}
執行時應可以看見如下輸出:
我們希望這可以讓你知道可以用一個Actor做些什麼事…
Beacon
NetMQBeacon
實作了在區域網路中點對點的discovery服務。
一個beacon
可在區域網路中透過UDP做擴播或捕捉service announcements,你可以定義廣播出去的beacon,也可以設定過濾器以過濾接收到的beacons。Beacons會在背景非同步的執行傳送及接收的動作。
我們可以使用NetMQBeacon
自動地在網路中尋找及連線至其它NetMQ/CZMQ
的服務而不需要一個中央的設定。請注意若要使用NetMQBeacon
在你的架構中需要支援廣播(broadcast)服務。而目前大部份的服端服務商並不支援。
這個實作使用IPv4 UDP廣播,屬於zbeacon from czmq並加上維護網路相容性的擴充函式。
範例:Implementing a Bus
NetMQBeacon
可以用來建立簡單的bus系統,讓一組節點僅需透過一個共享的埠號即可找到其它的節點。
- 每個bus的節點綁定至一個subscriber socket且靠publisher socket連線至其它節點。
- 每個節點會透過
NetMQBeacon
去公告它的存在及尋找其它節點,我們將使用NetMQActor
來實作我們的節點。
範例在此:
bus.cs
(原文連結有誤,此處已修改)
進階閱讀
If you are looking at some of the method signatures, and wondering why/how you should use them, you should read a bit more on the messaging philosophy that NetMQ uses. The Message page has some helpful information around this area.
Timer
一個NetMQTimer
讓你可以執行週期性的動作。Timer實體可以加至NetMQPoller
中,且它的Elapsed
事件會依指定的Interval
及Enabled
屬性值被觸發。
下列事件在poller執行緒中被喚起。
var timer = new NetMQTimer(TimeSpan.FromMilliseconds(100));
timer.Elapsed += (sender, args) => { /* handle timer event */ };
using (var poller = new NetMQPoller { timer })
{
poller.Run();
}
Queue
NetMQQueue<T>
是一個支援多個生產者及單一消費者的生產者/消費者佇列。
你應該將佇列加至NetMQPoller
中,且在ReceiveReady
事件中加上消費者程式碼,而生產者會呼叫Enque(T)
將資料加入。
此類別籍由將眾多操作集結在單一執行緒中免去了你撰寫冗餘程式的時間浪費。
using (var queue = new NetMQQueue<ICommand>())
using (var poller = new NetMQPoller { queue })
{
queue.ReceiveReady += (sender, args) => ProcessCommand(queue.Dequeue());
poller.RunAsync();
queue.Enqueue(new DoSomethingCommand());
queue.Enqueue(new DoSomethingElseCommand());
}
Proactor
NetMQProactor
會使用專有的執行緒處理在socket上收到的訊息。
using (var receiveSocket = new DealerSocket(">tcp://localhost:5555"))
using (var proactor = new NetMQProactor(receiveSocket,
(socket, message) => ProcessMessage(message)))
{
// ...
}
在內部,proactor為socket建了一個NetMQPoller
,以及一個NetMQActor
處理poller執行緒及disposal。
Patterns
Request - Response
Request/Response 應該是所有NetMQ
socket 組合中最簡單的一種了。這不是說RequestSocket和ResponseSocket必須總是一起使用,不是的,只是會有很多時候你想將某一種socket和另一種socket一起使用。有一些特定的socket的組合,剛好很適合在一起使用,而RequestSocket和ResponseSocket就是這樣的一個模式。
可無間配合的socket組合在ZeroMQ指南中皆已清楚描述,雖然我可以單純的告訴你在某些地方可看到更多相關文件,但沒有比ZeroMQ指南更好的文件了!它提供了各種模式的良好說明。
我們有點離題了,無論如何,這篇文章是關於Request/Response,所以讓我們繼續吧!
它如何工作
Request / Response模式是兩個NetMQ sockets協調工作的一個配置。這種組合類似於你在發出一個web request時看到的模式,也就是說,你提出請求,且期望得到回應。
RequestSocket 和 ResponseSocket 是同步式、阻塞的,如果你試著以錯誤的順序讀取訊息,你會得到一個例外。
你應該使用RequestSocket
和ResponseSockets
連結的方式如下:
- 從
RequestSocket
傳送訊息
ResponseSocket
讀取請求的訊息
ResponseSocket
傳送回應訊息
RequestSocket
接收來自ResponseSocket
的訊息
不管你相信與否,你應該已看過這種範例很多次,因為它已是最簡單的示範了。
這裡有一個小範例,其中RequestSocket
和ResponseSockets
都在同一個process中,但這可以很容易地放在兩個不同的process中。我們盡可能保持簡單以用於展示的目的。
using (var responseSocket = new ResponseSocket("@tcp://*:5555"))
using (var requestSocket = new RequestSocket(">tcp://localhost:5555"))
{
Console.WriteLine("requestSocket : Sending 'Hello'")
requestSocket.SendFrame("Hello")
var message = responseSocket.ReceiveFrameString()
Console.WriteLine("responseSocket : Server Received '{0}'", message)
Console.WriteLine("responseSocket Sending 'World'")
responseSocket.SendFrame("World")
message = requestSocket.ReceiveFrameString()
Console.WriteLine("requestSocket : Received '{0}'", message)
Console.ReadLine()
}
輸出如下:
Request/Response 是阻塞式的
如上所述,RequestSocket
和ResponseSocket
是阻塞的,這意味著任何意外的發送或接呼叫將會導致異常。這裡是這種例外的範例。
這個範例中我們試著在RequestSocket
中執行兩次Send()。
或者這個範例,我們嘗試執行RecieveString()兩次,但只有一個訊息從RequestSocket傳送。
所以要小心你用Request/Response模式做了什麼,魔鬼總在細節裡。
Pub - Sub
From Wikipedia:
發布/訂閱(Publish/subscribe 或pub/sub)是一種訊息規範,訊息的傳送者(發布者)不是計劃傳送其訊息給特定的接收者(訂閱者)。而是發布的訊息分為不同的類別,而不需要知道什麼樣的訂閱者訂閱。訂閱者對一個或多個類別表達興趣,於是只接收感興趣的訊息,而不需要知道什麼樣的發布者發布的訊息。這種發布者和訂閱者的解耦可以允許更好的可延伸性和更為動態的網路拓撲.
上述所謂的類別也可以當成是一個”主題”或”過濾器”。
NetMQ
用兩種socket型別支援Pub/Sub
模式:
- PublisherSocket
- SubscriberSocket
Topics 主題
ZeroMQ/NetMQ
使用多段訊息傳送主題資訊,可用byte陣列來表示主題,或是字串並加上適當的System.Text.Encoding
。
pub.SendMoreFrame("status").SendFrame("All is well");
訂閱者使用SubscriberSocket的Subscribe函式指定他們有興趣的主題。
// subscribe to the 'status' topic
sub.Subscribe("status");
Topic heirarchies 主題階級
一個訊息的主題會用prefix檢查和訂閱者的訂閱主題比較。
也就是說,訂閱主題的訂閱者會接收具有主題的訊息:
- topic
- topic/subtopic
- topical
然而它不會接受這些主題:
- topi
- TOPIC(記住,它是以byte做為比較方式)
使用prefix比對行為的結果,可以讓你以空字串來訂閱所有發佈的訊息。
sub.Subscribe(""); // subscribe to all topics
範例
到了介紹範例的時間了,這範例很簡單,並遵守下列規則:
* 有一個發佈者的process,會以500ms的時間隨機發佈主題為TopicA
或’TopicB`的訊息。
* 可能會有很多訂閱者,欲訂閱的主題名稱會以命令列參數代入程式中。
Publichser
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
namespace Publisher
{
class Program
{
static void Main(string[] args)
{
Random rand = new Random(50);
using (var pubSocket = new PublisherSocket())
{
Console.WriteLine("Publisher socket binding...");
pubSocket.Options.SendHighWatermark = 1000;
pubSocket.Bind("tcp://localhost:12345");
for (var i = 0; i < 100; i++)
{
var randomizedTopic = rand.NextDouble();
if (randomizedTopic > 0.5)
{
var msg = "TopicA msg-" + i;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMoreFrame("TopicA").SendFrame(msg);
}
else
{
var msg = "TopicB msg-" + i;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMoreFrame("TopicB").SendFrame(msg);
}
Thread.Sleep(500);
}
}
}
}
}
Subscriber
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace SubscriberA
{
class Program
{
public static IList<string> allowableCommandLineArgs
= new [] { "TopicA", "TopicB", "All" };
static void Main(string[] args)
{
if (args.Length != 1 || !allowableCommandLineArgs.Contains(args[0]))
{
Console.WriteLine("Expected one argument, either " +
"'TopicA', 'TopicB' or 'All'");
Environment.Exit(-1);
}
string topic = args[0] == "All" ? "" : args[0];
Console.WriteLine("Subscriber started for Topic : {0}", topic);
using (var subSocket = new SubscriberSocket())
{
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect("tcp://localhost:12345");
subSocket.Subscribe(topic);
Console.WriteLine("Subscriber socket connecting...");
while (true)
{
string messageTopicReceived = subSocket.ReceiveFrameString();
string messageReceived = subSocket.ReceiveFrameString();
Console.WriteLine(messageReceived);
}
}
}
}
}
在這邊提供三個批次檔,讓你方便執行,不過要稍微修改一下路徑等一適合你的環境。
RunPubSub.bat
start RunPublisher.bat
start RunSubscriber "TopicA"
start RunSubscriber "TopicB"
start RunSubscriber "All"
RunPublisher.bat
cd Publisher\bin\Debug
Publisher.exe
RunSubscriber.bat
set "topic=%~1"
cd Subscriber\bin\Debug
Subscriber.exe %topic%
執行時輸出如下:
Other Considerations
High water mark
SendHighWaterMark / ReceiveHighWaterMark選項可設定指定socket的high water mark。High water mark是對未完成訊息的最大數量的限制,NetMQ會將正在與指定的socket通訊的任何端點的訊息排入佇列中。
如果到達此限制,socket會進入異常狀態,並且根據socket類型,NetMQ應採取適當的措施,如阻止或丟棄發送的訊息。
預設的SendHighWaterMark / ReceiveHighWaterMark值為1000.零值表示“無限制”。
你也可以使用xxxSocket.Options
屬性值設定下列兩個屬性:
- pubSocket.Options.SendHighWatermark = 1000;
- pubSocket.Options.ReceiveHighWatermark = 1000;
Slow subscribers
ZeroMQ指南有提到。
Late joining subscribers
ZeroMQ指南有提到。
Push - Pull
NetMQ
提供了PushSocket
和PullSocket
,這些是什麼?要如何使用?
嗯,PushSocket
一般是用來推送訊息至PullSocket
,而PullSocket
是用來從PushSocket
取得訊息,聽起來很對吧!
你通常使用這種設定的socket來產生一些分佈式的工作,有點像divide and conquer的安排。
這個想法是,你有一些產生工作的東西,然後將工作分配給多個工人。工人每個都做一些工作,並將結果推送到其他工序(可能是一個執行緒),工人的產出在那裡累積。
在ZeroMQ指南中,它顯示了一個範例,其中work generator只是告訴每個工人睡眠一段時間。
我們試圖創建一個比這更複雜的例子,但是最終覺得這個例子的簡單性是相當重要的,所以我們讓每個工人的工作量變成一個代入值,告訴工作休眠幾毫秒(從而模擬一些實際工作)。這個例子,正如我所說,是從ZeroMQ指南借來的。
In real life the work could obviously be anything, though you would more than likely want the work to be something that could be cut up and distributed without the work generator caring/knowing how many workers there are.
這裡是我們試圖實作的:
Ventilator
using System;
using NetMQ;
namespace Ventilator
{
public class Program
{
public static void Main(string[] args)
{
Console.WriteLine("====== VENTILATOR ======");
using (var sender = new PushSocket("@tcp://*:5557"))
using (var sink = new PushSocket(">tcp://localhost:5558"))
{
Console.WriteLine("Press enter when worker are ready");
Console.ReadLine();
Console.WriteLine("Sending start of batch to Sink");
sink.SendFrame("0");
Console.WriteLine("Sending tasks to workers");
Random rand = new Random(0);
int totalMs = 0;
for (int taskNumber = 0; taskNumber < 100; taskNumber++)
{
int workload = rand.Next(0, 100);
totalMs += workload;
Console.WriteLine("Workload : {0}", workload);
sender.SendFrame(workload.ToString());
}
Console.WriteLine("Total expected cost : {0} msec", totalMs);
Console.WriteLine("Press Enter to quit");
Console.ReadLine();
}
}
}
}
Worker
using System;
using System.Threading;
using NetMQ;
namespace Worker
{
public class Program
{
public static void Main(string[] args)
{
Console.WriteLine("====== WORKER ======");
using (var receiver = new PullSocket(">tcp://localhost:5557"))
using (var sender = new PushSocket(">tcp://localhost:5558"))
{
while (true)
{
string workload = receiver.ReceiveFrameString();
Thread.Sleep(int.Parse(workload));
Console.WriteLine("Sending to Sink");
sender.SendFrame(string.Empty);
}
}
}
}
}
Sink
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;
namespace Sink
{
public class Program
{
public static void Main(string[] args)
{
Console.WriteLine("====== SINK ======");
using (var receiver = new PullSocket("@tcp://localhost:5558"))
{
var startOfBatchTrigger = receiver.ReceiveFrameString();
Console.WriteLine("Seen start of batch");
var watch = Stopwatch.StartNew();
for (int taskNumber = 0; taskNumber < 100; taskNumber++)
{
var workerDoneTrigger = receiver.ReceiveFrameString();
if (taskNumber % 10 == 0)
{
Console.Write(":");
}
else
{
Console.Write(".");
}
}
watch.Stop();
Console.WriteLine();
Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
Console.ReadLine();
}
}
}
}
執行範例
要執行這個,這三個批次檔會很有用,若你選擇將此程式碼複製到新方案中,你需要更改路徑以符合。
RunWorker.bat
cd Ventilator/bin/Debug
start Ventilator.exe
cd../../..
cd Sink/bin/Debug
start Sink.exe
cd../../..
cd Worker/bin/Debug
start Worker.exe
在這個Sink的Process執行後,應該會在Console有如下的輸出:(顯然你的PC可能運行比我的更快/更慢):
====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........
:.........:.........
Total elapsed time 5695 msec
Run2Workers.bat
cd Ventilator/bin/Debug
start Ventilator.exe
cd../../..
cd Sink/bin/Debug
start Sink.exe
cd../../..
cd Worker/bin/Debug
start Worker.exe
start Worker.exe
在這個Sink的Process執行後,應該會在Console有如下的輸出:(顯然你的PC可能運行比我的更快/更慢):
====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........
:.........:.........
Total elapsed time 2959 msec
Run4Workers.bat
cd Ventilator/bin/Debug
start Ventilator.exe
cd../../..
cd Sink/bin/Debug
start Sink.exe
cd../../..
cd Worker/bin/Debug
start Worker.exe
start Worker.exe
start Worker.exe
start Worker.exe
在這個Sink的Process執行後,應該會在Console有如下的輸出:(顯然你的PC可能運行比我的更快/更慢):
====== SINK ======
Seen start of batch
:.........:.........:.........:.........:.........:.........:.........:.........
:.........:.........
Total elapsed time 1492 msec
這個模式有幾個要注意的重點:
Ventilator
使用NetMQ
中的PushSocket
以將工作發佈至Worker
,這也稱為負載平衡。
Ventilator
和Sink
是系統中固定的部份,而Worker
是動態的,添加更多Worker
的是很簡單的事,我們可以啟動一個新的Worker
實體,在理論上,工作會更快完成(越多Worker
越快)。
- 我們要同步啟動批次檔(當
Worker
準備好時),如果沒有,最先連線的Worker
會比其它的取得更多的訊息,那就不夠負載平衡了。
Sink
使用NetMQ
的PullSocket
去累積Worker
的產出。
Router - Dealer
RouterSocket
從 ZeroMQ guide:
ROUTER socket,不像其它的sockets,會追蹤它的每個連線,且告知caller。告知的方式是透過在收到的訊息的前面加上一連線示別的資訊。示別碼,有時也被稱為位址,只是一個表示“這是代表此連線的唯一示別碼”,而不包含任何其它資訊。然後,當你透過ROUTER socket傳送訊息時,你會傳送一個示別碼的frame。
當接收訊息時,一個ZMQ_ROUTER socket應在傳送至應用程式前,在訊息前置一個包含原始節點的辨視碼,收到的訊息會公平地將所有節點的訊息放至佇列中。當傳送訊息時,一個ZMQ_ROUTER socket應該將訊息的第一個部份移除,並使用目的端的辨視碼取代。
Identities是一個很難的概念,但如果你想成為一個ZeroMQ的專家,它是至關重要的。ROUTER socket會為它的每一個連線隨機產生一個辨視碼。如果有三個REQ socket連線至一個ROUTER socket上,它會產生三個辨視碼,對映至每一個REQ socket上。
所以我們來看一個較小的範例,我們有一個DealerSocket
,帶有一個3 byte的示別碼”ABC”,在內部,這表示RouterSocket
型別的socket內保有一個hash table,它可以搜尋”ABC”,並找到這一個DealerSocket
的TCP連線。
當我們收到來自DealerSocket
的訊息時,我們取得三個frames:
Identities and Addresses
From ZeroMQ guide, Identities and Addresses:
ZeroMQ中的辨視碼概念特指的是ROUTER sockets,以及它們如何辨別與其它socket的連線。更廣泛的說,辨視碼被當作為回信的地址。大多狀況下,辨視碼是arbitrary且在本地對映至ROUTER socket上:它是一個雜湊表中的查詢鍵。所以一個節點可以有一個實體的位址(如”tcp://192.168.55.117:5670”的網路端點)或邏輯上的位址(一個UUID或是email或其它的唯一鍵值)。
一個使用ROUTER socket和特定節點溝通的應用程式,如果有建立雜湊表,就可以將一個邏輯位址轉成辨視碼。因為ROUTER socket只announce一個連線(至特定節點)的identity,當此連線傳送訊息時,你只能夠回覆,而不能自發地與之交談。
這是事實,即時你將規則翻轉,且讓ROUTER連線至節點,而不是等待節點連線至ROUTER。然而你可以強制一個ROUTER socket使用邏輯位址來替代其identity,zmq_setsockopt說明頁呼叫這個以設定socket的identity,它的工作原理如下:
- 節點應用程式在binding或connecting前設定它的節點socket(DEALER or REQ)的ZMQ_IDENTITY選項。
- 再來這節點會連線至already-bound的ROUTER socket上,但ROUTER也可以連線至此節點。
- 在連線時,節點socket會告訴router socket,“請為此連線使用這個辨視碼”。
- 如果節點socket沒有這樣子說,router會隨機產生一個辨視碼給此連線。
- ROUTER socket現在會提供一個邏輯位址給此程式,做為所有來自此節點的訊息的前置辨視碼用的frame。
DealerSocket
NetMQ的DealerSocket
不做任何特別的事情,它提供的是以完全非同步方式工作的能力。
Which if you recall was not something that other socket types could do, where the ReceieveXXX / SendXXX methods are blocking, and would also throw exceptions should you try to call things in the wrong order, or more than expected.
DealerSocket的主要賣點是它的非同步能力。通常,DealerSocket
會與RouterSocket
結合使用,這就是為什麼我們決定將這兩種socket型別的介紹放在一起。
如果你想知道更多的關於socket組合而成的DealerSockets的資訊,再次的說,指南會是你的好朋友,特別是Request-Reply Combinations頁面。
範例
又到了看範例的時間,此範例重點如下:
- 有一個伺服器,它綁定了一個
RouterSocket
,因此會儲存傳入的請求連線的示別資訊,所以可以正確的將訊息回應至client socket。
- 有很多個client,每個client都屬於個別執行緒,這些client的型別是
DealerSocket
,這一個client socket會提供固定的示別碼,以讓伺服端(DealerSocket
)可以正確的回應訊息。
程式碼如下:
public static void Main(string[] args)
{
const int delay = 3000;
var clientSocketPerThread = new ThreadLocal<DealerSocket>();
using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
using (var poller = new NetMQPoller())
{
for (int i = 0; i < 3; i++)
{
Task.Factory.StartNew(state =>
{
DealerSocket client = null;
if (!clientSocketPerThread.IsValueCreated)
{
client = new DealerSocket();
client.Options.Identity =
Encoding.Unicode.GetBytes(state.ToString());
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
clientSocketPerThread.Value = client;
poller.Add(client);
}
else
{
client = clientSocketPerThread.Value;
}
while (true)
{
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(state.ToString());
Console.WriteLine("======================================");
Console.WriteLine(" OUTGOING MESSAGE TO SERVER ");
Console.WriteLine("======================================");
PrintFrames("Client Sending", messageToServer);
client.SendMultipartMessage(messageToServer);
Thread.Sleep(delay);
}
}, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
}
poller.RunAsync();
while (true)
{
var clientMessage = server.ReceiveMessage();
Console.WriteLine("======================================");
Console.WriteLine(" INCOMING CLIENT MESSAGE FROM CLIENT ");
Console.WriteLine("======================================");
PrintFrames("Server receiving", clientMessage);
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMultipartMessage(messageToClient);
}
}
}
}
void PrintFrames(string operationType, NetMQMessage message)
{
for (int i = 0; i < message.FrameCount; i++)
{
Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, i,
message[i].ConvertToString());
}
}
void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
bool hasmore = false;
e.Socket.Receive(out hasmore);
if (hasmore)
{
string result = e.Socket.ReceiveFrameString(out hasmore);
Console.WriteLine("REPLY {0}", result);
}
}
執行後,輸出應如下所示:
======================================
OUTGOING MESSAGE TO SERVER
======================================
======================================
OUTGOING MESSAGE TO SERVER
======================================
Client Sending Socket : Frame[0] =
Client Sending Socket : Frame[1] = client 1
Client Sending Socket : Frame[0] =
Client Sending Socket : Frame[1] = client 0
======================================
INCOMING CLIENT MESSAGE FROM CLIENT
======================================
Server receiving Socket : Frame[0] = c l i e n t 1
Server receiving Socket : Frame[1] =
Server receiving Socket : Frame[2] = client 1
======================================
INCOMING CLIENT MESSAGE FROM CLIENT
======================================
Server receiving Socket : Frame[0] = c l i e n t 0
Server receiving Socket : Frame[1] =
Server receiving Socket : Frame[2] = client 0
REPLY client 1 back from server 08:05:56
REPLY client 0 back from server 08:05:56
記住這是非同步的程式碼,所以事件的發生順序可能不如你所預期的。
XPub - XSub
發佈/訂閱模式適用於多個訂閱者和單一發佈者,但是如果您需要多個發佈者,那麼XPub / XSub模式會比較有趣。
XPub / XSub還可以協助所謂的”dynamic discovery problem”。 從ZeroMQ指南:
當你在設計較大型的分佈式架構時可能會遇到的一個問題是discovery,也就是每一個節點如何知道其它節點?特別是在節點來來去去的狀況下,我們把可狀況稱做”dynamic discovery problem”。
有幾種解決方法。最簡單的是整網路架構以hard-coding (or configuring)的方式手動指定以全然避免掉此狀況,也就是說當你增加一個新節點後,重新設置網路。
在實際上,這會導致越來越脆弱和笨重的架構。假設你有一個發佈者和一百個訂閱者。你通過在每個訂閱者中設定發佈伺服器端點,將每個訂閱者連接到發佈者。這很容易。訂閱者是動態的;發佈者是靜態的。現在如果說突然間你要增加更多發佈者,這不再容易完成。如果你繼續將每個訂閱者連接到每個發佈者,則避免dynamic discovery的成本會越來越高。
這有不少解答,最簡單的是增加一個中介層;也就是說,在網路中增加一個固定的點,以讓其它節點連線。在典型的訊息架構中,這會由message broker負責。ZeroMQ並沒有這樣的一個message broker,但它讓建立中介層的工作變得很簡單。
你也許會疑惑,如果所有的網路最終會大到需要一個中介層,為什麼我們不為所有的應用都提供一個中介層?對於初學者,這是一個公平的妥協。總是使用星狀拓璞,不要考慮效能,事情總是能夠工作。然而,message brokers是貪婪的東西;在它們做為中央中介者的角色,會變得太複雜,太多狀態,最終會造成問題。
最好是把中介層當做一個簡單的無狀態的訊息交換機。一個好的類比是HTTP代理;它存在那裡,但不作為任何特定的角色。在我們的範例中,增加一個pub-sub代理可解決dynamic discovery問題,我們將代理設置在網路的”中間”,這個代理會打開一個XSUB的socket,及一個XPUB的socket,並綁定至一個大家都知道的IP及port上,然後,所有其它的節點連線至此代理,而不是互相連線。增加更多的訂閱者或是發佈者不再是問題。
我們需要XPUB和XSUB socket,因為ZeroMQ會把訂閱者的訂閱轉發至發佈者。XPUB和XSUB與PUB和SUB完全一樣,除了它們將訂閱當成特別的訊息。代理器需轉發這些訂閱者的訂閱至發佈者,靠著從XSUB socket讀取並寫至XPUB socket上,這是XPUB和XSUB主要的使用方式。
範例
所以現在我們已經了解了為什麼要使用XPub / XSub,現在讓我們看一個依上述描述的範例。分為三個部分:
- Publisher
- Intermediary
- Subscriber
Publisher
可以看到PublisherSocket
連線到XSubscriberSocket
的位址。
using (var pubSocket = new PublisherSocket(">tcp://127.0.0.1:5678"))
{
Console.WriteLine("Publisher socket connecting...");
pubSocket.Options.SendHighWatermark = 1000;
var rand = new Random(50);
while (true)
{
var randomizedTopic = rand.NextDouble();
if (randomizedTopic > 0.5)
{
var msg = "TopicA msg-" + randomizedTopic;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMore("TopicA").Send(msg);
}
else
{
var msg = "TopicB msg-" + randomizedTopic;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMore("TopicB").Send(msg);
}
}
}
Intermediary
負責在XPublisherSocket
和XSubscriberSocket
之間雙向地中繼訊息。NetMQ
提供了一個使用簡單的代理類別。
using (var xpubSocket = new XPublisherSocket("@tcp://127.0.0.1:1234"))
using (var xsubSocket = new XSubscriberSocket("@tcp://127.0.0.1:5678"))
{
Console.WriteLine("Intermediary started, and waiting for messages");
var proxy = new Proxy(xsubSocket, xpubSocket);
proxy.Start();
}
Subscriber
可以看到SubscriberSocket
連線到XPublisherSocket
的位址。
string topic = /* ... */; // one of "TopicA" or "TopicB"
using (var subSocket = new SubscriberSocket(">tcp://127.0.0.1:1234"))
{
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Subscribe(topic);
Console.WriteLine("Subscriber socket connecting...");
while (true)
{
string messageTopicReceived = subSocket.ReceiveString();
string messageReceived = subSocket.ReceiveString();
Console.WriteLine(messageReceived);
}
}
執行時,可以看到如下列輸出:
不像發佈/訂閱模式,我們可以有不定數量的發佈者及訂閱者。
Written with StackEdit.