7.27.2017

NetMQ + RX Demo (譯)

之前找到的NetMQ範例,趁有時間邊翻譯邊學習,此譯文略過前面介紹Rx的部份(有興趣可參考IntroToRx,或是我之前翻譯的中文版),僅從原文中的General Idea開始。文章覺得不重要的或是不會翻的XD,會以代替。
另原文採用NetMQ 3.x版,當前版本4.x後有多項更新,若是需要,可參照此文的方式升級,但升級後在NetMQHeartBeatClient.cs的ShimHandler建構式要修改,斷線訊息才能正常顯示(或者我再找時間把升級後的版本放上去XD已fork並更新,網址:https://github.com/liaochihung/NetMQRxDemo)…

// 要加上parent參數,當然,在呼叫時要帶入
public ShimHandler(object parent,Subject<ConnectionInfo> subject, string address)
{
    this.address = address;
    this.subject = subject;
    // 原專案中忘了呼叫此函式
    Initialise(parent);
}
  • 我之前有翻譯過NetMQ的中文化文件,有需要也可參考。
  • Client - Server的程式,一般可能都會在client端採用輪詢的方式Pull server的資料,但請注意本文中使用Rx,所以它是相反方向的Push式的模式,在瞭解程式時請謹記此點。
  • 此專案的原始範本:SignalR + Rx(跨平台) – ReactiveTrader

General Idea

千言萬語不如圖一張:
enter image description here

The Publisher

pic

Publisher由數個部份組成,下文會說明,但要記住server和client都是使用NetMQ,所以在你開始閱讀前先瞭解下NetMQ會比較好。…

IOC

MainWindow

除了以下幾個控制項外,這個window沒有其它需說明的:

  • Start NetMQ:啟始NetMQPublisher Actor PublisherSocket(對模擬NetMQ server的崩潰/重啟很有幫助)
  • Stop NetMQ:結束NetMQPublisher Actor(對模擬NetMQ server的崩潰/重啟很有幫助)
  • Start Auto Ticker Data:以固定的時間間隔從NetMQPublisher Actor推送隨機產生的TickerDto物件
  • Stop Auto Ticker Data:會暫停從NetMQPublisher Actor推送TickerDto物件
  • Send One Ticker:從NetMQPublisher Actor推送單一的隨機TickerDto物件

現在還不用擔心我們提到的“Actor”是什麼意思,很快就會說明。

MainWindowViewModel

MainWindowViewModel只是用來傳送命令給不管是停止/建立NetMQPublisher Actor,或是告訴NetMQPublisher Actor去做一些事,以下是其完整的程式碼。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using NetMQServer.Ticker;

namespace NetMQServer
{
    public class MainWindowViewModel
    {
        private readonly ITickerPublisher tickerPublisher;
        private readonly ITickerRepository tickerRepository;
        private Random rand;
        private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
        private CancellationTokenSource autoRunningCancellationToken;
        private Task autoRunningTask;
        private bool serverStarted;
        private bool autoTickerStarted;

        public MainWindowViewModel(ITickerPublisher tickerPublisher, ITickerRepository tickerRepository)
        {
            this.tickerPublisher = tickerPublisher;
            this.tickerRepository = tickerRepository;
            this.rand = new Random();

            serverStarted = false;
            autoTickerStarted = false;

            AutoTickerStartCommand = new DelegateCommand(
                AutoRunning, 
                () => serverStarted && !autoTickerStarted);

            AutoTickerStopCommand = new DelegateCommand(
                () =>
                {
                    if (autoRunningCancellationToken != null)
                    {
                        autoRunningCancellationToken.Cancel();
                        autoRunningTask.Wait();
                        autoTickerStarted = false;
                    }
                }, 
                () => serverStarted && autoTickerStarted);

            SendOneTickerCommand = new DelegateCommand(
                SendOneManualFakeTicker, 
                () => serverStarted && !autoTickerStarted);

            StartCommand = new DelegateCommand(
                StartServer, 
                () => !serverStarted);

            StopCommand = new DelegateCommand(
                StopServer, 
                () => serverStarted);
        }

        public DelegateCommand AutoTickerStartCommand { get; set; }
        public DelegateCommand AutoTickerStopCommand { get; set; }
        public DelegateCommand SendOneTickerCommand { get; set; }
        public DelegateCommand StartCommand { get; set; }
        public DelegateCommand StopCommand { get; set; }

        public void Start()
        {
            StartServer();
        }


        private void AutoRunning()
        {
            autoTickerStarted = true;
            autoRunningCancellationToken = new CancellationTokenSource();
            autoRunningTask = Task.Run(async () =>
            {
                //Publisher is not thread safe, so while the auto ticker is 
                //running only the autoticker is allowed to access the publisher
                while (!autoRunningCancellationToken.IsCancellationRequested)
                {
                    SendOneManualFakeTicker();

                    await Task.Delay(20);
                }
            });
        }

        private void SendOneManualFakeTicker()
        {
            var currentTicker = tickerRepository.GetNextTicker();

            var flipPoint = rand.Next(0, 100);

            if (flipPoint > 50)
            {
                currentTicker.Price += currentTicker.Price / 30;
            }
            else
            {
                currentTicker.Price -= currentTicker.Price / 30;
            }

            tickerRepository.StoreTicker(currentTicker);

            tickerPublisher.PublishTrade(currentTicker);
        }

        private void StartServer()
        {
            serverStarted = true;
            tickerPublisher.Start();
            AutoRunning();
        }

        private void StopServer()
        {
            if (autoTickerStarted)
            {
                autoRunningCancellationToken.Cancel();

                // Publisher is not thread safe, so while the auto ticker is 
                // running only the autoticker is allowed to access the publisher. 
                //Therefore before we can stop the publisher we have to 
                // wait for the autoticker task to complete
                autoRunningTask.Wait();
                autoTickerStarted = false;

                autoRunningCancellationToken = null;
                autoRunningTask = null;
            }
            tickerPublisher.Stop();

            serverStarted = false;
        }
    }
}

NetMQ Actors

在我們深入了解NetMQPublisher的工作原理之前,值得一提的是server和client都使用NetMQ的Actor模型,我很高興看到這個,因為這部分是由我撰寫的,也因此我認識了Somdoron。

你可以把Actor想成一個可以傳送訊息(命令)的socket,在Actor的內部使用了一個特殊的PairSocket,當你傳送訊息(命令)至Actor時,其實你是對其中一個pair寫入,而同時,Actor內部的另一個PairSocket可能正接收來自另一端的命令,且會執行此命令所相對應的動作。

你實際上要做的是在你的命令的來源和Actor間訂製一個簡單的協定。此協定可能包含要請Actor做的事情,或一個告知Actor去停止它所做的任何事情(因為常常需要,所以NetMQ提供了此命令 – ActorKnownMessages.END_PIPE(譯者注:原文採用NetMQ3.x版,目前新版4.x要改用NetMQActor.EndShimMessage))。

現在你可能會想知道為什麼我們選擇使用這個Actor模型?這很簡單,使用Actor模型可確保不會有任何關於lock的問題,因為根本就沒有資料共享,資料是通過socket發送的,且Actor保證會有自己的資料副本,因此不需要lock,也就不需要等待lock,這有助於讓事情簡單快速且又執行緒安全。

你可以在以下blog文章深入瞭解:#7:Simple Actor Model

Start The Publisher

我們在MainWindowViewModel上看到了其包含啟動/停止NetMQPublisher的命令,但是它們到底作了什麼?怎麼做的?

Start函式本身很簡單,在NetMQPublisher中它只是建立了一個actor以接收命令。

public void Start()
{
    actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}

Who Does The Streaming To The Clients?

我們剛剛討論到的NetMQPublisher做了所有的工作,以下為程式碼:

NetMQPublisher實際上作了如下事情:
1. 送出一個快照以回應(籍由ResponseSocket)來自client端所初始化的請求(籍由RequestSocket)。
2. 發佈(籍由PublisherSocket)”Trades”給所有已連線且訂閱了”Trades”主題的client端。
3. 發佈(籍由PublisherSocket)”HB”給所有已連線且訂閱了”HB”主題的client端。

using System;
using System.Collections.Generic;using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Navigation;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace NetMQServer.Ticker
{
    public class NetMQPublisher : ITickerPublisher
    {
        private const string PublishTicker = "P";        

        public class ShimHandler : IShimHandler<object>
        {
            private readonly NetMQContext context;
            private PublisherSocket publisherSocket;
            private ResponseSocket snapshotSocket;
            private ITickerRepository tickerRepository;
            private Poller poller;
            private NetMQTimer heartbeatTimer;

            public ShimHandler(NetMQContext context, ITickerRepository tickerRepository)
            {
                this.context = context;
                this.tickerRepository = tickerRepository;                
            }

            public void Initialise(object state)
            {

            }

            public void RunPipeline(PairSocket shim)
            {
                publisherSocket = context.CreatePublisherSocket();
                publisherSocket.Bind("tcp://*:" + StreamingProtocol.Port);

                snapshotSocket = context.CreateResponseSocket();
                snapshotSocket.Bind("tcp://*:" + SnapshotProtocol.Port);
                snapshotSocket.ReceiveReady += OnSnapshotReady;

                shim.ReceiveReady += OnShimReady;

                heartbeatTimer = new NetMQTimer(StreamingProtocol.HeartbeatInterval);
                heartbeatTimer.Elapsed += OnHeartbeatTimerElapsed;

                shim.SignalOK();

                poller = new Poller();
                poller.AddSocket(shim);
                poller.AddSocket(snapshotSocket);
                poller.AddTimer(heartbeatTimer);
                poller.Start();

                publisherSocket.Dispose();
                snapshotSocket.Dispose();
            }

            private void OnHeartbeatTimerElapsed(object sender, NetMQTimerEventArgs e)
            {
                publisherSocket.Send(StreamingProtocol.HeartbeatTopic);
            }

            private void OnSnapshotReady(object sender, NetMQSocketEventArgs e)
            {                
                string command = snapshotSocket.ReceiveString();

                // Currently we only have one type of events
                if (command == SnapshotProtocol.GetTradessCommand)
                {
                    var tickers = tickerRepository.GetAllTickers();

                    // we will send all the tickers in one message
                    foreach (var ticker in tickers)
                    {
                        snapshotSocket.SendMore(JsonConvert.SerializeObject(ticker));
                    }

                    snapshotSocket.Send(SnapshotProtocol.EndOfTickers);
                }
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {

                string command = e.Socket.ReceiveString();

                switch (command)
                {
                    case ActorKnownMessages.END_PIPE:
                        poller.Stop(false);
                        break;
                    case PublishTicker:
                        string topic = e.Socket.ReceiveString();
                        string json = e.Socket.ReceiveString();
                        publisherSocket.
                            SendMore(topic).
                            Send(json);
                        break;
                }

            }
        }

        private Actor<object> actor;
        private readonly NetMQContext context;
        private readonly ITickerRepository tickerRepository;

        public NetMQPublisher(NetMQContext context, ITickerRepository tickerRepository)
        {
            this.context = context;
            this.tickerRepository = tickerRepository;        
        }

        public void Start()
        {
            if (actor != null)
                return;

            actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
        }

        public void Stop()
        {
            if (actor != null)
            {
                actor.Dispose();
                actor = null;
            }
        }        

        public void PublishTrade(TickerDto ticker)
        {
            if (actor == null)
                return;

            actor.
                SendMore(PublishTicker).
                SendMore(StreamingProtocol.TradesTopic).
                Send(JsonConvert.SerializeObject(ticker));                
        }

    }
}
}

NetMQPublisher的Actor管道內部作的事是它建立了幾個額外的socket。

這些socket不是在Actor內部使用的,那些是您不會真正看到的專用的PairSocket pair,它們是NetMQ原始碼的一部分。我們在這裡討論的socket是用於應用程序邏輯的socket,在這個Demo程式中的應用如下所示:

  1. 一個PublisherSocket:這個socket是用來推送資料至客戶端的。NetMQ使用訊息框,如在第一段frame中放置主題,而之後才是實際的資料。這樣子客戶端(SubscriberSocket(s))就可以在處理訊息資料前先判斷此訊息是不是它們感興趣的。這一個PublisherSocket足夠用來服務多個主題了,你可以簡單的提供下列資訊給publisher:

    • 訊息主題
    • 實際訊息 透過NetMQPublisher來傳送特定訊息的範例如下:
      public void PublishTrade(TickerDto ticker)
      {
          actor.
              SendMore(PublishTicker).
              SendMore(StreamingProtocol.TradesTopic).
              Send(JsonConvert.SerializeObject(ticker));
      }

    實際上在這個範例中有如下兩個訊息被使用:

    • TradesTopic (“Trades”):此主題使用單一NetMQPublisher中的PublisherSocket在已連接的客戶端上串流傳輸TickerDto物件。而在客戶端,他們使用一個SubscriberSocket,並將其主題設置為”Trades”,以便它們只接收來自NetMQPublisher PublisherSocket發佈者所發布的與”Trades”匹配的主題。
    • HeartbeatTopic (“HB”):此主題也使用單一NetMQPublisher中的PublisherSocket在已連接的客戶端上串流傳輸僅包含”HB”(訊息的內容不重要,僅注意主題名,因此客戶端可以看到一個新的”HB”訊息來了)主題的單一訊息幀。而在客戶端,他們使用一個SubscriberSocket,並將其主題設置為”HB”,以便它們只接收來自NetMQPublisher PublisherSocket發佈者所發布的與”HB”匹配的主題。所以實際上server端會初始化要推送的訊息,而客戶端會有一個HeartBeat主題的訂閱者,此訂閱者的程式中會有如下排程:

      • 如果server(NetMQPublisher)依時間不斷地回應,此時雙方的通訊狀態會被認為是正常的。
      • 如果server(NetMQPublisher)無法依時間不斷地回應,此時雙方的通訊狀態會被認為是有問題的。

    2.一個ResponseSocket:這個socket是負責在client和server間傳送所有trades組成的快照,此快照為由發佈者行程所持有的儲存在記憶體中的一串TickerRespository(只是一個由trades組成的佇列),Client應該包含有一個RequestSocket,而server(NetMQPublisher)應包含一ResponseSocket。所以client會初始化並傳送一訊息(訊息內容不重要),且會期待從server端TickerRespository回傳的JSON型別的序列化trade資料,這動作在client端起始時會完成。

Simulating A Crash In The Publisher

這很簡單,只要按下MainWindow中的“Stop NetMQ”按鈕,這會執行以下程式,結束NetMQPublisher中的Actor

public void Stop()
{
    actor.Dispose();
}

然後應該發生的事是,任何已連線的客戶端在一段時間後會發覺server不存在且應顯示“DISCONNECTED”

Restarting The Publisher

這很簡單,只要按下MainWindow中的“Start NetMQ”按鈕,這會執行以下程式:

private void StartServer()
{
    serverStarted = true;
    tickerPublisher.Start();
    AutoRunning();
}

然後應該發生的事是,任何已連線的客戶端在一段時間後會發覺server已存在且不再顯示“DISCONNECTED”

NetMQ Client

netmq client
Client是一個標準的WPF應用程式,它也使用NetMQ。(…)

以下是同時執行多個client的圖示:
netmq multi client

而這是在按下”Stop NetMQ”後呈現的狀況:
netmq server offline

IOC

Clients

我們決定的架構是擁有Rx型別的client端,它會公開一個IObservable的串流以讓其它程式使用;在此IObservable中,我們會應用Observable.Create(...)的強大功能力來建立一個工廠,此工廠針對環境建立IObservable串流(透過呼叫/建立需要的NetMQ類型來滿足)。
(原文:The way we decided to structure things was to have a Rx’y type client which exposes a IObservable stream which the rest of the app can make use of. Within that IObservable we would use the power of Observable.Create(..) to create a factory that would create the IObservable stream for that scenario by calling into/creating the NetMQ classes required to forfill the stream requirements.)

所以在這demo程式中,一般的模式會像是:
XXXXClient會擁其它應用程式使用的IObservable串流,其內部會使用一個XXXXNetMQClientNetMQserver通訊。

TickerClient/NetMQTickerClient

我們做了一個決定:對每一個非”HeartBeat“主題,在client和server間,我們會有一個專屬的client。就像我們在part 1中,每一個Hub型別都有一個專用的client proxy(譯者注:這裡應是原作者和它的part1 SignalR混淆了)。

NetMQTickerClient

The NetMQTickerClientis where all the client side NetMQ shennanigans is.在NetMQTickerClient中,client會使用NetMQ SubscriberSocket來訂閱”Trades”主題,如同之前所述,我們會使用NetMQ中的Actor框架。NetMQTickerClient使用一個RequestSocket對server執行快照的初始化,而NetMQ server會有一個ResponseSocket;Server會初始化TickerDtos的快照,然後會讓ticker stream可被應用程式使用。

NetMQTickerClient也在其OnError中做了一些錯誤處理,但主要偵測錯誤的方式仍是對HeartBeatClient的使用。

這邊是NetMQTickerClient的程式:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace Client.Comms
{
    public class NetMQTickerClient : IDisposable
    {
        private Actor<object> actor;
        private Subject<TickerDto> subject;
        private CompositeDisposable disposables = new CompositeDisposable();

        class ShimHandler : IShimHandler<object>
        {
            private NetMQContext context;
            private SubscriberSocket subscriberSocket;
            private Subject<TickerDto> subject;
            private string address;
            private Poller poller;
            private NetMQTimer timeoutTimer;

            public ShimHandler(NetMQContext context, Subject<TickerDto> subject, string address)
            {
                this.context = context;
                this.address = address;
                this.subject = subject;
            }

            public void Initialise(object state)
            {

            }

            public void RunPipeline(PairSocket shim)
            {
                // we should signal before running the poller but this will block the application
                shim.SignalOK();

                this.poller = new Poller();

                shim.ReceiveReady += OnShimReady;
                poller.AddSocket(shim);

                timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
                timeoutTimer.Elapsed += TimeoutElapsed;
                poller.AddTimer(timeoutTimer);

                Connect();

                poller.Start();

                if (subscriberSocket != null)
                {
                    subscriberSocket.Dispose();
                }
            }

            private void Connect()
            {
                // getting the snapshot
                using (RequestSocket requestSocket = context.CreateRequestSocket())
                {

                    requestSocket.Connect(string.Format("tcp://{0}:{1}", address, SnapshotProtocol.Port));

                    requestSocket.Send(SnapshotProtocol.GetTradessCommand);

                    string json;

                    requestSocket.Options.ReceiveTimeout = SnapshotProtocol.RequestTimeout;

                    try
                    {
                        json = requestSocket.ReceiveString();
                    }
                    catch (AgainException ex)
                    {
                        // Fail to receive trades, we call on error and don't try to do anything with subscriber
                        // calling on error from poller thread block the application
                        Task.Run(() => subject.OnError(new Exception("No response from server")));
                        return;
                    }

                    while (json != SnapshotProtocol.EndOfTickers)
                    {
                        PublishTicker(json);

                        json = requestSocket.ReceiveString();
                    }
                }

                subscriberSocket = context.CreateSubscriberSocket();
                subscriberSocket.Subscribe(StreamingProtocol.TradesTopic);
                subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
                subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
                subscriberSocket.ReceiveReady += OnSubscriberReady;

                poller.AddSocket(subscriberSocket);

                // reset timeout timer
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }

            private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
            {
                // no need to reconnect, the client would be recreated because of RX

                // because of RX internal stuff invoking on the poller thread block the entire application, so calling on Thread Pool
                Task.Run(() => subject.OnError(new Exception("Disconnected from server")));
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
                string command = e.Socket.ReceiveString();

                if (command == ActorKnownMessages.END_PIPE)
                {
                    poller.Stop(false);
                }
            }

            private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
            {
                string topic = subscriberSocket.ReceiveString();

                if (topic == StreamingProtocol.TradesTopic)
                {
                    string json = subscriberSocket.ReceiveString();
                    PublishTicker(json);

                    // reset timeout timer also when a quote is received
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
                else if (topic == StreamingProtocol.HeartbeatTopic)
                {
                    // reset timeout timer
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
            }

            private void PublishTicker(string json)
            {
                TickerDto tickerDto = JsonConvert.DeserializeObject<TickerDto>(json);
                subject.OnNext(tickerDto);
            }
        }

        public NetMQTickerClient(NetMQContext context, string address)
        {
            subject = new Subject<TickerDto>();

            this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), null);
            this.disposables.Add(this.actor);

            this.disposables.Add(NetMQHeartBeatClient.Instance.GetConnectionStatusStream()
                .Where(x => x.ConnectionStatus == ConnectionStatus.Closed)
                .Subscribe(x =>
                    this.subject.OnError(new InvalidOperationException("Connection to server has been lost"))));
        }

        public IObservable<TickerDto> GetTickerStream()
        {
            return subject.AsObservable();
        }

        public void Dispose()
        {
            this.disposables.Dispose();
        }
    }
}

TickerClient

TickerClient可用於整個應用程式,以串流方式傳輸TickerDto物件,其中它簡單的從NetMQTickerClient包裝另一個串流。重要的部份是當出現錯誤且TickerRepository中的Repeat發生時,TickerClient IObservable訂閱會重新建立NetMQHeartBeatClient。這會確保NetMQHeartBeatClient會再次試著和server通訊一次。As before it all comes down to good housekeeping and lifestyle management.

以下是TickerClient的程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Common;
using NetMQ;

namespace Client.Comms
{
    public class TickerClient : ITickerClient
    {
        private readonly NetMQContext context;
        private readonly string address;

        public TickerClient(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }

        public IObservable<TickerDto> GetTickerStream()
        {
            return Observable.Create<TickerDto>(observer =>
            {
                NetMQTickerClient client = new NetMQTickerClient(context, address);

                var disposable = client.GetTickerStream().Subscribe(observer);
                return new CompositeDisposable { client, disposable };
            })
            .Publish()
            .RefCount();
        }


        public IObservable<ConnectionInfo> ConnectionStatusStream()
        {
            return Observable.Create<ConnectionInfo>(observer =>
            {
                NetMQHeartBeatClient.Instance.InitialiseComms();

                var disposable = NetMQHeartBeatClient.Instance.
                    GetConnectionStatusStream().Subscribe(observer);

                return new CompositeDisposable { disposable };
            })
            .Publish()
            .RefCount();
        }
    }
}

HeartBeatClient / NetMQHeartBeatClient

We took the decision that the heartbeat between a single client and the server is a global concern in the context of that client.

因此,我們僅預期只會有單一個HeartBeatClient(籍由IOC註冊來達成),並且NetMQHeartBeatClient僅存在單一實體。

NetMQHeartBeatClient

The NetMQHeartBeatClient is where all the client side NetMQ shennanigans is.在NetMQHeartBeatClient中,client會使用NetMQ SubscriberSocket來訂閱”HeartBeat (HB)”主題。如同之前我們使用了NetMQ中的Actor框架,這也是我們預期在一段時間後會從server端的PublisherSocket得到的回應,如果沒有回應的話,我們會認為通訊發生問題,其中我們使用Rx的Subject<T>的OnNext來推送相關的ConnectionInfo/ConnectionStatus。

這邊是NetMQHeartBeatClient的程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace Client.Comms
{
    public class NetMQHeartBeatClient 
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Actor<object> actor;
        private Subject<ConnectionInfo> subject;
        private static NetMQHeartBeatClient instance = null;
        private static object syncLock = new object();
        protected int requiresInitialisation = 1;

        class ShimHandler : IShimHandler<object>
        {
            private NetMQContext context;
            private SubscriberSocket subscriberSocket;
            private Subject<ConnectionInfo> subject;
            private string address;
            private Poller poller;
            private NetMQTimer timeoutTimer;
            private NetMQHeartBeatClient parent;

            public ShimHandler(NetMQContext context, Subject<ConnectionInfo> subject, string address)
            {
                this.context = context;
                this.address = address;
                this.subject = subject;
            }

            public void Initialise(object state)
            {
                parent = (NetMQHeartBeatClient) state;
            }

            public void RunPipeline(PairSocket shim)
            {
                // we should signal before running the poller but this will block the application
                shim.SignalOK();

                this.poller = new Poller();

                shim.ReceiveReady += OnShimReady;
                poller.AddSocket(shim);

                timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
                timeoutTimer.Elapsed += TimeoutElapsed;
                poller.AddTimer(timeoutTimer);

                Connect();

                poller.Start();

                if (subscriberSocket != null)
                {
                    subscriberSocket.Dispose();
                }                
            }

            private void Connect()
            {
                subscriberSocket = context.CreateSubscriberSocket();
                subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
                subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));

                subject.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, this.address));
                subscriberSocket.ReceiveReady += OnSubscriberReady;
                poller.AddSocket(subscriberSocket);

                // reset timeout timer
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }

            private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
            {
                // no need to reconnect, the client would be recreated because of RX

                // because of RX internal stuff invoking on the poller thread block 
                // the entire application, so calling on Thread Pool
                Task.Run(() =>
                {
                    parent.requiresInitialisation = 1;
                    subject.OnNext(new ConnectionInfo(ConnectionStatus.Closed, this.address));
                });
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
                string command = e.Socket.ReceiveString();

                if (command == ActorKnownMessages.END_PIPE)
                {
                    poller.Stop(false);
                }
            }

            private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
            {
                string topic = subscriberSocket.ReceiveString();

                if (topic == StreamingProtocol.HeartbeatTopic)
                {
                    subject.OnNext(new ConnectionInfo(ConnectionStatus.Connected, this.address));

                    // reset timeout timer
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
            }
        }

        private NetMQHeartBeatClient(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
            InitialiseComms();
        }

        public static NetMQHeartBeatClient CreateInstance(NetMQContext context, string address)
        {
            if (instance == null)
            {
                lock (syncLock)
                {
                    if (instance == null)
                    {
                        instance = new NetMQHeartBeatClient(context,address);
                    }
                }
            }
            return instance;
        }

        public void InitialiseComms()
        {
            if (Interlocked.CompareExchange(ref requiresInitialisation, 0, 1) == 1)
            {
                if (actor != null)
                {
                    this.actor.Dispose();
                }

                subject = new Subject<ConnectionInfo>();
                this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), this);
            }
        }

        public IObservable<ConnectionInfo> GetConnectionStatusStream()
        {
            return subject.AsObservable();
        }

        public static NetMQHeartBeatClient Instance
        {
            get { return instance; }
        }
    }
}

HeartBeatClient

HeartBeatClient對整個程式公開,它單純的從NetMQHeartBeatClient中包裝了另一個stream,且可能被用在client與server間連線狀態的溝通。重要的部份是當出現錯誤且NetMQHeartBeatClient中的Repeat發生時,HeartBeatClient訂閱會重新建立。這會確保NetMQHeartBeatClient會再次試著和server通訊一次。As before it all comes down to good housekeeping and lifestyle management.

這邊是HeartBeatClient 的程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms;
using Client.Comms.Transport;
using NetMQ;

namespace Client.Comms
{
    public class HeartBeatClient : IHeartBeatClient
    {
        public IObservable<ConnectionInfo> ConnectionStatusStream()
        {
            return Observable.Create<ConnectionInfo>(observer =>
            {
                NetMQHeartBeatClient.Instance.InitialiseComms();

                var disposable = NetMQHeartBeatClient.Instance
                    .GetConnectionStatusStream().Subscribe(observer);

                return new CompositeDisposable { disposable };
            })
            .Publish()
            .RefCount();
        }
    }
}

TickerRepository

TickerRepositoryObservable鏈結上的下一部份, 那它看起來像什麼呢?它實際上是另人訝異的簡單,但不要被愚弄了,它其實做了很多。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Comms;

namespace Client.Repositories
{
    class TickerRepository : ITickerRepository
    {
        private readonly ITickerClient tickerClient;
        private readonly ITickerFactory tickerFactory;

        public TickerRepository(ITickerClient tickerClient, ITickerFactory tickerFactory)
        {
            this.tickerClient = tickerClient;
            this.tickerFactory = tickerFactory;
        }

        public IObservable<Ticker> GetTickerStream()
        {
            return Observable.Defer(() => tickerClient.GetTickerStream())
                .Select(tickerFactory.Create)                
                .Catch<Ticker>(Observable.Empty<Ticker>())
                .Repeat()
                .Publish()
                .RefCount();
        }
    }
}

所以這裡到底做了什麼?

  • 我們使用Observable.Defer,因此實際上我們沒有使用內部的串流,直到某人訂閱了由Observable.Defer所建立的IObservable串流,這是讓Hot串流變成Cold串流的方式。
  • 我們使用Select以將串流資料從TickerDto轉成Ticker
  • 我們使用Catch補捉串流中的任何例外(OnError),且在其中使用預設值
  • 我們使用Repeat,注意這個很很很重要!這讓我們可以重覆整個串流,包含再次連線至server端。This along with the resilient stream logic are the MOST important bits to the app (at least in my opinion)
  • 我們使用Publish來共用內部的串流
  • 我們使用RefCount以在沒有訂閱者時自動disposal

現在我們已經看到了repository,在學習TickerStream的IObservable的旅途中只剩下一個部份了,讓我們來看看吧。

TickersViewModel

TickersViewModel呈現了螢幕上所看到的所有Ticker們,這個viewmodel使用了TickerRepository提供的lazy / repeatable / resilient IObservable,讓我們看看這段我覺得很易讀的程式碼:

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;


namespace Client.ViewModels
{
    public class TickersViewModel : INPCBase
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IConcurrencyService concurrencyService;
        private bool stale = false;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));

        public TickersViewModel(IReactiveTrader reactiveTrader,
                                IConcurrencyService concurrencyService,
            TickerViewModelFactory tickerViewModelFactory)
        {
            Tickers = new ObservableCollection<TickerViewModel>();
            Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
            Tickers.Add(tickerViewModelFactory.Create("Google"));
            Tickers.Add(tickerViewModelFactory.Create("Apple"));
            Tickers.Add(tickerViewModelFactory.Create("Facebook"));
            Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
            Tickers.Add(tickerViewModelFactory.Create("Twitter"));
            this.tickerRepository = reactiveTrader.TickerRepository;
            this.concurrencyService = concurrencyService;
            LoadTrades();
        }

        public ObservableCollection<TickerViewModel> Tickers { get; private set; }

        private void LoadTrades()
        {
            tickerRepository.GetTickerStream()
                            .ObserveOn(concurrencyService.Dispatcher)
                            .SubscribeOn(concurrencyService.TaskPool)
                            .Subscribe(
                                AddTicker,
                                ex => log.Error("An error occurred within the trade stream", ex));
        }

        private void AddTicker(Ticker ticker)
        {
            Tickers.Single(x => x.Name == ticker.Name)
                 .AcceptNewPrice(ticker.Price);
        }
    }
}

而每個Ticker是由單一個TickerViewModel所呈現的,如下程式所示。其中你也可以看到我們之前談論到的ConnectionStatusStream IObservable,這是用來讓TickerViewModel在斷線時顯示一個紅色的”DISCONNECTED“狀態框,稍後會談到這個。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class TickerViewModel : INPCBase
    {
        private decimal price;
        private bool isUp;
        private bool stale;
        private bool disconnected;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));


        public TickerViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService,
            string name)
        {

            this.Name = name;

            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                    OnStatusChange,
                    ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        public string Name { get; private set; }


        public void AcceptNewPrice(decimal newPrice)
        {
            IsUp = newPrice > price;
            Price = newPrice;
        }


        public decimal Price
        {
            get { return this.price; }
            private set
            {
                this.price = value;
                base.OnPropertyChanged("Price");
            }
        }

        public bool IsUp
        {
            get { return this.isUp; }
            private set
            {
                this.isUp = value;
                base.OnPropertyChanged("IsUp");
            }
        }

        public bool Stale
        {
            get { return this.stale; }
            set
            {
                this.stale = value;
                base.OnPropertyChanged("Stale");
            }
        } 

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Connecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Connected:
                    Disconnected = false;
                    break;
                case ConnectionStatus.Closed:
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

    }
}

可以看到這個ViewModel使用IReactiveTrader.ConnectionStatusStream來監看對NetMQPublisher的連線狀態,這部份的程式職責在顯示價格及斷線的狀態,斷線狀態則是由Disconnected屬性持有。

ConnectivityStatusViewModel

最後一件我想說明的是ConnectionStatusStream如何被使用。當HeartBeatClient推送新值時這個串流的OnNexts會被呼叫,所以我們可以看到像是連線中,已連線、關閉等狀態,所有的變化都是來自我們之前討論的NetMQHeartBeatClient其中的邏輯,並使用標準Rx中的Subject<T>將之轉換成IObservable串流。

如下是整個程式中ConnectivityStatusViewModel所顯示在下方狀態列的資訊。
enter image description here

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class ConnectivityStatusViewModel : INPCBase
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
        private string server;
        private string status;
        private bool disconnected;

        public ConnectivityStatusViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService)
        {
            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                OnStatusChange,
                ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {
            Server = connectionInfo.Server;

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Connecting:
                    Status = "Connecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Connected:
                    Status = "Connected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.Closed:
                    Status = "Disconnected";
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        public string Server
        {
            get { return this.server; }
            set
            {
                this.server = value;
                base.OnPropertyChanged("Server");
            }
        }

        public string Status
        {
            get { return this.status; }
            set
            {
                this.status = value;
                base.OnPropertyChanged("Status");
            }
        }

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }
    }
}

That’s It For Now

Anyway that is all I wanted to say for now, I hope you have enjoyed this very mini RX/SignalR/NetMQ series, and that this has maybe made you want to go away and have a play with RX/SignalR/NetMQ. If you have enjoyed it, please feel to free to leave a vote or a comment, Doron and I have worked quite hard on these demos, to try and iron out any bugs and make them as real world as possible, so comments/votes are always nice to receive.

Written with StackEdit.

7.20.2017

Visual studio with Vim

以前寫程式寫到用滑鼠時手腕會痛,怕得到腕隧道症候群,於是下定決心練習左手用滑鼠,以減輕右手手腕的負擔,再換用各式人體工學鍵盤,再然後發覺到另一個很有用的方式,也就是Vim。它讓你不再需要頻繁地讓右手移開至方向鍵或功能鍵,若是再配合有小紅點的IBM鍵盤,你會發現手臂不會那麼累了。

目前幾乎所有主流編輯器都有提供vi的功能,而在visual studio中則是透過VsVim來提供。

最開始是看到如下語法後就決定學習的:

enter image description here
圖片來自Android Studio : Using Vim

此命令是屬於vi中的組合鍵,可以把它記成 change inner word,第一個單字代表你要做的動作,比如說可以使用:

  • c:變更
  • y:複製
  • d:刪除

等指令,而第二個字單字可以表示範圍或目的,如常用的:

  • i:inner
  • t:直到

例如:

  • ci” – 變更目前遊標前後用”符號包含的整個字串
  • ci) – 變更目前遊標前後左右括號包含的整個字串
  • ci] – 變更目前遊標前後左右中括號包含的整個字串

運用vim提供的各種命令,會讓日常寫程式時效率提高不少,當然,要花不少時間去熟悉,但這是非常非常值得的投資。

相關參考:
* Android Studio : Using Vim(簡體中文)
* 給程式設計師的Vim入門圖解說明
* Youtube中文教學
* 大家來學VIM
* 鳥哥的 Linux 私房菜 第九章、vim 程式編輯器

在visual studio中會有一部份的key map衝突,這時候就要看個人的取捨了,另外一個我一定會用的是這個軟體 – SharpKeys。我會用它來把最常用的Caps Lock和Esc鍵調換,這樣在切換模式時就不會因手指不夠長而提起手腕了XD。(CodePlex若是關閉了可至其Github下載。)

Written with StackEdit.

7.18.2017

Wpf 3D - Helix Toolkit - 一個方便的3D展示函式庫

在youtube上看到了一個wpf上的3D控制展示,附source code,下載後發覺Helix toolkit把最麻煩的部份都處理掉了,可以很簡單的控制3d視圖中各部件的作動,看到這個就想到機台控制應該也可以用此方式來模擬,感覺上應該會很有趣XD。

3D

主要的控制程式如下(有稍微整理過):

public partial class MainWindow : Window
    {
        //provides functionality to 3d models
        Model3DGroup RA = new Model3DGroup();
        Model3D link1 = null;
        Model3D link2 = null;
        Model3D link3 = null;
        Model3D link4 = null;
        Model3D link5 = null;

        //provides render to model3d objects
        ModelVisual3D RoboticArm = new ModelVisual3D();

        //directroy of all stl files
        private const string MODEL_PATH1 = "j0_j1_link.stl";
        private const string MODEL_PATH2 = "j1_j2_link.stl";
        private const string MODEL_PATH3 = "j2_j3_link.stl";
        private const string MODEL_PATH4 = "j3_j4_link.stl";
        private const string MODEL_PATH5 = "j4_j5_link.stl";

        RotateTransform3D R = new RotateTransform3D();
        TranslateTransform3D T = new TranslateTransform3D();

        public MainWindow()
        {
            InitializeComponent();
            RoboticArm.Content = Initialize_Environment(MODEL_PATH1, MODEL_PATH2,MODEL_PATH3, MODEL_PATH4, MODEL_PATH5);
            viewPort3d.Children.Add(RoboticArm);
        }

        private Model3DGroup Initialize_Environment(string model1, string model2, string model3, string model4, string model5)
        {
            try
            {
                viewPort3d.RotateGesture = new MouseGesture(MouseAction.LeftClick);
                ModelImporter import = new ModelImporter();
                link1 = import.Load(model1);
                link2 = import.Load(model2);
                link3 = import.Load(model3);
                link4 = import.Load(model4);
                link5 = import.Load(model5);

                execute_fk();

                RA.Children.Add(link1);
                RA.Children.Add(link2);
                RA.Children.Add(link3);
                RA.Children.Add(link4);
                RA.Children.Add(link5);
            }
            catch (Exception e)
            {
                MessageBox.Show("Exception Error:" + e.StackTrace);
            }
            return RA;
        }

        private void joint1_ValueChanged(object sender, RoutedPropertyChangedEventArgs<double> e)
        {
          execute_fk();
        }        

        private void execute_fk()
        {
            var F1 = new Transform3DGroup();
            var F2 = new Transform3DGroup();
            var F3 = new Transform3DGroup();
            var F4 = new Transform3DGroup();
            var F5 = new Transform3DGroup();

            var p3d = new Point3D(0, 0, 0);
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 0, 1), joint1.Value), p3d);
            // F1為基座,僅能旋轉,目前角度為joint1.Value指定
            F1.Children.Add(R);

            p3d = new Point3D(0, 0, 9.5);
            T = new TranslateTransform3D(p3d.ToVector3D());
            // F2可旋轉,目前角度為joint2.Value
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 1, 0), joint2.Value), p3d);
            // F2連接至F1,位置在其Z軸方向9.5
            F2.Children.Add(F1);
            F2.Children.Add(T);
            F2.Children.Add(R);

            p3d = new Point3D(15, 0, 0);            
            T = new TranslateTransform3D(p3d.ToVector3D());
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 1, 0), joint3.Value), p3d);
            F3.Children.Add(T);
            F3.Children.Add(R);
            F3.Children.Add(F2);

            p3d = new Point3D(6.7, 0, 0);
            T = new TranslateTransform3D(p3d.ToVector3D());
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 1, 0), joint4.Value), p3d);
            F4.Children.Add(T);
            F4.Children.Add(R);
            F4.Children.Add(F3);

            p3d = new Point3D(5.35, 0, 0);
            T = new TranslateTransform3D(p3d.ToVector3D());
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(1, 0, 0), joint5.Value), p3d);
            F5.Children.Add(T);
            F5.Children.Add(R);
            F5.Children.Add(F4);

            link1.Transform = F1;
            link2.Transform = F2;
            link3.Transform = F3;
            link4.Transform = F4;
            link5.Transform = F5;
        }
    }

程式中手臂主要是由五個副檔名為.stl的機構組成,每個.stl檔對映到一個Model3D物件,在每個Model3D物件中設定其旋轉(RotateTransform3D)及位置(TranslateTransform3D),最後所有的Model3D物件再組合成Model3DGroup - RA,再由ModelVisual3D RoboticArm負責Render,最後放到HelixViewport3D中展示。

作者也將此程式另行擴充成一個更完整的可編輯Frame來形成動畫的軟體,如下所示:

Robotic Arm Windows Application

Written with StackEdit.

7.15.2017

ReactiveUI的ReactiveCommand

RxUI的命令和其它MVVM中實作ICommand的最大不一樣的地方就是 – 它可以有回傳值(當然,是reactive式的IObservable)!

之前剛學Prism(under Xarmin.Forms)的時候,在DelegateCommand的說明那裡找不到回傳值的用法,再看了一次RxUI的API說明,才發覺到正常來說,ICommand介面的實作中基本上就是執行動作,其概念中是不管動作的執行結果的,不管是Prism中的DelegateCommand或MvvmLight中的RelayCommand都是如此。

而在Reactive的世界中,實作時思考的方式是需要改變的,如上一篇blog介紹的範例中,RetrieveWordCommand的定義如下:

public ReactiveCommand<Unit, List<WordOption>> RetrieveWordCommand

ReactiveCommand<Unit, List<WordOption>>表示此命令不需要代入參數(Unit),但會傳回一IObservable<List<WordOption>>,因此,在建立此命令的程式中:

RetrieveWordCommand = ReactiveCommand.CreateFromTask<List<WordOption>>(async (arg) =>
{
    // ... 
    return wordResults.Select(wr => new WordOption{ //...    }
}, canRetrieve);

它回傳了一個可觀察佇列,且在接續的程式中,定義了當其命令完成後的動作:

RetrieveWordCommand
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(wordOptions =>
    {
        // ...
    });

程式中,ObserveOn表示後續動作會在其指定的執行緒上被執行,此例中是UI主執行緒,因為它會修改被綁定到View上的屬性,而Subscribe中則會收到原先定義的回傳值List<WordOpton>當參數。

Written with StackEdit.

7.10.2017

在Xamarin.Forms上的MVVM

ReactiveUI

最近在看Xamarin.Forms的MVVM framework,個人徧好ReactiveUI,於是找到了這篇介紹 –
A Simple Vocabulary App Using ReactiveUI and Xamarin Forms,它實作了一個猜字的遊戲,如下圖:
enter image description here

基本上它使用的方式和在Windows平台上沒有什麼差別,仍是focus在view和viewmodel的互動,沒有其它的東西,如頁面的routing、Navigation、IoC的使用等。

此程式主要的動作在WordPickViewModel中:

public WordPickViewModel(IWordRepository wordRepository)
{
    _wordRepository = wordRepository;
    WordOptions = new ObservableCollection<WordOption>();
    CorrectPct = "0%";

    // 設定狀態的條件,型別為IObservable<bool>
    var canRetrieve = this.WhenAnyValue(x => x.CanRetrieve).Select(x => x);
    var canSelect = this.WhenAnyValue(x => x.CanRetrieve).Select(x => !x);

    // 原文使用CreateAsyncTask,目前版本v7.4,要改用如下函式
    // 定義命令,此命令完成後會回傳一List<WordOption>>
    RetrieveWordCommand = ReactiveCommand.CreateFromTask<List<WordOption>>(async (arg) =>
    {
        var wordResults = await _wordRepository.GetWords(_rangeFloor, RangeCeiling);

        return wordResults.Select(wr =>
            new WordOption
            {
                Word = wr.Name,
                Definition = wr.Definition,
                WordId = wr.Id
            }).ToList();
    }, canRetrieve);

    // 原文使用CreateAsyncTask,目前版本v7.4,要改用如下函式
    SelectAnswerCommand = ReactiveCommand.CreateFromTask(async arg =>
    {
        await HandleItemSelectedAsync(arg);
    }, canSelect);

    // ObserveOn表示後續動作會在其指定的執行緒上被執行
    // Subscribe定義命令完成後的處理
    RetrieveWordCommand
        .ObserveOn(RxApp.MainThreadScheduler)
        .Subscribe(wordOptions =>
        {
            _timerCancellationToken = new CancellationTokenSource();
            NextRange();
            CanRetrieve = false;
            WordOptions.Clear();

            // randomly determine the word to challenge user with
            var rand = new Random();
            var challengeWord = wordOptions[rand.Next(wordOptions.Count)];
            ChallengeWord = $"\"{challengeWord.Word}\"";

            foreach (var item in wordOptions)
            {
                var isAnswer = item.WordId == challengeWord.WordId;
                item.IsAnswer = isAnswer;
                item.Image = isAnswer ? "check.png" : "x.png";
                WordOptions.Add(item);
            }

            TimerCountdown = 10;
            Device.StartTimer(new TimeSpan(0, 0, 1), () =>
            {
                if (_timerCancellationToken.IsCancellationRequested)
                {
                    return false;
                }

                if (TimerCountdown == 0)
                {
                    ProcessAnswer();
                    return false;
                }
                TimerCountdown--;
                return true;
            });
        });

    //Behaviors
    this.WhenAnyValue(x => x.Begin).InvokeCommand(RetrieveWordCommand);
}

不過程式中對xaml頁面的部份,都是用code behind中的程式碼產生,View和ViewModel的綁定也是,個人比較不建議這個方式,這讓xaml的優勢不再。

xamvvm

這個輕量級的framework填補了ReactiveUI缺少的部份,因此兩個可以一起合用

Prism

Xarmin.Forms上的Prism不像WPF環境下那麼龐大,它精簡了很多,當然也比ReactiveUI完整多了,若是需要可以互相配合使用。

這邊有一個不錯的教學影片可參考.。

MVVMLight

在學Wpf時主要都是使用這一個,在Xamarin.Forms上它也滿適合的,輕量且剛剛好的功能,作者也寫了一個跨平台的範例程式,另在channel9有影片介紹

另有以此為基礎的延伸框架:

Xarch-starter

可以把它當作基楚的樣板來擴充,wiki中也提到了另一個更進階的框架 - Exrin,也可以參考看看…

這些framework都滿足了最基本的需求,不過大部份的說明文件都不甚豐富,想要應用都需要一點時間;當然也可以不依靠這些framework,之前在學wpf時就看過一系列的教學沒有應用任何framework,不過後來程式一步步擴大,不斷的重構後就出來了另一個框架了,就學習層面來說這很不錯,但還是要看個人的取舍了XD。

Written with StackEdit.

7.08.2017

好多的Icon

之前都是用Font-Awesome-WPF來替介面加上一些圖示,後來才發覺正在用的MahApps也有提供一組獨立的Icon組合 - MahApps.Metro.IconPacks,裡面也包含FontAwesome,馬上就跳蹧了,它也提供了一範例程式,展示所有的Icon,如下所示。

基本上此Pack是一個組合,其中包含了來自其它來源的Icon。

每一組Icon都可以單獨安裝,若是直接安裝MahApps.Metro.IconPacks則會包含以上Icon的組合。

目前它支援Wpf及UWP,理論上Xamarin.Forms應該也可以用(要用這個Iconize),之後再來試…

Written with StackEdit.