最近發現一個Rx的應用 – Obvs函式庫,作者用Rx的方式包裝了一些訊息傳輸用的函式庫,目前提供的擴充有:
- Transports: ActiveMQ / RabbitMQ / NetMQ / AzureServiceBus / Kafka / EventStore
- Serialization: XML / JSON.Net / NetJson / ProtoBuf / MsgPack
- Logging: NLog / log4net
- Monitoring: Performance Counters / ElasticSearch
- Integrations: Slack
Rx的訂閱特性非常適合用在訊息系統上,而作者進一步的包裝了不同的訊息函式庫,對於一般的需求來說應該是夠用了。
範例
P.S 以下說明來自專案github
定義預提供服務的訊息介面:
public interface IMyServiceMessage : IMessage { }
可建立不同需求的訊息類型:
public class MyCommand : IMyServiceMessage, ICommand { }
public class MyEvent : IMyServiceMessage, IEvent { }
public class MyRequest: IMyServiceMessage, IRequest { }
public class MyResponse : IMyServiceMessage, IResponse { }
建立訊息服務:
IServiceBus serviceBus = ServiceBus.Configure()
.WithActiveMQEndpoints<IMyServiceMessage>()
.Named("MyService")
.UsingQueueFor<ICommand>()
.ConnectToBroker("tcp://localhost:61616")
.SerializedAsJson()
.AsClientAndServer()
.Create();
傳送命令:
serviceBus.Commands.Subscribe(c => Console.WriteLine("Received a command!"));
await serviceBus.SendAsync(new MyCommand());
發佈事件:
serviceBus.Events.Subscribe(e => Console.WriteLine("Received an event!"));
await serviceBus.PublishAsync(new MyEvent());
請求/回應:
serviceBus.Requests
.OfType<MyRequest>()
.Subscribe(request => serviceBus.ReplyAsync(request, new MyResponse()));
serviceBus.GetResponses(new MyRequest())
.OfType<MyResponse>()
.Take(1)
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(r => Console.WriteLine("Received a response!"), err => Console.WriteLine("Oh no!"));
定義自訂的端點以包裝函式呼叫或和其它系統整合:
public class MyCustomEndpoint : IServiceEndpointClient
{
Type _serviceType = typeof(IMyCustomServiceMessage);
public IObservable<IEvent> Events
{
get
{
// subscribe to external MQ broker
}
}
public Task SendAsync(ICommand command)
{
// call external API
}
public IObservable<IResponse> GetResponses(IRequest request)
{
// call external API and wrap response in observable
}
public bool CanHandle(IMessage message)
{
return _serviceType.IsInstanceOfType(message);
}
}
...
IServiceBus serviceBus = ServiceBus.Configure()
.WithActiveMQEndpoints<IMyServiceMessage>()
.Named("MyService")
.UsingQueueFor<ICommand>()
.ConnectToBroker("tcp://localhost:61616")
.SerializedAsJson()
.AsClientAndServer()
.WithEndpoints(new MyCustomEndpoint())
.Create();
在各個mvvm framework中,都不約而同的提供了集中式的訊息架構,以利各個viewmodel及view之間的溝通,如Prism的Event Aggregator、MvvmLight的Messenger、及ReactiveUI中的MessageBus,在使用上述這些framework時也很適合使用此 ServiceBus 替換,減少學習不同功能類似的工具的時間花費。
Written with StackEdit.