最近發現一個Rx的應用 – Obvs函式庫,作者用Rx的方式包裝了一些訊息傳輸用的函式庫,目前提供的擴充有:
- Transports: ActiveMQ / RabbitMQ / NetMQ / AzureServiceBus / Kafka / EventStore
 - Serialization: XML / JSON.Net / NetJson / ProtoBuf / MsgPack
 - Logging: NLog / log4net
 - Monitoring: Performance Counters / ElasticSearch
 - Integrations: Slack
 
Rx的訂閱特性非常適合用在訊息系統上,而作者進一步的包裝了不同的訊息函式庫,對於一般的需求來說應該是夠用了。
範例
P.S 以下說明來自專案github
定義預提供服務的訊息介面:
public interface IMyServiceMessage : IMessage { }
可建立不同需求的訊息類型:
public class MyCommand : IMyServiceMessage, ICommand { }
public class MyEvent : IMyServiceMessage, IEvent { }
public class MyRequest: IMyServiceMessage, IRequest { }
public class MyResponse : IMyServiceMessage, IResponse { }
建立訊息服務:
IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .Create();
傳送命令:
serviceBus.Commands.Subscribe(c => Console.WriteLine("Received a command!"));
await serviceBus.SendAsync(new MyCommand());
發佈事件:
serviceBus.Events.Subscribe(e => Console.WriteLine("Received an event!"));
await serviceBus.PublishAsync(new MyEvent());
請求/回應:
serviceBus.Requests
      .OfType<MyRequest>()
      .Subscribe(request => serviceBus.ReplyAsync(request, new MyResponse()));
serviceBus.GetResponses(new MyRequest())
      .OfType<MyResponse>()
      .Take(1)
      .Timeout(TimeSpan.FromSeconds(1))
      .Subscribe(r => Console.WriteLine("Received a response!"), err => Console.WriteLine("Oh no!"));
定義自訂的端點以包裝函式呼叫或和其它系統整合:
public class MyCustomEndpoint : IServiceEndpointClient
{
    Type _serviceType = typeof(IMyCustomServiceMessage);
    public IObservable<IEvent> Events
    {
            get
            {
                // subscribe to external MQ broker
            }
    }
    public Task SendAsync(ICommand command)
    {
            // call external API
    }
    public IObservable<IResponse> GetResponses(IRequest request)
    {
            // call external API and wrap response in observable
    }
    public bool CanHandle(IMessage message)
    {
            return _serviceType.IsInstanceOfType(message);
    }
}
...
IServiceBus serviceBus = ServiceBus.Configure()
    .WithActiveMQEndpoints<IMyServiceMessage>()
        .Named("MyService")
        .UsingQueueFor<ICommand>()
        .ConnectToBroker("tcp://localhost:61616")
        .SerializedAsJson()
        .AsClientAndServer()
    .WithEndpoints(new MyCustomEndpoint())
    .Create();
在各個mvvm framework中,都不約而同的提供了集中式的訊息架構,以利各個viewmodel及view之間的溝通,如Prism的Event Aggregator、MvvmLight的Messenger、及ReactiveUI中的MessageBus,在使用上述這些framework時也很適合使用此 ServiceBus 替換,減少學習不同功能類似的工具的時間花費。
Written with StackEdit.
沒有留言:
張貼留言