相對於PLC或SoftPLC開發,故障處理或重試等都是在同一個context中;在Windows事件驅動環境下就要多花一些手腳去實作類似功能,因此也發現了這一套函式庫,以下介紹大多來自其Github說明,可以把它當作是簡略的翻譯。
Polly 是 .Net 下的一套暫態故障處理及恢復的函式庫,可讓開發者以fluent及執行緒安全的方式來應用諸如Retry、Circuit Breaker、Timeout、Bulkhead Isolation及Fallback等策略。
基本用法
// Execute an action
var policy = Policy
.Handle<DivideByZeroException>()
.Retry();
policy.Execute(() => DoSomething());
應用 – 錯誤(故障)處理策略
此策略處理執行的委託中所丟出的特定例外或是回傳值。
Step1:指定此策略想要處理的例外/故障
適用在:Retry、CircuitBreaker和Fallback
// 處理指定例外
Policy
.Handle<DivideByZeroException>()
// 處理有條件的指定例外
Policy
.Handle<SqlException>(ex => ex.Number == 1205)
// 處理多種例外
Policy
.Handle<DivideByZeroException>()
.Or<ArgumentException>()
// 處理多種有條件的例外
Policy
.Handle<SqlException>(ex => ex.Number == 1205)
.Or<ArgumentException>(ex => ex.ParamName == "example")
Step2:指定策略如何處理故障
重試
// 重試一次
Policy
.Handle<DivideByZeroException>()
.Retry()
// 重試數次
Policy
.Handle<DivideByZeroException>()
.Retry(3)
// 重試數次,在每次重試時執行一指定Action,並代入例外及重試計數
Policy
.Handle<DivideByZeroException>()
.Retry(3, (exception, retryCount) =>
{
// do something
});
// 重試數次,在每次重試時執行一指定Action,並代入例外、重試計數
// 及context給Execute()
Policy
.Handle<DivideByZeroException>()
.Retry(3, (exception, retryCount, context) =>
{
// do something
});
永遠重試(直到成功)
// 永遠重試
Policy
.Handle<DivideByZeroException>()
.RetryForever()
// 永遠重試,在每次重試時執行一指定Action,並代入例外
Policy
.Handle<DivideByZeroException>()
.RetryForever(exception =>
{
// do something
});
// 永遠重試,在每次重試時執行一指定Action,並代入例外及context給Execute()
Policy
.Handle<DivideByZeroException>()
.RetryForever((exception, context) =>
{
// do something
});
等待並重試
// 重試,在每次重試前等待特定時間間隔
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(new[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3)
});
// 重試,在每次重試前等待特定時間間隔,重試時並執行一指定Action,並代入例外及
// 時間間隔參數
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(new[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3)
}, (exception, timeSpan) => {
// do something
});
// Retry, waiting a specified duration between each retry,
// calling an action on each retry with the current exception,
// duration and context provided to Execute()
// 重試,在每次重試前等待特定時間間隔,重試時並執行一指定Action,並代入例外、
// 時間間隔及context參數給Execute()
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(new[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3)
}, (exception, timeSpan, context) => {
// do something
});
// 重試,在每次重試前等待特定時間間隔,重試時並執行一指定Action,且代入例外、
// 時間間隔、重試計數及context參數給Execute()
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(new[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(3)
}, (exception, timeSpan, retryCount, context) => {
// do something
});
// 重試指定次數,並使用函式依重試次數計數每次重試的間隔時間(可指數backoff)
// 此例中會等待如下時間
// 2 ^ 1 = 2 seconds then
// 2 ^ 2 = 4 seconds then
// 2 ^ 3 = 8 seconds then
// 2 ^ 4 = 16 seconds then
// 2 ^ 5 = 32 seconds
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(5, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
);
// 重試指定次數,並使用函式依重試次數計數每次重試的間隔時間,重試
// 時執行一指定Action,且代入例外、時間間隔及context參數給Execute()
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(
5,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, context) => {
// do something
}
);
// 重試指定次數,並使用函式依重試次數計數每次重試的間隔時間,重試
// 時執行一指定Action,且代入例外、時間間隔、重試計數及context參數給Execute()
Policy
.Handle<DivideByZeroException>()
.WaitAndRetry(
5,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timeSpan, retryCount, context) => {
// do something
}
);
等待並永遠重試(直到成功)
// 等待並永遠重試
Policy
.Handle<DivideByZeroException>()
.WaitAndRetryForever(retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
);
// 等待並永遠重試,重試時並執行一指定的Action,且代入例外及等待時間
Policy
.Handle<DivideByZeroException>()
.WaitAndRetryForever(
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timespan) =>
{
// do something
});
// 等待並永遠重試,重試時並執行一指定的Action,且代入例外、等待時間
// 及context給Execute()
Policy
.Handle<DivideByZeroException>()
.WaitAndRetryForever(
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, timespan, context) =>
{
// do something
});
Circuit Breaker
// 在指定例外連續發生特定次數後斷開迴路並保持指定時間
Policy
.Handle<DivideByZeroException>()
.CircuitBreaker(2, TimeSpan.FromMinutes(1));
// 在指定例外連續發生特定次數後斷開迴路並保持指定時間
// 且在迴路狀態改變時執行一Action
Action<Exception, TimeSpan> onBreak = (exception, timespan) => { ... };
Action onReset = () => { ... };
CircuitBreakerPolicy breaker = Policy
.Handle<DivideByZeroException>()
.CircuitBreaker(2, TimeSpan.FromMinutes(1), onBreak, onReset);
// 在指定例外連續發生特定次數後斷開迴路並保持指定時間
// 且在迴路狀態改變時執行一Action,並代入一context參數給Execute()
Action<Exception, TimeSpan, Context> onBreak = (exception, timespan, context) => { ... };
Action<Context> onReset = context => { ... };
CircuitBreakerPolicy breaker = Policy
.Handle<DivideByZeroException>()
.CircuitBreaker(2, TimeSpan.FromMinutes(1), onBreak, onReset);
// Monitor the circuit state, for example for health reporting.
CircuitState state = breaker.CircuitState;
/*
CircuitState.Closed - 常態,可執行actions。
CircuitState.Open - 自動控制器已斷開電路,不允許執行actions。
CircuitState.HalfOpen - 在自動斷路時間到時,從斷開的狀態復原。可執行actions,接續的action/s或控制的完成,會讓狀態轉至Open或Closed。
CircuitState.Isolated - 在電路開路的狀態時手動hold住,不允許執行actions。
*/
// 手動打開(且保持)一個斷路器–例如手動隔離downstream的服務
breaker.Isolate();
// 重置一個斷路器回closed的狀態,可再次接受actions的執行
breaker.Reset();
Fallback
// 如果執行失敗,提供一個替代值
Policy
.Handle<Whatever>()
.Fallback<UserAvatar>(UserAvatar.Blank)
// 如果執行失敗,指定一函式以提供替代值
Policy
.Handle<Whatever>()
.Fallback<UserAvatar>(() => UserAvatar.GetRandomAvatar()) // where: public UserAvatar GetRandomAvatar() { ... }
// 如果執行失敗,指定一替代值或func,且呼叫一Action(例如logging)
Policy
.Handle<Whatever>()
.Fallback<UserAvatar>(UserAvatar.Blank, onFallback: (exception, context) =>
{
// do something
});
Step 3:執行策略
// 執行一個Action
var policy = Policy
.Handle<DivideByZeroException>()
.Retry();
policy.Execute(() => DoSomething());
// 執行一個Action且代入任意數目的context data
var policy = Policy
.Handle<DivideByZeroException>()
.Retry(3, (exception, retryCount, context) =>
{
var methodThatRaisedException = context["methodName"];
Log(exception, methodThatRaisedException);
});
policy.Execute(
() => DoSomething(),
new Dictionary<string, object>() {{ "methodName", "some method" }}
);
// 執行一個會回傳值的Action
var policy = Policy
.Handle<DivideByZeroException>()
.Retry();
var result = policy.Execute(() => DoSomething());
// 執行一個會回傳值的Action,且代入任意數目的context data
var policy = Policy
.Handle<DivideByZeroException>()
.Retry(3, (exception, retryCount, context) =>
{
object methodThatRaisedException = context["methodName"];
Log(exception, methodThatRaisedException)
});
var result = policy.Execute(
() => DoSomething(),
new Dictionary<string, object>() {{ "methodName", "some method" }}
);
// 當然你可以把它們都放在一起
Policy
.Handle<SqlException>(ex => ex.Number == 1205)
.Or<ArgumentException>(ex => ex.ParamName == "example")
.Retry()
.Execute(() => DoSomething());
以上為了簡單起見我們把策略的定義跟執行放在一起,當然我們可以分開它們,例如在程式開始時定義策略,然後透過DI在需要時注入。
應用 – 一般化的彈性策略
The general resilience policies add resilience strategies that are not explicitly centred around handling faults which delegates may throw or return.
Step 1:設定
Timeout
// 如果委託的執行超過30秒,則逾時,且回到呼叫者
// 最佳化逾時行為:委託應代入一CancellationToken
Policy
.Timeout(30)
// 用timespan當作timeout
Policy
.Timeout(TimeSpan.FromMilliseconds(2500))
// 透過一個func provider設定一逾時時間
Policy
.Timeout(() => myTimeoutProvider)) // Func<TimeSpan> myTimeoutProvider
// Timeout after 30 seconds, if the executed delegate has not completed. Enforces a timeout on delegates which have no in-built timeout and do not honour CancellationToken, at the expense (in synchronous executions) of an extra thread.
// (for more detail, see deep documentation)
Policy
.Timeout(30, TimeoutStrategy.Pessimistic)
// 逾時時呼叫一Action
Policy
.Timeout(30, onTimeout: (context, timespan, task) =>
{
// do something
});
// 例:逾時時記錄下來
Policy
.Timeout(30, onTimeout: (context, timespan, task) =>
{
logger.Warn($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds.");
});
// 例:逾時時,記錄下task中發生的任何例外
Policy
.Timeout(30, onTimeout: (context, timespan, task) =>
{
task.ContinueWith(t => {
if (t.IsFaulted) logger.Error($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds, with: {t.Exception}.");
});
});
Bulkhead
// 透過策略限制同時只能有最多12個action被執行
Policy
.Bulkhead(12)
// Restrict executions through the policy to a maximum of twelve concurrent actions,
// with up to two actions waiting for an execution slot in the bulkhead if all slots are taken.
Policy
.Bulkhead(12, 2)
// Restrict concurrent executions, calling an action if an execution is rejected
Policy
.Bulkhead(12, context =>
{
// do something
});
// Monitor the bulkhead available capacity, for example for health/load reporting.
var bulkhead = Policy.Bulkhead(12, 2);
// ...
int freeExecutionSlots = bulkhead.BulkheadAvailableCount;
int freeQueueSlots = bulkhead.QueueAvailableCount;
Cache
The Cache policy is targeting an upcoming Polly v5.x version. Check out www.thepollyproject.org for updates.
PolicyWrap
// 從現有的策略組合成一新策略
var policyWrap = Policy
.Wrap(fallback, cache, retry, breaker, timeout, bulkhead);
// (wraps the policies around any executed delegate: fallback outermost ... bulkhead innermost)
policyWrap.Execute(...)
// 定義標準 resilience 策略
PolicyWrap commonResilience = Policy.Wrap(retry, breaker, timeout);
// 然後wrap其它策略至一個call site
Avatar avatar = Policy
.Handle<Whatever>()
.Fallback<Avatar>(Avatar.Blank)
.Wrap(commonResilience)
.Execute(() => { /* get avatar */ });
// 共用commonResilience,但在其它的call site 中 wrap不同的策略
Reputation reps = Policy
.Handle<Whatever>()
.Fallback<Reputation>(Reputation.NotAvailable)
.Wrap(commonResilience)
.Execute(() => { /* get reputation */ });
NoOp
// 定義一個單純的僅需傳入一委託以執行的策略
// 在單元測試中很有用,或應用在當你的程式架構中需要一個policy,但你只想
// 傳入一可執行的委託而不想用策略
NoOpPolicy noOp = Policy.NoOp();
Step 2:執行策略
同之前所述。
Post-execution:獲取結果,或最後的例外
使用ExecuteAndCapture(...)
函式,你可以獲取策略執行的結果。
var policyResult = Policy
.Handle<DivideByZeroException>()
.Retry()
.ExecuteAndCapture(() => DoSomething());
/*
policyResult.Outcome - 執行成功或失敗
policyResult.FinalException - 獲取到的最後的例外,如果成功則為null
policyResult.ExceptionType - 例外類型,可能為原策略中定義的例外(如:DivideByZeroException)或未處理的例外(如:Exception),如果成功則為null
policyResult.Result - if executing a func, the result if the call succeeded or the type's default value
*/
Handling return values, and Policy<TResult>
// 在同一個策略中處理例外及回傳值
HttpStatusCode[] httpStatusCodesWorthRetrying = {
HttpStatusCode.RequestTimeout, // 408
HttpStatusCode.InternalServerError, // 500
HttpStatusCode.BadGateway, // 502
HttpStatusCode.ServiceUnavailable, // 503
HttpStatusCode.GatewayTimeout // 504
};
HttpResponseMessage result = Policy
.Handle<HttpResponseException>()
.OrResult<HttpResponseMessage>(r => httpStatusCodesWorthRetrying.Contains(r.StatusCode))
.Retry(...)
.Execute( /* some Func<HttpResponseMessage> */ )
Strongly-typed Policy<TResult>
用.HandleResult<TResult>(...)
或.OrResult<TResult>(...)
來設定一個策略會得到一個特定策略的強型別Policy<TResult>
,如Retry<TResult>
、AdvancedCircuitBreaker<TResult>
。
這些策略必需用來執行會回傳TResult
的委託,例如:
* Execute(Func<TResult>)
(和其相關覆載)
* ExecuteAsync(Func<CancellationToken, Task<TResult>>)
(和其相關覆載)
ExecuteAndCapture()
.ExecuteAndCapture(...)
在非泛型策略中會回傳帶屬性的一個PolicyResult
。
- policyResult.Outcome - 執行成功或失敗
- policyResult.FinalException - 獲取到的最後的例外,如果成功則為null
- policyResult.ExceptionType - 例外類型,可能為原策略中定義的例外(如:DivideByZeroException)或未處理的例外(如:Exception),如果成功則為null
- policyResult.Result - if executing a func, the result if the call succeeded or the type’s default value
.ExecuteAndCapture<TResult>(Func<TResult>)
在強型別的策略中增加了兩個屬性
* policyResult.FaultType - 策略中最後處理的錯誤是例外或是回傳值?如果委託執行成功則為null。
* policyResult.FinalHandledResult - 處理的最後結果;如果執行成功會是null,或是型別的預設值。
State-change delegates on Policy<TResult>
policies
在非泛型策略中我們僅處理例外及如onRetry
、onBreak
等需代入一個例外當作參數的狀態變更委託。
而泛型策略中處理了TResult
回傳值,而狀態委託仍是一樣的,除了它需代入一個DelegateResult<TResult>
而不是一個例外。
DelegateResult<TResult>
有兩個屬性:
* Exception // 如果策略正在處理例外,則丟出例外(否則為null)
* Result // 如果策略正在處理結果,則回傳TResult(否則為default(TResult))
BrokenCircuitException<TResult>
非泛型化的CircuitBreaker策丟在迴路斷開時丟出一個BrokenCircuitEXception
例外,此例外包含了最後一個例外(導致迴路斷開的外外)做為其InnerException
。
對BrokenCircuitException<TResult>
策略來說:
* 由於例外導致的斷路引發了一個BrokenCircuitException,其中InnerException設置為觸發斷路的例外(如前所述)。
* A circuit broken due to handling a result throws a BrokenCircuitException with the Result property set to the result which caused the circuit to break.
Policy Keys and Context data
// 使用WithPolicyKey()擴充函式可用PolicKey來識別策略
// (for example, for correlation in logs or metrics)
var policy = Policy
.Handle<DataAccessException>()
.Retry(3, onRetry: (exception, retryCount, context) =>
{
logger.Error($"Retry {retryCount} of {context.PolicyKey} at {context.ExecutionKey}, due to: {exception}.");
})
.WithPolicyKey("MyDataAccessPolicy");
// 傳入一個Context後可用ExecutiveKey來識別呼叫者
var customerDetails = policy.Execute(myDelegate, new Context("GetCustomerDetails"));
// "MyDataAccessPolicy" -> context.PolicyKey
// "GetCustomerDetails -> context.ExecutionKey
// 從call site傳外加的自訂訊息至執行的context中
var policy = Policy
.Handle<DataAccessException>()
.Retry(3, onRetry: (exception, retryCount, context) =>
{
logger.Error($"Retry {retryCount} of {context.PolicyKey} at {context.ExecutionKey}, getting {context["Type"]} of id {context["Id"]}, due to: {exception}.");
})
.WithPolicyKey("MyDataAccessPolicy");
int id = ... // customer id from somewhere
var customerDetails = policy.Execute(() => GetCustomer(id),
new Context("GetCustomerDetails", new Dictionary<string, object>() {{"Type","Customer"},{"Id",id}}
));
PolicyRegistry
// 建立一個策略註冊(例如在程式開始階段)
PolicyRegistry registry = new PolicyRegistry();
// 加入策略
registry.Add("StandardHttpResilience", myStandardHttpResiliencePolicy);
// 或用:
registry["StandardHttpResilience"] = myStandardHttpResiliencePolicy;
// DI注入
public class MyServiceGateway
{
public void MyServiceGateway(..., IPolicyRegistry<string> registry, ...)
{
...
}
}
// (Or if you prefer ambient-context pattern, use a thread-safe singleton)
// Use a policy from the registry
registry.Get<IAsyncPolicy<HttpResponseMessage>>("StandardHttpResilience")
.ExecuteAsync<HttpResponseMessage>(...)
PolicyRegistry
有類似字典的語法,例如.ContainsKy(...)
、TryGet<TPolicy>(...)
、.Count
、’Clear()
和Remove(...)
等。
Interfaces
Polly v5.2.0 增加了介面以支援PolicyRegistry
,且以Interface segregation principle的原則來群組化不同功能的策略,此介面不適用於撰寫自訂的策略實作。
Execution interfaces: ISyncPolicy
等
執行介面 ISyncPolicy
、IAsyncPolicy
、ISyncPolicy<TResult>
及IAsyncPolicy<TResult>
分別定義了對sync/async、非泛型/泛型呼叫等執行的覆載。
Policy-kind interfaces: ICircuitBreakerPolicy
等
與執行介面交集,特定策略類型的介面定義了該類型策略的常用屬性和方法。
For example, ICircuitBreakerPolicy defines
例如,ICircuitBreakerPolicy
定義了:
* CircuitState CircuitState
* Exception LastException
* void Isolate()
* void Reset()
而ICircuitBreakerPolicy<TResult> : ICircuitBreakerPolicy
增加了:
* TResult LastHandledResult
這讓我們可把類似策略當成同一個,例如這裡描述的監看所有的迴路斷路器。
執行緒安全
所有的Polly策略都是執行緒安全的,你可以安女的在不同的來源復用策略,或在不同的執要緒上併行地執行策略。
雖然策略內部的操作是執行緒安全的,但這不表示其中執行的你的委託也會是;如果它們本身不是,那最終也不會是執行緒安全的。
非同步支援
Polly使用以下函式提供完整的非同步操作:
* RetryAsync
* WaitAndRetryAsync
* CircuitBreakerAsync
* (etc)
* ExecutiveAsync
* ExecuteAndCaptureAsync
它們相對應的同步函式:
* Retry
* WaitAndRetry
* CircuitBreaker
* (etc)
* Execute
* ExecuteAndCapture
所有的策略都存在非同步的覆載及所有的Execute()
和ExecuteAndCapture()
覆載。
使用範例:
await Policy
.Handle<SqlException>(ex => ex.Number == 1205)
.Or<ArgumentException>(ex => ex.ParamName == "example")
.RetryAsync()
.ExecuteAsync(() => DoSomethingAsync());
SynchronizationContext
Async continuations and retries by default do not run on a captured synchronization context. To change this, use .ExecuteAsync(…) overloads taking a boolean continueOnCapturedContext parameter.
Cancellation的支援
非同步策略執行透過在.ExecutiveAsync(...)
代入CancellationToken
來支援Cancellation。
代入.ExecutiveAsync(...)
的CancellationToken
有三個作用:
* 它會取消如目前的重試、重試間的等待或等待bulkhead execution slot等策略動作
* 策略會把它當作CancellationToken
參數傳入任何其執行的委託,以在委 執行時仍支援取消
* In common with the Base Class Library implementation in Task.Run(…) and elsewhere, if the cancellation token is cancelled before execution begins, the user delegate is not executed at all.
// 從一個uri中重試數次,可在任意時間中斷
CancellationToken cancellationToken = // ...
var policy = Policy
.Handle<WebException>()
.Or<HttpRequestException>()
.WaitAndRetryAsync(new[] {
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(4)
});
var response = await policy.ExecuteAsync(ct => httpClient.GetAsync(uri, ct), cancellationToken);
題外話
Rx 中也可以實作出類似Retry的效果,在SO中的實作如下:
// 擴充方法
public static IObservable<T> RetryWithBackoffStrategy<T>(
this IObservable<T> source,
int retryCount = 3,
Func<int, TimeSpan> strategy = null,
Func<Exception, bool> retryOnError = null,
IScheduler scheduler = null)
{
strategy = strategy ?? ExpontentialBackoff;
scheduler = scheduler ?? RxApp.TaskpoolScheduler;
if (retryOnError == null)
retryOnError = e => true;
int attempt = 0;
return Observable.Defer(() =>
{
return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
.Select(item => new Tuple<bool, T, Exception>(true, item, null))
.Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
? Observable.Throw<Tuple<bool, T, Exception>>(e)
: Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
? Observable.Return(t.Item2)
: Observable.Throw<T>(t.Item3));
}
使用方式:
Observable.Defer(() => SomApiMethod())
.RetryWithBackoffStrategy(
retryCount: 4,
retryOnError: e => e is ApiRetryWebException
)
Written with StackEdit.
沒有留言:
張貼留言