12.31.2017

初學Actor Model - Akka.Net

此篇文章基本上是Akka.Net的基礎教學的筆記。

什麼是Actor?

Actor是一個系統中對參與者的模擬。它可能是一個實體,一個類別,可以做一些事情,及互相溝通。

Actors如何互相溝通?

Actors透過訊息互相溝通,任何POCO都可以是訊息,字串、整數、類別、或實作了一個介面的物件。

//this is a message!
public class SomeMessage
{
    public int SomeValue {get; set}
}

傳送訊息:

//send a string to an actor
someActorRef.Tell("this is a message too!");

Actor可以做什麼?

除了處理收到的訊息外,一個actor還可以:
* 建立另一個actor
* 傳送訊息給另一個actor
* 變更它自己的行為,並處理不同的訊息(相對於原先的行為的訊息)

什麼是一個ActorSystem

一個ActorSystem是對Akka.NET framework內部系統的一個參考,所有的actor都生存在其中,你也需要以它來建立你的actor。

安裝Akka.Net NuGet

Install-Package Akka

建立ActorSystem

var system = ActorSystem.Create("MyActorSystem");

建立一個Actor

var writerActor = system.ActorOf(
    Props.Create(
        () => new WriterActor()),
        "writerActor");

或是使用泛型的方式:

var writerActor = system.ActorOf(
    Props.Create<WriterActor>(), "writerActor");

PropsIActorRef’s

什麼是Props

Props是一個封裝了所有需要建立實體actor的資訊的設定類別。

如何建立Props

  1. 使用lambda語法:
Props props = Props.Create(() => new MyActor(..), "...");
  1. 使用泛型語法:
Props props = Props.Create<MyActor>();

什麼是IActorRef

IActorRef代表對一個actor的參考。目的是支援透過ActorSystem傳送訊息給一個actor。

子Actor,Actor階層及監管

enter image description here
建立最上層的actors:

// create the top level actors from above diagram
IActorRef a1 = MyActorSystem.ActorOf(Props.Create<BasicActor>(), "a1");
IActorRef a2 = MyActorSystem.ActorOf(Props.Create<BasicActor>(), "a2");

建立a2下的子actors:

// create the children of actor a2
// this is inside actor a2
IActorRef b1 = Context.ActorOf(Props.Create<BasicActor>(), "b1");
IActorRef b2 = Context.ActorOf(Props.Create<BasicActor>(), "b2");

Actor階層中如何監管?

每個actor會監管其子actor,且僅止於其子actor,如上面的階層圖,a2僅監管b1及b2。

何時需要監管?

發生錯誤時!
當一個子actor發生未處理的例外且要掛掉時,它會尋求它的父actor的幫忙。它會傳送一個Failure類別的訊息,並由父actor決定如何處理。

監管規則

當父actor收到來自其子actor的錯誤訊息,它可以決定以下列方式處理:
* 重啟子actor(預設行為)
* 停止子actor
* 上報錯誤(且停止自己的動作)

監管策略

有兩個內建的策略型態:
* 一對一:父actor發佈的命令僅針對發生錯誤的子actor
* 一對全部:父actor發佈的命令不僅只針對發生錯誤的子actor,而是其所有的子actor

重點是什麼?隔離!

如下圖所示:
Error Kernel
我們切分工作,把可能會發生錯誤的地方放在某一節點上,這讓潛在的錯誤被隔離,系統不會因為一個節點的錯誤而崩潰!

使用ActorSelection依位址尋找特定Actor

var selection = Context.ActorSelection("/path/to/actorName");
selection.Tell(message);

或使用ActorOf:

class FooActor : UntypedActor {}
Props props = Props.Create<FooActor>();

// the ActorPath for myFooActor is "/user/barBazActor"
// NOT "/user/myFooActor" or "/user/FooActor"
IActorRef myFooActor = MyActorSystem.ActorOf(props, "barBazActor");

// if you don't specify a name on creation as below, the system will
// auto generate a name for you, so the actor path will
// be something like "/user/$a"
IActorRef myFooActor = MyActorSystem.ActorOf(props);

也可參考:When Should I Use Actor Selection?

Actor的生命週期

lifecycle methods

使用ReceiveActor來更聰明的處理訊息

public class StringActor : ReceiveActor
{
    public StringActor()
    {
        Receive<string>(
            s => s.StartsWith("AkkaDotNet"), 
            s =>{ /* handle string */ });

        Receive<string>(
            s => s.StartsWith("AkkaDotNetSuccess"), 
            s =>{/* handle string*/});
    }
}

使用BecomeStackedUnbecomeStacked在執行時變更Actor的行為

public class UserActor : ReceiveActor {
    private readonly string _userId;
    private readonly string _chatRoomId;

    public UserActor(string userId, string chatRoomId) {
        _userId = userId;
        _chatRoomId = chatRoomId;

        // start with the Authenticating behavior
        Authenticating();
    }

    protected override void PreStart() {
        // start the authentication process for this user
        Context.ActorSelection("/user/authenticator/")
            .Tell(new AuthenticatePlease(_userId));
    }

    private void Authenticating() {
        Receive<AuthenticationSuccess>(auth => {
            Become(Authenticated); //switch behavior to Authenticated
        });
        Receive<AuthenticationFailure>(auth => {
            Become(Unauthenticated); //switch behavior to Unauthenticated
        });
        Receive<IncomingMessage>(inc => inc.ChatRoomId == _chatRoomId,
            inc => {
                // can't accept message yet - not auth'd
            });
        Receive<OutgoingMessage>(inc => inc.ChatRoomId == _chatRoomId,
            inc => {
                // can't send message yet - not auth'd
            });
    }

    private void Unauthenticated() {
        //switch to Authenticating
        Receive<RetryAuthentication>(retry => Become(Authenticating));
        Receive<IncomingMessage>(inc => inc.ChatRoomId == _chatRoomId,
            inc => {
                // have to reject message - auth failed
            });
        Receive<OutgoingMessage>(inc => inc.ChatRoomId == _chatRoomId,
            inc => {
                // have to reject message - auth failed
            });
    }

    private void Authenticated() {
        Receive<IncomingMessage>(inc => inc.ChatRoomId == _chatRoomId,
            inc => {
                // print message for user
            });
        Receive<OutgoingMessage>(inc => inc.ChatRoomId == _chatRoomId,
            inc => {
                // send message to chatroom
            });
    }
}

什麼是Switchable behavior?

在Actor Model中,一個Actor的核心屬性是它可以在訊息的處理過程中變更自己的行為。
這個特性讓Actor可以實作有限狀態機,或根據所收到的訊息變更處理的方式。

如何實作

  • 堆疊式

    • BecomeStackedenter image description here
    • UnbecomeStackedenter image description here
  • 直接變更

    • Become

Written with StackEdit.

12.16.2017

JackTimingApp

正在學Xamarin.Forms,順便用之前WPF的專案修改,原本想共用ViewModel,不過繪圖的部份改用SkiaSharp,加上原先並沒有把繪圖的部份分開,所以就重寫了XD。

初步成果如下圖:
Screen Shot1

程式的部份還是採用MVVM,另還用了XF範例中的EventToCommand方式,如下程式:

<skia:SKCanvasView x:Name="canvasView" Grid.Row="0">
            <skia:SKCanvasView.Behaviors>
                <jackTimingApp:EventToCommandBehavior Command="{Binding PaintSurfaceCommand}"
                EventName="PaintSurface" />
            </skia:SKCanvasView.Behaviors>
        </skia:SKCanvasView>

這樣子繪圖的部份就由ViewModel包了,當然這次把繪圖的動作分開了,如下所示:

PaintSurfaceCommand = new Command((arg) =>
{
    var args = arg as SKPaintSurfaceEventArgs;

    var info = args.Info;
    var surface = args.Surface;
    var canvas = surface.Canvas;

    if (_bitmap == null)
        _bitmap = new SKBitmap(info);

    if (_drawEngine == null)
        _drawEngine = new SkiaDrawEngine(_bitmap);    

    TimingDatas = TimingMapParser.Parse(TimingData);
    _bitmap = _drawEngine.Draw(TimingDatas);

    canvas.DrawBitmap(_bitmap, 0, 0);
    UpdateTimingDiagram();
});

而實際更新canvas的動作就交由XF內建的MessagingCenter來完成:

public MainPage(MainViewModel vm)
{
    InitializeComponent();

    BindingContext = vm;

    MessagingCenter.Subscribe<MessageToken>(this, "message", (item) =>
    {
        switch (item.TokenType)
        {
            case MessageTokenType.UpdateTimingDiagram:
                DrawTiming();
                break;
        }
    });
}

private void DrawTiming()
{
    canvasView.InvalidateSurface();
}

繪圖的部份還有滿多可以調整的,另外也沒有加上檔案的處理,但介面的部份感覺不太適合App的承現,有時間再調整了…

Written with StackEdit.

8.20.2017

最近寫了一個仿AndyTiming的程式

新工作下班後比較閒,又剛好需要畫一些簡單的時序圖,找了AndyTiming來用,可惜在Windows 10中會有字元無法顯示,順便自己寫了一個,放在GitHub,會持續更新。

介面如下(基本上就和AndyTiming一樣XD):
Screen Shot1

Written with StackEdit.

7.27.2017

NetMQ + RX Demo (譯)

之前找到的NetMQ範例,趁有時間邊翻譯邊學習,此譯文略過前面介紹Rx的部份(有興趣可參考IntroToRx,或是我之前翻譯的中文版),僅從原文中的General Idea開始。文章覺得不重要的或是不會翻的XD,會以代替。
另原文採用NetMQ 3.x版,當前版本4.x後有多項更新,若是需要,可參照此文的方式升級,但升級後在NetMQHeartBeatClient.cs的ShimHandler建構式要修改,斷線訊息才能正常顯示(或者我再找時間把升級後的版本放上去XD已fork並更新,網址:https://github.com/liaochihung/NetMQRxDemo)…

// 要加上parent參數,當然,在呼叫時要帶入
public ShimHandler(object parent,Subject<ConnectionInfo> subject, string address)
{
    this.address = address;
    this.subject = subject;
    // 原專案中忘了呼叫此函式
    Initialise(parent);
}
  • 我之前有翻譯過NetMQ的中文化文件,有需要也可參考。
  • Client - Server的程式,一般可能都會在client端採用輪詢的方式Pull server的資料,但請注意本文中使用Rx,所以它是相反方向的Push式的模式,在瞭解程式時請謹記此點。
  • 此專案的原始範本:SignalR + Rx(跨平台) – ReactiveTrader

General Idea

千言萬語不如圖一張:
enter image description here

The Publisher

pic

Publisher由數個部份組成,下文會說明,但要記住server和client都是使用NetMQ,所以在你開始閱讀前先瞭解下NetMQ會比較好。…

IOC

MainWindow

除了以下幾個控制項外,這個window沒有其它需說明的:

  • Start NetMQ:啟始NetMQPublisher Actor PublisherSocket(對模擬NetMQ server的崩潰/重啟很有幫助)
  • Stop NetMQ:結束NetMQPublisher Actor(對模擬NetMQ server的崩潰/重啟很有幫助)
  • Start Auto Ticker Data:以固定的時間間隔從NetMQPublisher Actor推送隨機產生的TickerDto物件
  • Stop Auto Ticker Data:會暫停從NetMQPublisher Actor推送TickerDto物件
  • Send One Ticker:從NetMQPublisher Actor推送單一的隨機TickerDto物件

現在還不用擔心我們提到的“Actor”是什麼意思,很快就會說明。

MainWindowViewModel

MainWindowViewModel只是用來傳送命令給不管是停止/建立NetMQPublisher Actor,或是告訴NetMQPublisher Actor去做一些事,以下是其完整的程式碼。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using System.Windows.Input;
using Common.ViewModels;
using log4net;
using NetMQServer.Ticker;

namespace NetMQServer
{
    public class MainWindowViewModel
    {
        private readonly ITickerPublisher tickerPublisher;
        private readonly ITickerRepository tickerRepository;
        private Random rand;
        private static readonly ILog Log = LogManager.GetLogger(typeof(MainWindowViewModel));
        private CancellationTokenSource autoRunningCancellationToken;
        private Task autoRunningTask;
        private bool serverStarted;
        private bool autoTickerStarted;

        public MainWindowViewModel(ITickerPublisher tickerPublisher, ITickerRepository tickerRepository)
        {
            this.tickerPublisher = tickerPublisher;
            this.tickerRepository = tickerRepository;
            this.rand = new Random();

            serverStarted = false;
            autoTickerStarted = false;

            AutoTickerStartCommand = new DelegateCommand(
                AutoRunning, 
                () => serverStarted && !autoTickerStarted);

            AutoTickerStopCommand = new DelegateCommand(
                () =>
                {
                    if (autoRunningCancellationToken != null)
                    {
                        autoRunningCancellationToken.Cancel();
                        autoRunningTask.Wait();
                        autoTickerStarted = false;
                    }
                }, 
                () => serverStarted && autoTickerStarted);

            SendOneTickerCommand = new DelegateCommand(
                SendOneManualFakeTicker, 
                () => serverStarted && !autoTickerStarted);

            StartCommand = new DelegateCommand(
                StartServer, 
                () => !serverStarted);

            StopCommand = new DelegateCommand(
                StopServer, 
                () => serverStarted);
        }

        public DelegateCommand AutoTickerStartCommand { get; set; }
        public DelegateCommand AutoTickerStopCommand { get; set; }
        public DelegateCommand SendOneTickerCommand { get; set; }
        public DelegateCommand StartCommand { get; set; }
        public DelegateCommand StopCommand { get; set; }

        public void Start()
        {
            StartServer();
        }


        private void AutoRunning()
        {
            autoTickerStarted = true;
            autoRunningCancellationToken = new CancellationTokenSource();
            autoRunningTask = Task.Run(async () =>
            {
                //Publisher is not thread safe, so while the auto ticker is 
                //running only the autoticker is allowed to access the publisher
                while (!autoRunningCancellationToken.IsCancellationRequested)
                {
                    SendOneManualFakeTicker();

                    await Task.Delay(20);
                }
            });
        }

        private void SendOneManualFakeTicker()
        {
            var currentTicker = tickerRepository.GetNextTicker();

            var flipPoint = rand.Next(0, 100);

            if (flipPoint > 50)
            {
                currentTicker.Price += currentTicker.Price / 30;
            }
            else
            {
                currentTicker.Price -= currentTicker.Price / 30;
            }

            tickerRepository.StoreTicker(currentTicker);

            tickerPublisher.PublishTrade(currentTicker);
        }

        private void StartServer()
        {
            serverStarted = true;
            tickerPublisher.Start();
            AutoRunning();
        }

        private void StopServer()
        {
            if (autoTickerStarted)
            {
                autoRunningCancellationToken.Cancel();

                // Publisher is not thread safe, so while the auto ticker is 
                // running only the autoticker is allowed to access the publisher. 
                //Therefore before we can stop the publisher we have to 
                // wait for the autoticker task to complete
                autoRunningTask.Wait();
                autoTickerStarted = false;

                autoRunningCancellationToken = null;
                autoRunningTask = null;
            }
            tickerPublisher.Stop();

            serverStarted = false;
        }
    }
}

NetMQ Actors

在我們深入了解NetMQPublisher的工作原理之前,值得一提的是server和client都使用NetMQ的Actor模型,我很高興看到這個,因為這部分是由我撰寫的,也因此我認識了Somdoron。

你可以把Actor想成一個可以傳送訊息(命令)的socket,在Actor的內部使用了一個特殊的PairSocket,當你傳送訊息(命令)至Actor時,其實你是對其中一個pair寫入,而同時,Actor內部的另一個PairSocket可能正接收來自另一端的命令,且會執行此命令所相對應的動作。

你實際上要做的是在你的命令的來源和Actor間訂製一個簡單的協定。此協定可能包含要請Actor做的事情,或一個告知Actor去停止它所做的任何事情(因為常常需要,所以NetMQ提供了此命令 – ActorKnownMessages.END_PIPE(譯者注:原文採用NetMQ3.x版,目前新版4.x要改用NetMQActor.EndShimMessage))。

現在你可能會想知道為什麼我們選擇使用這個Actor模型?這很簡單,使用Actor模型可確保不會有任何關於lock的問題,因為根本就沒有資料共享,資料是通過socket發送的,且Actor保證會有自己的資料副本,因此不需要lock,也就不需要等待lock,這有助於讓事情簡單快速且又執行緒安全。

你可以在以下blog文章深入瞭解:#7:Simple Actor Model

Start The Publisher

我們在MainWindowViewModel上看到了其包含啟動/停止NetMQPublisher的命令,但是它們到底作了什麼?怎麼做的?

Start函式本身很簡單,在NetMQPublisher中它只是建立了一個actor以接收命令。

public void Start()
{
    actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
}

Who Does The Streaming To The Clients?

我們剛剛討論到的NetMQPublisher做了所有的工作,以下為程式碼:

NetMQPublisher實際上作了如下事情:
1. 送出一個快照以回應(籍由ResponseSocket)來自client端所初始化的請求(籍由RequestSocket)。
2. 發佈(籍由PublisherSocket)”Trades”給所有已連線且訂閱了”Trades”主題的client端。
3. 發佈(籍由PublisherSocket)”HB”給所有已連線且訂閱了”HB”主題的client端。

using System;
using System.Collections.Generic;using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Navigation;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace NetMQServer.Ticker
{
    public class NetMQPublisher : ITickerPublisher
    {
        private const string PublishTicker = "P";        

        public class ShimHandler : IShimHandler<object>
        {
            private readonly NetMQContext context;
            private PublisherSocket publisherSocket;
            private ResponseSocket snapshotSocket;
            private ITickerRepository tickerRepository;
            private Poller poller;
            private NetMQTimer heartbeatTimer;

            public ShimHandler(NetMQContext context, ITickerRepository tickerRepository)
            {
                this.context = context;
                this.tickerRepository = tickerRepository;                
            }

            public void Initialise(object state)
            {

            }

            public void RunPipeline(PairSocket shim)
            {
                publisherSocket = context.CreatePublisherSocket();
                publisherSocket.Bind("tcp://*:" + StreamingProtocol.Port);

                snapshotSocket = context.CreateResponseSocket();
                snapshotSocket.Bind("tcp://*:" + SnapshotProtocol.Port);
                snapshotSocket.ReceiveReady += OnSnapshotReady;

                shim.ReceiveReady += OnShimReady;

                heartbeatTimer = new NetMQTimer(StreamingProtocol.HeartbeatInterval);
                heartbeatTimer.Elapsed += OnHeartbeatTimerElapsed;

                shim.SignalOK();

                poller = new Poller();
                poller.AddSocket(shim);
                poller.AddSocket(snapshotSocket);
                poller.AddTimer(heartbeatTimer);
                poller.Start();

                publisherSocket.Dispose();
                snapshotSocket.Dispose();
            }

            private void OnHeartbeatTimerElapsed(object sender, NetMQTimerEventArgs e)
            {
                publisherSocket.Send(StreamingProtocol.HeartbeatTopic);
            }

            private void OnSnapshotReady(object sender, NetMQSocketEventArgs e)
            {                
                string command = snapshotSocket.ReceiveString();

                // Currently we only have one type of events
                if (command == SnapshotProtocol.GetTradessCommand)
                {
                    var tickers = tickerRepository.GetAllTickers();

                    // we will send all the tickers in one message
                    foreach (var ticker in tickers)
                    {
                        snapshotSocket.SendMore(JsonConvert.SerializeObject(ticker));
                    }

                    snapshotSocket.Send(SnapshotProtocol.EndOfTickers);
                }
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {

                string command = e.Socket.ReceiveString();

                switch (command)
                {
                    case ActorKnownMessages.END_PIPE:
                        poller.Stop(false);
                        break;
                    case PublishTicker:
                        string topic = e.Socket.ReceiveString();
                        string json = e.Socket.ReceiveString();
                        publisherSocket.
                            SendMore(topic).
                            Send(json);
                        break;
                }

            }
        }

        private Actor<object> actor;
        private readonly NetMQContext context;
        private readonly ITickerRepository tickerRepository;

        public NetMQPublisher(NetMQContext context, ITickerRepository tickerRepository)
        {
            this.context = context;
            this.tickerRepository = tickerRepository;        
        }

        public void Start()
        {
            if (actor != null)
                return;

            actor = new Actor<object>(context, new ShimHandler(context, tickerRepository), null);
        }

        public void Stop()
        {
            if (actor != null)
            {
                actor.Dispose();
                actor = null;
            }
        }        

        public void PublishTrade(TickerDto ticker)
        {
            if (actor == null)
                return;

            actor.
                SendMore(PublishTicker).
                SendMore(StreamingProtocol.TradesTopic).
                Send(JsonConvert.SerializeObject(ticker));                
        }

    }
}
}

NetMQPublisher的Actor管道內部作的事是它建立了幾個額外的socket。

這些socket不是在Actor內部使用的,那些是您不會真正看到的專用的PairSocket pair,它們是NetMQ原始碼的一部分。我們在這裡討論的socket是用於應用程序邏輯的socket,在這個Demo程式中的應用如下所示:

  1. 一個PublisherSocket:這個socket是用來推送資料至客戶端的。NetMQ使用訊息框,如在第一段frame中放置主題,而之後才是實際的資料。這樣子客戶端(SubscriberSocket(s))就可以在處理訊息資料前先判斷此訊息是不是它們感興趣的。這一個PublisherSocket足夠用來服務多個主題了,你可以簡單的提供下列資訊給publisher:

    • 訊息主題
    • 實際訊息 透過NetMQPublisher來傳送特定訊息的範例如下:
      public void PublishTrade(TickerDto ticker)
      {
          actor.
              SendMore(PublishTicker).
              SendMore(StreamingProtocol.TradesTopic).
              Send(JsonConvert.SerializeObject(ticker));
      }

    實際上在這個範例中有如下兩個訊息被使用:

    • TradesTopic (“Trades”):此主題使用單一NetMQPublisher中的PublisherSocket在已連接的客戶端上串流傳輸TickerDto物件。而在客戶端,他們使用一個SubscriberSocket,並將其主題設置為”Trades”,以便它們只接收來自NetMQPublisher PublisherSocket發佈者所發布的與”Trades”匹配的主題。
    • HeartbeatTopic (“HB”):此主題也使用單一NetMQPublisher中的PublisherSocket在已連接的客戶端上串流傳輸僅包含”HB”(訊息的內容不重要,僅注意主題名,因此客戶端可以看到一個新的”HB”訊息來了)主題的單一訊息幀。而在客戶端,他們使用一個SubscriberSocket,並將其主題設置為”HB”,以便它們只接收來自NetMQPublisher PublisherSocket發佈者所發布的與”HB”匹配的主題。所以實際上server端會初始化要推送的訊息,而客戶端會有一個HeartBeat主題的訂閱者,此訂閱者的程式中會有如下排程:

      • 如果server(NetMQPublisher)依時間不斷地回應,此時雙方的通訊狀態會被認為是正常的。
      • 如果server(NetMQPublisher)無法依時間不斷地回應,此時雙方的通訊狀態會被認為是有問題的。

    2.一個ResponseSocket:這個socket是負責在client和server間傳送所有trades組成的快照,此快照為由發佈者行程所持有的儲存在記憶體中的一串TickerRespository(只是一個由trades組成的佇列),Client應該包含有一個RequestSocket,而server(NetMQPublisher)應包含一ResponseSocket。所以client會初始化並傳送一訊息(訊息內容不重要),且會期待從server端TickerRespository回傳的JSON型別的序列化trade資料,這動作在client端起始時會完成。

Simulating A Crash In The Publisher

這很簡單,只要按下MainWindow中的“Stop NetMQ”按鈕,這會執行以下程式,結束NetMQPublisher中的Actor

public void Stop()
{
    actor.Dispose();
}

然後應該發生的事是,任何已連線的客戶端在一段時間後會發覺server不存在且應顯示“DISCONNECTED”

Restarting The Publisher

這很簡單,只要按下MainWindow中的“Start NetMQ”按鈕,這會執行以下程式:

private void StartServer()
{
    serverStarted = true;
    tickerPublisher.Start();
    AutoRunning();
}

然後應該發生的事是,任何已連線的客戶端在一段時間後會發覺server已存在且不再顯示“DISCONNECTED”

NetMQ Client

netmq client
Client是一個標準的WPF應用程式,它也使用NetMQ。(…)

以下是同時執行多個client的圖示:
netmq multi client

而這是在按下”Stop NetMQ”後呈現的狀況:
netmq server offline

IOC

Clients

我們決定的架構是擁有Rx型別的client端,它會公開一個IObservable的串流以讓其它程式使用;在此IObservable中,我們會應用Observable.Create(...)的強大功能力來建立一個工廠,此工廠針對環境建立IObservable串流(透過呼叫/建立需要的NetMQ類型來滿足)。
(原文:The way we decided to structure things was to have a Rx’y type client which exposes a IObservable stream which the rest of the app can make use of. Within that IObservable we would use the power of Observable.Create(..) to create a factory that would create the IObservable stream for that scenario by calling into/creating the NetMQ classes required to forfill the stream requirements.)

所以在這demo程式中,一般的模式會像是:
XXXXClient會擁其它應用程式使用的IObservable串流,其內部會使用一個XXXXNetMQClientNetMQserver通訊。

TickerClient/NetMQTickerClient

我們做了一個決定:對每一個非”HeartBeat“主題,在client和server間,我們會有一個專屬的client。就像我們在part 1中,每一個Hub型別都有一個專用的client proxy(譯者注:這裡應是原作者和它的part1 SignalR混淆了)。

NetMQTickerClient

The NetMQTickerClientis where all the client side NetMQ shennanigans is.在NetMQTickerClient中,client會使用NetMQ SubscriberSocket來訂閱”Trades”主題,如同之前所述,我們會使用NetMQ中的Actor框架。NetMQTickerClient使用一個RequestSocket對server執行快照的初始化,而NetMQ server會有一個ResponseSocket;Server會初始化TickerDtos的快照,然後會讓ticker stream可被應用程式使用。

NetMQTickerClient也在其OnError中做了一些錯誤處理,但主要偵測錯誤的方式仍是對HeartBeatClient的使用。

這邊是NetMQTickerClient的程式:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace Client.Comms
{
    public class NetMQTickerClient : IDisposable
    {
        private Actor<object> actor;
        private Subject<TickerDto> subject;
        private CompositeDisposable disposables = new CompositeDisposable();

        class ShimHandler : IShimHandler<object>
        {
            private NetMQContext context;
            private SubscriberSocket subscriberSocket;
            private Subject<TickerDto> subject;
            private string address;
            private Poller poller;
            private NetMQTimer timeoutTimer;

            public ShimHandler(NetMQContext context, Subject<TickerDto> subject, string address)
            {
                this.context = context;
                this.address = address;
                this.subject = subject;
            }

            public void Initialise(object state)
            {

            }

            public void RunPipeline(PairSocket shim)
            {
                // we should signal before running the poller but this will block the application
                shim.SignalOK();

                this.poller = new Poller();

                shim.ReceiveReady += OnShimReady;
                poller.AddSocket(shim);

                timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
                timeoutTimer.Elapsed += TimeoutElapsed;
                poller.AddTimer(timeoutTimer);

                Connect();

                poller.Start();

                if (subscriberSocket != null)
                {
                    subscriberSocket.Dispose();
                }
            }

            private void Connect()
            {
                // getting the snapshot
                using (RequestSocket requestSocket = context.CreateRequestSocket())
                {

                    requestSocket.Connect(string.Format("tcp://{0}:{1}", address, SnapshotProtocol.Port));

                    requestSocket.Send(SnapshotProtocol.GetTradessCommand);

                    string json;

                    requestSocket.Options.ReceiveTimeout = SnapshotProtocol.RequestTimeout;

                    try
                    {
                        json = requestSocket.ReceiveString();
                    }
                    catch (AgainException ex)
                    {
                        // Fail to receive trades, we call on error and don't try to do anything with subscriber
                        // calling on error from poller thread block the application
                        Task.Run(() => subject.OnError(new Exception("No response from server")));
                        return;
                    }

                    while (json != SnapshotProtocol.EndOfTickers)
                    {
                        PublishTicker(json);

                        json = requestSocket.ReceiveString();
                    }
                }

                subscriberSocket = context.CreateSubscriberSocket();
                subscriberSocket.Subscribe(StreamingProtocol.TradesTopic);
                subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
                subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));
                subscriberSocket.ReceiveReady += OnSubscriberReady;

                poller.AddSocket(subscriberSocket);

                // reset timeout timer
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }

            private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
            {
                // no need to reconnect, the client would be recreated because of RX

                // because of RX internal stuff invoking on the poller thread block the entire application, so calling on Thread Pool
                Task.Run(() => subject.OnError(new Exception("Disconnected from server")));
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
                string command = e.Socket.ReceiveString();

                if (command == ActorKnownMessages.END_PIPE)
                {
                    poller.Stop(false);
                }
            }

            private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
            {
                string topic = subscriberSocket.ReceiveString();

                if (topic == StreamingProtocol.TradesTopic)
                {
                    string json = subscriberSocket.ReceiveString();
                    PublishTicker(json);

                    // reset timeout timer also when a quote is received
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
                else if (topic == StreamingProtocol.HeartbeatTopic)
                {
                    // reset timeout timer
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
            }

            private void PublishTicker(string json)
            {
                TickerDto tickerDto = JsonConvert.DeserializeObject<TickerDto>(json);
                subject.OnNext(tickerDto);
            }
        }

        public NetMQTickerClient(NetMQContext context, string address)
        {
            subject = new Subject<TickerDto>();

            this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), null);
            this.disposables.Add(this.actor);

            this.disposables.Add(NetMQHeartBeatClient.Instance.GetConnectionStatusStream()
                .Where(x => x.ConnectionStatus == ConnectionStatus.Closed)
                .Subscribe(x =>
                    this.subject.OnError(new InvalidOperationException("Connection to server has been lost"))));
        }

        public IObservable<TickerDto> GetTickerStream()
        {
            return subject.AsObservable();
        }

        public void Dispose()
        {
            this.disposables.Dispose();
        }
    }
}

TickerClient

TickerClient可用於整個應用程式,以串流方式傳輸TickerDto物件,其中它簡單的從NetMQTickerClient包裝另一個串流。重要的部份是當出現錯誤且TickerRepository中的Repeat發生時,TickerClient IObservable訂閱會重新建立NetMQHeartBeatClient。這會確保NetMQHeartBeatClient會再次試著和server通訊一次。As before it all comes down to good housekeeping and lifestyle management.

以下是TickerClient的程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Common;
using NetMQ;

namespace Client.Comms
{
    public class TickerClient : ITickerClient
    {
        private readonly NetMQContext context;
        private readonly string address;

        public TickerClient(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
        }

        public IObservable<TickerDto> GetTickerStream()
        {
            return Observable.Create<TickerDto>(observer =>
            {
                NetMQTickerClient client = new NetMQTickerClient(context, address);

                var disposable = client.GetTickerStream().Subscribe(observer);
                return new CompositeDisposable { client, disposable };
            })
            .Publish()
            .RefCount();
        }


        public IObservable<ConnectionInfo> ConnectionStatusStream()
        {
            return Observable.Create<ConnectionInfo>(observer =>
            {
                NetMQHeartBeatClient.Instance.InitialiseComms();

                var disposable = NetMQHeartBeatClient.Instance.
                    GetConnectionStatusStream().Subscribe(observer);

                return new CompositeDisposable { disposable };
            })
            .Publish()
            .RefCount();
        }
    }
}

HeartBeatClient / NetMQHeartBeatClient

We took the decision that the heartbeat between a single client and the server is a global concern in the context of that client.

因此,我們僅預期只會有單一個HeartBeatClient(籍由IOC註冊來達成),並且NetMQHeartBeatClient僅存在單一實體。

NetMQHeartBeatClient

The NetMQHeartBeatClient is where all the client side NetMQ shennanigans is.在NetMQHeartBeatClient中,client會使用NetMQ SubscriberSocket來訂閱”HeartBeat (HB)”主題。如同之前我們使用了NetMQ中的Actor框架,這也是我們預期在一段時間後會從server端的PublisherSocket得到的回應,如果沒有回應的話,我們會認為通訊發生問題,其中我們使用Rx的Subject<T>的OnNext來推送相關的ConnectionInfo/ConnectionStatus。

這邊是NetMQHeartBeatClient的程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Client.Factory;
using Client.Comms.Transport;
using Common;
using NetMQ;
using NetMQ.Actors;
using NetMQ.InProcActors;
using NetMQ.Sockets;
using NetMQ.zmq;
using Newtonsoft.Json;
using Poller = NetMQ.Poller;

namespace Client.Comms
{
    public class NetMQHeartBeatClient 
    {
        private readonly NetMQContext context;
        private readonly string address;
        private Actor<object> actor;
        private Subject<ConnectionInfo> subject;
        private static NetMQHeartBeatClient instance = null;
        private static object syncLock = new object();
        protected int requiresInitialisation = 1;

        class ShimHandler : IShimHandler<object>
        {
            private NetMQContext context;
            private SubscriberSocket subscriberSocket;
            private Subject<ConnectionInfo> subject;
            private string address;
            private Poller poller;
            private NetMQTimer timeoutTimer;
            private NetMQHeartBeatClient parent;

            public ShimHandler(NetMQContext context, Subject<ConnectionInfo> subject, string address)
            {
                this.context = context;
                this.address = address;
                this.subject = subject;
            }

            public void Initialise(object state)
            {
                parent = (NetMQHeartBeatClient) state;
            }

            public void RunPipeline(PairSocket shim)
            {
                // we should signal before running the poller but this will block the application
                shim.SignalOK();

                this.poller = new Poller();

                shim.ReceiveReady += OnShimReady;
                poller.AddSocket(shim);

                timeoutTimer = new NetMQTimer(StreamingProtocol.Timeout);
                timeoutTimer.Elapsed += TimeoutElapsed;
                poller.AddTimer(timeoutTimer);

                Connect();

                poller.Start();

                if (subscriberSocket != null)
                {
                    subscriberSocket.Dispose();
                }                
            }

            private void Connect()
            {
                subscriberSocket = context.CreateSubscriberSocket();
                subscriberSocket.Subscribe(StreamingProtocol.HeartbeatTopic);
                subscriberSocket.Connect(string.Format("tcp://{0}:{1}", address, StreamingProtocol.Port));

                subject.OnNext(new ConnectionInfo(ConnectionStatus.Connecting, this.address));
                subscriberSocket.ReceiveReady += OnSubscriberReady;
                poller.AddSocket(subscriberSocket);

                // reset timeout timer
                timeoutTimer.Enable = false;
                timeoutTimer.Enable = true;
            }

            private void TimeoutElapsed(object sender, NetMQTimerEventArgs e)
            {
                // no need to reconnect, the client would be recreated because of RX

                // because of RX internal stuff invoking on the poller thread block 
                // the entire application, so calling on Thread Pool
                Task.Run(() =>
                {
                    parent.requiresInitialisation = 1;
                    subject.OnNext(new ConnectionInfo(ConnectionStatus.Closed, this.address));
                });
            }

            private void OnShimReady(object sender, NetMQSocketEventArgs e)
            {
                string command = e.Socket.ReceiveString();

                if (command == ActorKnownMessages.END_PIPE)
                {
                    poller.Stop(false);
                }
            }

            private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
            {
                string topic = subscriberSocket.ReceiveString();

                if (topic == StreamingProtocol.HeartbeatTopic)
                {
                    subject.OnNext(new ConnectionInfo(ConnectionStatus.Connected, this.address));

                    // reset timeout timer
                    timeoutTimer.Enable = false;
                    timeoutTimer.Enable = true;
                }
            }
        }

        private NetMQHeartBeatClient(NetMQContext context, string address)
        {
            this.context = context;
            this.address = address;
            InitialiseComms();
        }

        public static NetMQHeartBeatClient CreateInstance(NetMQContext context, string address)
        {
            if (instance == null)
            {
                lock (syncLock)
                {
                    if (instance == null)
                    {
                        instance = new NetMQHeartBeatClient(context,address);
                    }
                }
            }
            return instance;
        }

        public void InitialiseComms()
        {
            if (Interlocked.CompareExchange(ref requiresInitialisation, 0, 1) == 1)
            {
                if (actor != null)
                {
                    this.actor.Dispose();
                }

                subject = new Subject<ConnectionInfo>();
                this.actor = new Actor<object>(context, new ShimHandler(context, subject, address), this);
            }
        }

        public IObservable<ConnectionInfo> GetConnectionStatusStream()
        {
            return subject.AsObservable();
        }

        public static NetMQHeartBeatClient Instance
        {
            get { return instance; }
        }
    }
}

HeartBeatClient

HeartBeatClient對整個程式公開,它單純的從NetMQHeartBeatClient中包裝了另一個stream,且可能被用在client與server間連線狀態的溝通。重要的部份是當出現錯誤且NetMQHeartBeatClient中的Repeat發生時,HeartBeatClient訂閱會重新建立。這會確保NetMQHeartBeatClient會再次試著和server通訊一次。As before it all comes down to good housekeeping and lifestyle management.

這邊是HeartBeatClient 的程式碼:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms;
using Client.Comms.Transport;
using NetMQ;

namespace Client.Comms
{
    public class HeartBeatClient : IHeartBeatClient
    {
        public IObservable<ConnectionInfo> ConnectionStatusStream()
        {
            return Observable.Create<ConnectionInfo>(observer =>
            {
                NetMQHeartBeatClient.Instance.InitialiseComms();

                var disposable = NetMQHeartBeatClient.Instance
                    .GetConnectionStatusStream().Subscribe(observer);

                return new CompositeDisposable { disposable };
            })
            .Publish()
            .RefCount();
        }
    }
}

TickerRepository

TickerRepositoryObservable鏈結上的下一部份, 那它看起來像什麼呢?它實際上是另人訝異的簡單,但不要被愚弄了,它其實做了很多。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using Client.Factory;
using Client.Comms;

namespace Client.Repositories
{
    class TickerRepository : ITickerRepository
    {
        private readonly ITickerClient tickerClient;
        private readonly ITickerFactory tickerFactory;

        public TickerRepository(ITickerClient tickerClient, ITickerFactory tickerFactory)
        {
            this.tickerClient = tickerClient;
            this.tickerFactory = tickerFactory;
        }

        public IObservable<Ticker> GetTickerStream()
        {
            return Observable.Defer(() => tickerClient.GetTickerStream())
                .Select(tickerFactory.Create)                
                .Catch<Ticker>(Observable.Empty<Ticker>())
                .Repeat()
                .Publish()
                .RefCount();
        }
    }
}

所以這裡到底做了什麼?

  • 我們使用Observable.Defer,因此實際上我們沒有使用內部的串流,直到某人訂閱了由Observable.Defer所建立的IObservable串流,這是讓Hot串流變成Cold串流的方式。
  • 我們使用Select以將串流資料從TickerDto轉成Ticker
  • 我們使用Catch補捉串流中的任何例外(OnError),且在其中使用預設值
  • 我們使用Repeat,注意這個很很很重要!這讓我們可以重覆整個串流,包含再次連線至server端。This along with the resilient stream logic are the MOST important bits to the app (at least in my opinion)
  • 我們使用Publish來共用內部的串流
  • 我們使用RefCount以在沒有訂閱者時自動disposal

現在我們已經看到了repository,在學習TickerStream的IObservable的旅途中只剩下一個部份了,讓我們來看看吧。

TickersViewModel

TickersViewModel呈現了螢幕上所看到的所有Ticker們,這個viewmodel使用了TickerRepository提供的lazy / repeatable / resilient IObservable,讓我們看看這段我覺得很易讀的程式碼:

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Input;
using Client.Factory;
using Client.Repositories;
using Client.Services;
using Common;
using Common.ViewModels;
using log4net;


namespace Client.ViewModels
{
    public class TickersViewModel : INPCBase
    {
        private readonly ITickerRepository tickerRepository;
        private readonly IConcurrencyService concurrencyService;
        private bool stale = false;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickersViewModel));

        public TickersViewModel(IReactiveTrader reactiveTrader,
                                IConcurrencyService concurrencyService,
            TickerViewModelFactory tickerViewModelFactory)
        {
            Tickers = new ObservableCollection<TickerViewModel>();
            Tickers.Add(tickerViewModelFactory.Create("Yahoo"));
            Tickers.Add(tickerViewModelFactory.Create("Google"));
            Tickers.Add(tickerViewModelFactory.Create("Apple"));
            Tickers.Add(tickerViewModelFactory.Create("Facebook"));
            Tickers.Add(tickerViewModelFactory.Create("Microsoft"));
            Tickers.Add(tickerViewModelFactory.Create("Twitter"));
            this.tickerRepository = reactiveTrader.TickerRepository;
            this.concurrencyService = concurrencyService;
            LoadTrades();
        }

        public ObservableCollection<TickerViewModel> Tickers { get; private set; }

        private void LoadTrades()
        {
            tickerRepository.GetTickerStream()
                            .ObserveOn(concurrencyService.Dispatcher)
                            .SubscribeOn(concurrencyService.TaskPool)
                            .Subscribe(
                                AddTicker,
                                ex => log.Error("An error occurred within the trade stream", ex));
        }

        private void AddTicker(Ticker ticker)
        {
            Tickers.Single(x => x.Name == ticker.Name)
                 .AcceptNewPrice(ticker.Price);
        }
    }
}

而每個Ticker是由單一個TickerViewModel所呈現的,如下程式所示。其中你也可以看到我們之前談論到的ConnectionStatusStream IObservable,這是用來讓TickerViewModel在斷線時顯示一個紅色的”DISCONNECTED“狀態框,稍後會談到這個。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class TickerViewModel : INPCBase
    {
        private decimal price;
        private bool isUp;
        private bool stale;
        private bool disconnected;
        private static readonly ILog log = LogManager.GetLogger(typeof(TickerViewModel));


        public TickerViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService,
            string name)
        {

            this.Name = name;

            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                    OnStatusChange,
                    ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        public string Name { get; private set; }


        public void AcceptNewPrice(decimal newPrice)
        {
            IsUp = newPrice > price;
            Price = newPrice;
        }


        public decimal Price
        {
            get { return this.price; }
            private set
            {
                this.price = value;
                base.OnPropertyChanged("Price");
            }
        }

        public bool IsUp
        {
            get { return this.isUp; }
            private set
            {
                this.isUp = value;
                base.OnPropertyChanged("IsUp");
            }
        }

        public bool Stale
        {
            get { return this.stale; }
            set
            {
                this.stale = value;
                base.OnPropertyChanged("Stale");
            }
        } 

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Connecting:
                    Disconnected = true;
                    break;
                case ConnectionStatus.Connected:
                    Disconnected = false;
                    break;
                case ConnectionStatus.Closed:
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

    }
}

可以看到這個ViewModel使用IReactiveTrader.ConnectionStatusStream來監看對NetMQPublisher的連線狀態,這部份的程式職責在顯示價格及斷線的狀態,斷線狀態則是由Disconnected屬性持有。

ConnectivityStatusViewModel

最後一件我想說明的是ConnectionStatusStream如何被使用。當HeartBeatClient推送新值時這個串流的OnNexts會被呼叫,所以我們可以看到像是連線中,已連線、關閉等狀態,所有的變化都是來自我們之前討論的NetMQHeartBeatClient其中的邏輯,並使用標準Rx中的Subject<T>將之轉換成IObservable串流。

如下是整個程式中ConnectivityStatusViewModel所顯示在下方狀態列的資訊。
enter image description here

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
using Client.Comms.Transport;
using Client.Services;
using log4net;

namespace Client.ViewModels
{
    public class ConnectivityStatusViewModel : INPCBase
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ConnectivityStatusViewModel));
        private string server;
        private string status;
        private bool disconnected;

        public ConnectivityStatusViewModel(
            IReactiveTrader reactiveTrader,
            IConcurrencyService concurrencyService)
        {
            reactiveTrader.ConnectionStatusStream
                .ObserveOn(concurrencyService.Dispatcher)
                .SubscribeOn(concurrencyService.TaskPool)
                .Subscribe(
                OnStatusChange,
                ex => log.Error("An error occurred within the connection status stream.", ex));
        }


        private void OnStatusChange(ConnectionInfo connectionInfo)
        {
            Server = connectionInfo.Server;

            switch (connectionInfo.ConnectionStatus)
            {
                case ConnectionStatus.Connecting:
                    Status = "Connecting...";
                    Disconnected = true;
                    break;
                case ConnectionStatus.Connected:
                    Status = "Connected";
                    Disconnected = false;
                    break;
                case ConnectionStatus.Closed:
                    Status = "Disconnected";
                    Disconnected = true;
                    break;
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }

        public string Server
        {
            get { return this.server; }
            set
            {
                this.server = value;
                base.OnPropertyChanged("Server");
            }
        }

        public string Status
        {
            get { return this.status; }
            set
            {
                this.status = value;
                base.OnPropertyChanged("Status");
            }
        }

        public bool Disconnected
        {
            get { return this.disconnected; }
            set
            {
                this.disconnected = value;
                base.OnPropertyChanged("Disconnected");
            }
        }
    }
}

That’s It For Now

Anyway that is all I wanted to say for now, I hope you have enjoyed this very mini RX/SignalR/NetMQ series, and that this has maybe made you want to go away and have a play with RX/SignalR/NetMQ. If you have enjoyed it, please feel to free to leave a vote or a comment, Doron and I have worked quite hard on these demos, to try and iron out any bugs and make them as real world as possible, so comments/votes are always nice to receive.

Written with StackEdit.

7.20.2017

Visual studio with Vim

以前寫程式寫到用滑鼠時手腕會痛,怕得到腕隧道症候群,於是下定決心練習左手用滑鼠,以減輕右手手腕的負擔,再換用各式人體工學鍵盤,再然後發覺到另一個很有用的方式,也就是Vim。它讓你不再需要頻繁地讓右手移開至方向鍵或功能鍵,若是再配合有小紅點的IBM鍵盤,你會發現手臂不會那麼累了。

目前幾乎所有主流編輯器都有提供vi的功能,而在visual studio中則是透過VsVim來提供。

最開始是看到如下語法後就決定學習的:

enter image description here
圖片來自Android Studio : Using Vim

此命令是屬於vi中的組合鍵,可以把它記成 change inner word,第一個單字代表你要做的動作,比如說可以使用:

  • c:變更
  • y:複製
  • d:刪除

等指令,而第二個字單字可以表示範圍或目的,如常用的:

  • i:inner
  • t:直到

例如:

  • ci” – 變更目前遊標前後用”符號包含的整個字串
  • ci) – 變更目前遊標前後左右括號包含的整個字串
  • ci] – 變更目前遊標前後左右中括號包含的整個字串

運用vim提供的各種命令,會讓日常寫程式時效率提高不少,當然,要花不少時間去熟悉,但這是非常非常值得的投資。

相關參考:
* Android Studio : Using Vim(簡體中文)
* 給程式設計師的Vim入門圖解說明
* Youtube中文教學
* 大家來學VIM
* 鳥哥的 Linux 私房菜 第九章、vim 程式編輯器

在visual studio中會有一部份的key map衝突,這時候就要看個人的取捨了,另外一個我一定會用的是這個軟體 – SharpKeys。我會用它來把最常用的Caps Lock和Esc鍵調換,這樣在切換模式時就不會因手指不夠長而提起手腕了XD。(CodePlex若是關閉了可至其Github下載。)

Written with StackEdit.

7.18.2017

Wpf 3D - Helix Toolkit - 一個方便的3D展示函式庫

在youtube上看到了一個wpf上的3D控制展示,附source code,下載後發覺Helix toolkit把最麻煩的部份都處理掉了,可以很簡單的控制3d視圖中各部件的作動,看到這個就想到機台控制應該也可以用此方式來模擬,感覺上應該會很有趣XD。

3D

主要的控制程式如下(有稍微整理過):

public partial class MainWindow : Window
    {
        //provides functionality to 3d models
        Model3DGroup RA = new Model3DGroup();
        Model3D link1 = null;
        Model3D link2 = null;
        Model3D link3 = null;
        Model3D link4 = null;
        Model3D link5 = null;

        //provides render to model3d objects
        ModelVisual3D RoboticArm = new ModelVisual3D();

        //directroy of all stl files
        private const string MODEL_PATH1 = "j0_j1_link.stl";
        private const string MODEL_PATH2 = "j1_j2_link.stl";
        private const string MODEL_PATH3 = "j2_j3_link.stl";
        private const string MODEL_PATH4 = "j3_j4_link.stl";
        private const string MODEL_PATH5 = "j4_j5_link.stl";

        RotateTransform3D R = new RotateTransform3D();
        TranslateTransform3D T = new TranslateTransform3D();

        public MainWindow()
        {
            InitializeComponent();
            RoboticArm.Content = Initialize_Environment(MODEL_PATH1, MODEL_PATH2,MODEL_PATH3, MODEL_PATH4, MODEL_PATH5);
            viewPort3d.Children.Add(RoboticArm);
        }

        private Model3DGroup Initialize_Environment(string model1, string model2, string model3, string model4, string model5)
        {
            try
            {
                viewPort3d.RotateGesture = new MouseGesture(MouseAction.LeftClick);
                ModelImporter import = new ModelImporter();
                link1 = import.Load(model1);
                link2 = import.Load(model2);
                link3 = import.Load(model3);
                link4 = import.Load(model4);
                link5 = import.Load(model5);

                execute_fk();

                RA.Children.Add(link1);
                RA.Children.Add(link2);
                RA.Children.Add(link3);
                RA.Children.Add(link4);
                RA.Children.Add(link5);
            }
            catch (Exception e)
            {
                MessageBox.Show("Exception Error:" + e.StackTrace);
            }
            return RA;
        }

        private void joint1_ValueChanged(object sender, RoutedPropertyChangedEventArgs<double> e)
        {
          execute_fk();
        }        

        private void execute_fk()
        {
            var F1 = new Transform3DGroup();
            var F2 = new Transform3DGroup();
            var F3 = new Transform3DGroup();
            var F4 = new Transform3DGroup();
            var F5 = new Transform3DGroup();

            var p3d = new Point3D(0, 0, 0);
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 0, 1), joint1.Value), p3d);
            // F1為基座,僅能旋轉,目前角度為joint1.Value指定
            F1.Children.Add(R);

            p3d = new Point3D(0, 0, 9.5);
            T = new TranslateTransform3D(p3d.ToVector3D());
            // F2可旋轉,目前角度為joint2.Value
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 1, 0), joint2.Value), p3d);
            // F2連接至F1,位置在其Z軸方向9.5
            F2.Children.Add(F1);
            F2.Children.Add(T);
            F2.Children.Add(R);

            p3d = new Point3D(15, 0, 0);            
            T = new TranslateTransform3D(p3d.ToVector3D());
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 1, 0), joint3.Value), p3d);
            F3.Children.Add(T);
            F3.Children.Add(R);
            F3.Children.Add(F2);

            p3d = new Point3D(6.7, 0, 0);
            T = new TranslateTransform3D(p3d.ToVector3D());
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(0, 1, 0), joint4.Value), p3d);
            F4.Children.Add(T);
            F4.Children.Add(R);
            F4.Children.Add(F3);

            p3d = new Point3D(5.35, 0, 0);
            T = new TranslateTransform3D(p3d.ToVector3D());
            R = new RotateTransform3D(new AxisAngleRotation3D(new Vector3D(1, 0, 0), joint5.Value), p3d);
            F5.Children.Add(T);
            F5.Children.Add(R);
            F5.Children.Add(F4);

            link1.Transform = F1;
            link2.Transform = F2;
            link3.Transform = F3;
            link4.Transform = F4;
            link5.Transform = F5;
        }
    }

程式中手臂主要是由五個副檔名為.stl的機構組成,每個.stl檔對映到一個Model3D物件,在每個Model3D物件中設定其旋轉(RotateTransform3D)及位置(TranslateTransform3D),最後所有的Model3D物件再組合成Model3DGroup - RA,再由ModelVisual3D RoboticArm負責Render,最後放到HelixViewport3D中展示。

作者也將此程式另行擴充成一個更完整的可編輯Frame來形成動畫的軟體,如下所示:

Robotic Arm Windows Application

Written with StackEdit.