5.18.2017

Observable microservice bus

最近發現一個Rx的應用 – Obvs函式庫,作者用Rx的方式包裝了一些訊息傳輸用的函式庫,目前提供的擴充有:

  • Transports: ActiveMQ / RabbitMQ / NetMQ / AzureServiceBus / Kafka / EventStore
  • Serialization: XML / JSON.Net / NetJson / ProtoBuf / MsgPack
  • Logging: NLog / log4net
  • Monitoring: Performance Counters / ElasticSearch
  • Integrations: Slack

Rx的訂閱特性非常適合用在訊息系統上,而作者進一步的包裝了不同的訊息函式庫,對於一般的需求來說應該是夠用了。

範例

P.S 以下說明來自專案github

定義預提供服務的訊息介面:

public interface IMyServiceMessage : IMessage { }

可建立不同需求的訊息類型:

public class MyCommand : IMyServiceMessage, ICommand { }

public class MyEvent : IMyServiceMessage, IEvent { }

public class MyRequest: IMyServiceMessage, IRequest { }

public class MyResponse : IMyServiceMessage, IResponse { }

建立訊息服務:

IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .Create();

傳送命令:

serviceBus.Commands.Subscribe(c => Console.WriteLine("Received a command!"));
await serviceBus.SendAsync(new MyCommand());

發佈事件:

serviceBus.Events.Subscribe(e => Console.WriteLine("Received an event!"));
await serviceBus.PublishAsync(new MyEvent());

請求/回應:

serviceBus.Requests
      .OfType<MyRequest>()
      .Subscribe(request => serviceBus.ReplyAsync(request, new MyResponse()));

serviceBus.GetResponses(new MyRequest())
      .OfType<MyResponse>()
      .Take(1)
      .Timeout(TimeSpan.FromSeconds(1))
      .Subscribe(r => Console.WriteLine("Received a response!"), err => Console.WriteLine("Oh no!"));

定義自訂的端點以包裝函式呼叫或和其它系統整合:

public class MyCustomEndpoint : IServiceEndpointClient
{
    Type _serviceType = typeof(IMyCustomServiceMessage);

    public IObservable<IEvent> Events
    {
            get
            {
                // subscribe to external MQ broker
            }
    }

    public Task SendAsync(ICommand command)
    {
            // call external API
    }

    public IObservable<IResponse> GetResponses(IRequest request)
    {
            // call external API and wrap response in observable
    }

    public bool CanHandle(IMessage message)
    {
            return _serviceType.IsInstanceOfType(message);
    }
}

...

IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .WithEndpoints(new MyCustomEndpoint())
    .Create();

在各個mvvm framework中,都不約而同的提供了集中式的訊息架構,以利各個viewmodel及view之間的溝通,如Prism的Event Aggregator、MvvmLight的Messenger、及ReactiveUI中的MessageBus,在使用上述這些framework時也很適合使用此 ServiceBus 替換,減少學習不同功能類似的工具的時間花費。

Written with StackEdit.