6.29.2017

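Transient Fault Handling Strategies - Using Polly

In PLC or SoftPLC development, fault handling and retries all happen within the same context. In an event-driven Windows environment, getting similar behavior takes more work, which is how I came across this library. Most of what follows comes from its GitHub README, so treat it as an abridged translation.

Polly is a .NET library for transient-fault handling and resilience. It lets developers express policies such as Retry, Circuit Breaker, Timeout, Bulkhead Isolation, and Fallback in a fluent and thread-safe manner.

Basic usage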

// Execute an action
var policy = Policy
              .Handle<DivideByZeroException>()
              .Retry();

policy.Execute(() => DoSomething());

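Usage – fault-handling policies

Fault-handling policies handle specific exceptions thrown by, or results returned by, the delegates you execute through the policy.

Step 1: Specify the exceptions/faults you want the policy to handle

Applies to: Retry, CircuitBreaker, and Fallback

// Single exception type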
Policy
  .Handle<DivideByZeroException>()

// Single exception type with condition
Policy
  .Handle<SqlException>(ex => ex.Number == 1205)

// Multiple exception types
Policy
  .Handle<DivideByZeroException>()
  .Or<ArgumentException>()

// Multiple exception types with condition
Policy
  .Handle<SqlException>(ex => ex.Number == 1205)
  .Or<ArgumentException>(ex => ex.ParamName == "example")

Step 2: Specify how the policy should handle those faults

Retry

// Retry once
Policy
  .Handle<DivideByZeroException>()
  .Retry()

// Retry multiple times
Policy
  .Handle<DivideByZeroException>()
  .Retry(3)

// Retry multiple times, calling an action on each retry
// with the current exception and retry count
Policy
    .Handle<DivideByZeroException>()
    .Retry(3, (exception, retryCount) =>
    {
        // do something 
    });

// Retry multiple times, calling an action on each retry
// with the current exception, retry count, and the context provided to Execute()
Policy
    .Handle<DivideByZeroException>()
    .Retry(3, (exception, retryCount, context) =>
    {
        // do something 
    });

Retry forever (until it succeeds)

// Retry forever
Policy
  .Handle<DivideByZeroException>()
  .RetryForever()

// Retry forever, calling an action on each retry with the current exception
Policy
  .Handle<DivideByZeroException>()
  .RetryForever(exception =>
  {
        // do something       
  });

// Retry forever, calling an action on each retry with the current exception
// and the context provided to Execute()
Policy
  .Handle<DivideByZeroException>()
  .RetryForever((exception, context) =>
  {
        // do something       
  });

Wait and retry

// Retry, waiting a specified duration between each retry
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(new[]
  {
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3)
  });

// Retry, waiting a specified duration between each retry,
// calling an action on each retry with the current exception and duration
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(new[]
  {
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3)
  }, (exception, timeSpan) => {
    // do something    
  }); 

// Retry, waiting a specified duration between each retry,
// calling an action on each retry with the current exception,
// duration, and the context provided to Execute()
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(new[]
  {
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3)
  }, (exception, timeSpan, context) => {
    // do something    
  });

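// Retry, waiting a specified duration between each retry,
// calling an action on each retry with the current exception,
// duration, retry count, and the context provided to Execute()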
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(new[]
  {
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(2),
    TimeSpan.FromSeconds(3)
  }, (exception, timeSpan, retryCount, context) => {
    // do something    
  });

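// Retry a specified number of times, using a function to calculate
// the wait between retries from the current retry attempt
// (this allows exponential backoff).
// In this case it will wait for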
//  2 ^ 1 = 2 seconds then
//  2 ^ 2 = 4 seconds then
//  2 ^ 3 = 8 seconds then
//  2 ^ 4 = 16 seconds then
//  2 ^ 5 = 32 seconds
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(5, retryAttempt => 
    TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) 
  );

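// Retry a specified number of times, using a function to calculate
// the wait between retries from the current retry attempt,
// calling an action on each retry with the current exception,
// duration, and the context provided to Execute()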
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(
    5, 
    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), 
    (exception, timeSpan, context) => {
      // do something
    }
  );

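// Retry a specified number of times, using a function to calculate
// the wait between retries from the current retry attempt,
// calling an action on each retry with the current exception,
// duration, retry count, and the context provided to Execute()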
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetry(
    5, 
    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), 
    (exception, timeSpan, retryCount, context) => {
      // do something
    }
  );

Wait and retry forever (until it succeeds)

// Wait and retry forever
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetryForever(retryAttempt => 
    TimeSpan.FromSeconds(Math.Pow(2, retryAttempt))
    );

// Wait and retry forever, calling an action on each retry
// with the current exception and the time to wait
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetryForever(
    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),    
    (exception, timespan) =>
    {
        // do something       
    });

// Wait and retry forever, calling an action on each retry
// with the current exception, the time to wait,
// and the context provided to Execute()
Policy
  .Handle<DivideByZeroException>()
  .WaitAndRetryForever(
    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),    
    (exception, timespan, context) =>
    {
        // do something       
    });

Circuit Breaker

// Break the circuit after the specified number of consecutive exceptions
// and keep it broken for the specified duration.
Policy
    .Handle<DivideByZeroException>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1));

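// Break the circuit after the specified number of consecutive exceptions
// and keep it broken for the specified duration,
// calling an action on change of circuit state.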
Action<Exception, TimeSpan> onBreak = (exception, timespan) => { ... };
Action onReset = () => { ... };
CircuitBreakerPolicy breaker = Policy
    .Handle<DivideByZeroException>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1), onBreak, onReset);

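// Break the circuit after the specified number of consecutive exceptions
// and keep it broken for the specified duration,
// calling an action on change of circuit state
// and passing the context provided to Execute().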
Action<Exception, TimeSpan, Context> onBreak = (exception, timespan, context) => { ... };
Action<Context> onReset = context => { ... };
CircuitBreakerPolicy breaker = Policy
    .Handle<DivideByZeroException>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1), onBreak, onReset);

// Monitor the circuit state, for example for health reporting.
CircuitState state = breaker.CircuitState;

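/*
CircuitState.Closed - Normal operation. Execution of actions allowed.
CircuitState.Open - The automated controller has opened the circuit. Execution of actions blocked.
CircuitState.HalfOpen - Recovering from the open state, after the automated break duration has expired. Execution of actions permitted. The success or failure of subsequent action/s controls the onward transition to Open or Closed.
CircuitState.Isolated - Circuit held manually in an open state. Execution of actions blocked.
*/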

// Manually open (and hold open) a circuit breaker, for example to manually isolate a downstream service.
breaker.Isolate(); 

// Reset the breaker to the closed state, to start accepting actions again.
breaker.Reset(); 

Fallback

// Provide a substitute value, if an execution faults.
Policy
   .Handle<Whatever>()
   .Fallback<UserAvatar>(UserAvatar.Blank)

// Specify a func to provide a substitute value, if an execution faults.
Policy
   .Handle<Whatever>()
   .Fallback<UserAvatar>(() => UserAvatar.GetRandomAvatar()) // where: public UserAvatar GetRandomAvatar() { ... }

// Specify a substitute value or func, calling an action (e.g. for logging) if the fallback is invoked.
Policy
   .Handle<Whatever>()
   .Fallback<UserAvatar>(UserAvatar.Blank, onFallback: (exception, context) => 
    {
        // do something
    });

Step 3: Execute the policy

// Execute an action
var policy = Policy
              .Handle<DivideByZeroException>()
              .Retry();

policy.Execute(() => DoSomething());

// Execute an action, passing arbitrary context data
var policy = Policy
    .Handle<DivideByZeroException>()
    .Retry(3, (exception, retryCount, context) =>
    {
        var methodThatRaisedException = context["methodName"];
        Log(exception, methodThatRaisedException);
    });

policy.Execute(
    () => DoSomething(),
    new Dictionary<string, object>() {{ "methodName", "some method" }}
);

// Execute a function returning a result
var policy = Policy
              .Handle<DivideByZeroException>()
              .Retry();

var result = policy.Execute(() => DoSomething());

// Execute a function returning a result, passing arbitrary context data
var policy = Policy
    .Handle<DivideByZeroException>()
    .Retry(3, (exception, retryCount, context) =>
    {
        object methodThatRaisedException = context["methodName"];
        Log(exception, methodThatRaisedException);
    });

var result = policy.Execute(
    () => DoSomething(),
    new Dictionary<string, object>() {{ "methodName", "some method" }}
);

// You can of course chain it all together
Policy
  .Handle<SqlException>(ex => ex.Number == 1205)
  .Or<ArgumentException>(ex => ex.ParamName == "example")
  .Retry()
  .Execute(() => DoSomething());

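For simplicity, the examples above define the policy and execute it in the same place. You can of course separate the two, for example by defining policies at application startup and injecting them where needed via DI.

Usage – general resilience policies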
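As a rough sketch of that separation, a policy can be built once and handed to the code that needs it. The `Divide` helper and the simulated failures here are hypothetical, and `ISyncPolicy` assumes Polly v5.2 or later with the classic `Policy` API:

```csharp
using System;
using Polly;

// Built once, e.g. at application startup.
ISyncPolicy retryPolicy = Policy
    .Handle<DivideByZeroException>()
    .Retry(3);

int attempts = 0;

// Hypothetical call site: it receives the policy instead of building it.
int Divide(ISyncPolicy policy, int dividend)
{
    return policy.Execute(() =>
    {
        attempts++;
        int divisor = attempts < 3 ? 0 : 2; // simulate two failures, then success
        return dividend / divisor;          // throws DivideByZeroException when divisor is 0
    });
}

Console.WriteLine(Divide(retryPolicy, 10));
```

In a real application the policy would more likely arrive through a constructor parameter registered in your DI container; the local function only keeps the sketch short.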

The general resilience policies add resilience strategies that are not explicitly centred around handling faults which delegates may throw or return.

Step 1: Configure

Timeout

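// Timeout and return to the caller after 30 seconds, if the executed delegate has not completed.
// Optimistic timeout: delegates should take and honour a CancellationToken.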
Policy
  .Timeout(30)

// Configure the timeout as a TimeSpan.
Policy
  .Timeout(TimeSpan.FromMilliseconds(2500))

// Configure the timeout via a func provider.
Policy
  .Timeout(myTimeoutProvider) // Func<TimeSpan> myTimeoutProvider

// Timeout after 30 seconds, if the executed delegate has not completed.  Enforces a timeout on delegates which have no in-built timeout and do not honour CancellationToken, at the expense (in synchronous executions) of an extra thread.
// (for more detail, see deep documentation)
Policy
  .Timeout(30, TimeoutStrategy.Pessimistic)

// Call an action on timeout.
Policy
  .Timeout(30, onTimeout: (context, timespan, task) => 
    {
        // do something 
    });

// e.g. log the timeout
Policy
  .Timeout(30, onTimeout: (context, timespan, task) => 
    {
        logger.Warn($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds.");
    });

// e.g. on timeout, log any exception the timed-out task eventually throws
Policy
  .Timeout(30, onTimeout: (context, timespan, task) => 
    {
        task.ContinueWith(t => {
            if (t.IsFaulted) logger.Error($"{context.PolicyKey} at {context.ExecutionKey}: execution timed out after {timespan.TotalSeconds} seconds, with: {t.Exception}.");
        });
    });

Bulkhead

// Restrict executions through the policy to a maximum of twelve concurrent actions.
Policy
  .Bulkhead(12)

// Restrict executions through the policy to a maximum of twelve concurrent actions, 
// with up to two actions waiting for an execution slot in the bulkhead if all slots are taken.

Policy
  .Bulkhead(12, 2)

// Restrict concurrent executions, calling an action if an execution is rejected
Policy
  .Bulkhead(12, context => 
    {
        // do something 
    });

// Monitor the bulkhead available capacity, for example for health/load reporting.
var bulkhead = Policy.Bulkhead(12, 2);
// ...
int freeExecutionSlots = bulkhead.BulkheadAvailableCount;
int freeQueueSlots     = bulkhead.QueueAvailableCount;

Cache

The Cache policy is targeting an upcoming Polly v5.x version. Check out www.thepollyproject.org for updates.

PolicyWrap

// Define a combined policy from previously defined policies.
var policyWrap = Policy
  .Wrap(fallback, cache, retry, breaker, timeout, bulkhead);
// (wraps the policies around any executed delegate: fallback outermost ... bulkhead innermost)
policyWrap.Execute(...)

// Define a standard resilience strategy ...
PolicyWrap commonResilience = Policy.Wrap(retry, breaker, timeout);

// ... then wrap in extra policies specific to a call site:
Avatar avatar = Policy
   .Handle<Whatever>()
   .Fallback<Avatar>(Avatar.Blank)
   .Wrap(commonResilience)
   .Execute(() => { /* get avatar */ });

// Share commonResilience, but wrap in different policies at another call site:
Reputation reps = Policy
   .Handle<Whatever>()
   .Fallback<Reputation>(Reputation.NotAvailable)
   .Wrap(commonResilience)
   .Execute(() => { /* get reputation */ });  

NoOp

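// Define a policy that simply executes the delegates passed to it, as is.
// This is useful in unit tests, or where your code architecture expects a policy
// but you only want to pass the execution through without policy intervention.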
NoOpPolicy noOp = Policy.NoOp();

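Step 2: Execute the policy

As described above.

Post-execution: capturing the result, or any final exception

Using the ExecuteAndCapture(...) method, you can capture the outcome of an execution through the policy.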

var policyResult = Policy
              .Handle<DivideByZeroException>()
              .Retry()
              .ExecuteAndCapture(() => DoSomething());
/*              
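policyResult.Outcome - whether the call succeeded or failed
policyResult.FinalException - the final exception captured; null if the call succeeded
policyResult.ExceptionType - whether the final exception was one the policy was defined to handle (like DivideByZeroException above) or an unhandled one (say Exception); null if the call succeeded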
policyResult.Result - if executing a func, the result if the call succeeded or the type's default value
*/              

Handling return values, and Policy<TResult>

// Handle both exceptions and return values in one policy
HttpStatusCode[] httpStatusCodesWorthRetrying = {
   HttpStatusCode.RequestTimeout, // 408
   HttpStatusCode.InternalServerError, // 500
   HttpStatusCode.BadGateway, // 502
   HttpStatusCode.ServiceUnavailable, // 503
   HttpStatusCode.GatewayTimeout // 504
}; 
HttpResponseMessage result = Policy
  .Handle<HttpResponseException>()
  .OrResult<HttpResponseMessage>(r => httpStatusCodesWorthRetrying.Contains(r.StatusCode))
  .Retry(...)
  .Execute( /* some Func<HttpResponseMessage> */ )

Strongly-typed Policy<TResult>

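Configuring a policy with .HandleResult<TResult>(...) or .OrResult<TResult>(...) generates a strongly-typed Policy<TResult> of the specific policy type, e.g. Retry<TResult>, AdvancedCircuitBreaker<TResult>.

These policies must be used to execute delegates returning TResult, i.e.:
* Execute(Func<TResult>) (and related overloads)
* ExecuteAsync(Func<CancellationToken, Task<TResult>>) (and related overloads)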
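A minimal sketch of such a strongly-typed policy, assuming the classic `Policy` API. The negative-value predicate and the counter-driven delegate are made up purely for illustration:

```csharp
using System;
using Polly;

int calls = 0;

// HandleResult<int> yields a Policy<int>: it can only run delegates returning int.
Policy<int> retryNegative = Policy
    .HandleResult<int>(r => r < 0)
    .Retry(3);

// Hypothetical delegate: returns -1 on the first two calls, then 42.
int value = retryNegative.Execute(() => ++calls < 3 ? -1 : 42);

Console.WriteLine($"{value} after {calls} calls");
```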

ExecuteAndCapture()

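.ExecuteAndCapture(...) on non-generic policies returns a PolicyResult with properties:

  • policyResult.Outcome - whether the call succeeded or failed
  • policyResult.FinalException - the final exception captured; null if the call succeeded
  • policyResult.ExceptionType - whether the final exception was one the policy was defined to handle (like DivideByZeroException above) or an unhandled one (say Exception); null if the call succeeded
  • policyResult.Result - if executing a func, the result if the call succeeded, or the type's default value

.ExecuteAndCapture<TResult>(Func<TResult>) on strongly-typed policies adds two properties:
* policyResult.FaultType - whether the final fault handled was an exception or a result handled by the policy; null if the delegate execution succeeded.
* policyResult.FinalHandledResult - the final result handled; null or the type's default value if the call succeeded.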
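To see the two extra properties in practice, the sketch below (classic `Policy` API assumed, with a contrived delegate that always returns a handled value) exhausts its retries and then inspects the captured result:

```csharp
using System;
using Polly;

// The delegate always returns -1, which the policy treats as a fault,
// so after two retries the capture reports a failure caused by a handled result.
PolicyResult<int> captured = Policy
    .HandleResult<int>(r => r < 0)
    .Retry(2)
    .ExecuteAndCapture(() => -1);

Console.WriteLine(captured.Outcome);            // OutcomeType.Failure
Console.WriteLine(captured.FaultType);          // FaultType.ResultHandledByThisPolicy
Console.WriteLine(captured.FinalHandledResult); // the last handled value, -1
Console.WriteLine(captured.FinalException == null);
```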

State-change delegates on Policy<TResult> policies

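Non-generic policies only handle exceptions, so state-change delegates such as onRetry and onBreak take an Exception as an input parameter.

Generic policies also handle TResult return values. Their state-change delegates are the same, except that they take a DelegateResult<TResult> parameter in place of the exception.
DelegateResult<TResult> has two properties:
* Exception // the exception just thrown, if the policy is handling an exception (otherwise null)
* Result // the TResult just returned, if the policy is handling a result (otherwise default(TResult))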
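As an illustration, an onRetry delegate on a generic policy might branch on which of the two properties is populated. This is a sketch under the classic `Policy` API; the failing delegate and the log list are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using Polly;

int calls = 0;
var log = new List<string>();

var policy = Policy
    .Handle<InvalidOperationException>()
    .OrResult<string>(r => r.Length == 0)
    .Retry(3, (outcome, retryCount) =>
    {
        // outcome is a DelegateResult<string>: either Exception or Result is meaningful.
        log.Add(outcome.Exception != null
            ? $"retry {retryCount}: exception {outcome.Exception.GetType().Name}"
            : $"retry {retryCount}: handled result '{outcome.Result}'");
    });

// Hypothetical delegate: throws first, returns an empty string second, then succeeds.
string result = policy.Execute(() =>
{
    calls++;
    if (calls == 1) throw new InvalidOperationException("first call fails");
    return calls == 2 ? "" : "ok";
});

log.ForEach(Console.WriteLine);
```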

BrokenCircuitException<TResult>

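Non-generic CircuitBreaker policies throw a BrokenCircuitException when the circuit is broken. This exception contains the last exception (the one that caused the circuit to break) as its InnerException.
For CircuitBreakerPolicy<TResult> policies:
* A circuit broken due to an exception throws a BrokenCircuitException with InnerException set to the exception that triggered the break (as above).
* A circuit broken due to handling a result throws a BrokenCircuitException<TResult> with the Result property set to the result which caused the circuit to break.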
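The exception case can be sketched as follows, assuming the classic `Policy` API. `CallDownstream` is a hypothetical stand-in for a failing service call:

```csharp
using System;
using Polly;
using Polly.CircuitBreaker;

var breaker = Policy
    .Handle<InvalidOperationException>()
    .CircuitBreaker(2, TimeSpan.FromMinutes(1));

// Hypothetical downstream call that always fails.
void CallDownstream() => throw new InvalidOperationException("downstream failure");

// Two consecutive handled exceptions open the circuit.
for (int i = 0; i < 2; i++)
{
    try { breaker.Execute(() => CallDownstream()); }
    catch (InvalidOperationException) { /* the original exception is rethrown */ }
}

string innerMessage = "";
try
{
    // While the circuit is open the delegate is not invoked at all.
    breaker.Execute(() => CallDownstream());
}
catch (BrokenCircuitException ex)
{
    innerMessage = ex.InnerException?.Message ?? "";
}

Console.WriteLine($"{breaker.CircuitState}: {innerMessage}");
```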

Policy Keys and Context data

// Identify policies with a PolicyKey, using the WithPolicyKey() extension method
// (for example, for correlation in logs or metrics)
var policy = Policy
    .Handle<DataAccessException>()
    .Retry(3, onRetry: (exception, retryCount, context) =>
       {
           logger.Error($"Retry {retryCount} of {context.PolicyKey} at {context.ExecutionKey}, due to: {exception}.");
       })
    .WithPolicyKey("MyDataAccessPolicy");

// Identify call sites with an ExecutionKey, by passing in a Context
var customerDetails = policy.Execute(myDelegate, new Context("GetCustomerDetails"));

// "MyDataAccessPolicy" -> context.PolicyKey 
// "GetCustomerDetails" -> context.ExecutionKey

// Pass additional custom information from the call site into the execution context
var policy = Policy
    .Handle<DataAccessException>()
    .Retry(3, onRetry: (exception, retryCount, context) =>
       {
           logger.Error($"Retry {retryCount} of {context.PolicyKey} at {context.ExecutionKey}, getting {context["Type"]} of id {context["Id"]}, due to: {exception}.");
       })
    .WithPolicyKey("MyDataAccessPolicy");

int id = ... // customer id from somewhere
var customerDetails = policy.Execute(() => GetCustomer(id), 
    new Context("GetCustomerDetails", new Dictionary<string, object>() {{"Type","Customer"},{"Id",id}}
    ));

PolicyRegistry

// Create a policy registry (for example at application startup)
PolicyRegistry registry = new PolicyRegistry();

// Populate the registry with policies
registry.Add("StandardHttpResilience", myStandardHttpResiliencePolicy);
// Or:
registry["StandardHttpResilience"] = myStandardHttpResiliencePolicy;

// Pass the registry instance to usage sites by DI, perhaps
public class MyServiceGateway 
{
    public MyServiceGateway(..., IPolicyRegistry<string> registry, ...)
    {
       ...
    } 
}
// (Or if you prefer ambient-context pattern, use a thread-safe singleton)

// Use a policy from the registry
registry.Get<IAsyncPolicy<HttpResponseMessage>>("StandardHttpResilience")
    .ExecuteAsync<HttpResponseMessage>(...)

PolicyRegistry also has a range of dictionary-like semantics, such as .ContainsKey(...), .TryGet<TPolicy>(...), .Count, .Clear(), and .Remove(...).
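A short sketch of those dictionary-like members, assuming a Polly version in which `PolicyRegistry` exposes all of them. The keys and policies are arbitrary examples:

```csharp
using System;
using Polly;
using Polly.Registry;

var registry = new PolicyRegistry();
registry.Add("Retry3", Policy.Handle<Exception>().Retry(3));
registry["NoOp"] = Policy.NoOp();

bool hasRetry = registry.ContainsKey("Retry3");

// TryGet avoids an exception when the key is missing.
bool found = registry.TryGet<ISyncPolicy>("Retry3", out ISyncPolicy retry);
bool missing = registry.TryGet<ISyncPolicy>("Unknown", out ISyncPolicy none);

int countBefore = registry.Count;
bool removed = registry.Remove("NoOp");
registry.Clear();
int countAfter = registry.Count;

Console.WriteLine($"{hasRetry} {found} {missing} {countBefore} {removed} {countAfter}");
```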

Interfaces

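Polly v5.2.0 adds interfaces intended to support PolicyRegistry and to group policies by the functionality they offer, following the interface segregation principle. These interfaces are not intended for coding your own policy implementations against.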

Execution interfaces: ISyncPolicy

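The execution interfaces ISyncPolicy, IAsyncPolicy, ISyncPolicy<TResult>, and IAsyncPolicy<TResult> define the execution overloads available to policies targeting sync/async and non-generic/generic calls respectively.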

Policy-kind interfaces: ICircuitBreakerPolicy

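Orthogonal to the execution interfaces, interfaces specific to the kind of policy define the properties and methods common to that kind of policy.

For example, ICircuitBreakerPolicy defines:
* CircuitState CircuitState
* Exception LastException
* void Isolate()
* void Reset()
ICircuitBreakerPolicy<TResult> : ICircuitBreakerPolicy adds:
* TResult LastHandledResult
This allows collections of similar kinds of policy to be treated as one, for example to monitor all of your circuit breakers together.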
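For instance, breakers guarding different delegate types can sit in one collection for health reporting. The sketch below assumes the classic `Policy` API; the dictionary keys and thresholds are invented for the example:

```csharp
using System;
using System.Collections.Generic;
using Polly;
using Polly.CircuitBreaker;

// A non-generic and a generic breaker, both viewed through ICircuitBreakerPolicy.
var breakers = new Dictionary<string, ICircuitBreakerPolicy>
{
    ["orders"] = Policy.Handle<Exception>()
        .CircuitBreaker(2, TimeSpan.FromSeconds(30)),
    ["scores"] = Policy.HandleResult<int>(r => r < 0)
        .CircuitBreaker(2, TimeSpan.FromSeconds(30)),
};

// Manually isolate one of them, then report every state in a single loop.
breakers["scores"].Isolate();

foreach (var pair in breakers)
{
    Console.WriteLine($"{pair.Key}: {pair.Value.CircuitState}");
}
```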

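Thread safety

All Polly policies are fully thread-safe. You can safely re-use policies at multiple call sites, and execute through policies concurrently on different threads.

While the internal operation of a policy is thread-safe, this does not make the delegates you execute through it thread-safe: if a delegate is not thread-safe on its own, it remains not thread-safe.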
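A small sketch of the distinction, assuming the classic `Policy` API: one policy instance is shared by many threads, while the delegate protects its own shared state with `Interlocked`. The counter is only there to give the threads something to contend on:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;

// One policy instance shared by every thread.
var retry = Policy
    .Handle<InvalidOperationException>()
    .Retry(3);

int total = 0;

Parallel.For(0, 100, i =>
{
    retry.Execute(() =>
    {
        // The policy does not protect this shared counter; the delegate must.
        Interlocked.Increment(ref total);
    });
});

Console.WriteLine(total);
```

Replacing `Interlocked.Increment(ref total)` with a plain `total++` would reintroduce a race even though the policy itself is unchanged.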

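Asynchronous support

Polly fully supports asynchronous executions, using the asynchronous methods:
* RetryAsync
* WaitAndRetryAsync
* CircuitBreakerAsync
* (etc)
* ExecuteAsync
* ExecuteAndCaptureAsync

in place of their synchronous counterparts:
* Retry
* WaitAndRetry
* CircuitBreaker
* (etc)
* Execute
* ExecuteAndCapture

Async overloads exist for all policy types and for all Execute() and ExecuteAndCapture() overloads.

Usage example: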

await Policy
  .Handle<SqlException>(ex => ex.Number == 1205)
  .Or<ArgumentException>(ex => ex.ParamName == "example")
  .RetryAsync()
  .ExecuteAsync(() => DoSomethingAsync());

SynchronizationContext

Async continuations and retries by default do not run on a captured synchronization context. To change this, use .ExecuteAsync(…) overloads taking a boolean continueOnCapturedContext parameter.
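A minimal sketch of opting back in to the captured context, assuming the classic `Policy` API and its `ExecuteAsync` overload taking a token and a boolean. The delayed delegate is a placeholder for real work:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Polly;

var retry = Policy
    .Handle<TimeoutException>()
    .RetryAsync(2);

// Passing true asks Polly to continue on the captured synchronization context,
// which matters mainly in UI applications; in a console app there is none to capture.
string result = await retry.ExecuteAsync(
    async ct =>
    {
        await Task.Delay(10, ct); // placeholder for real asynchronous work
        return "done";
    },
    CancellationToken.None,
    continueOnCapturedContext: true);

Console.WriteLine(result);
```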

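Cancellation support

Async policy execution supports cancellation via .ExecuteAsync(...) overloads taking a CancellationToken.

The token you pass to .ExecuteAsync(...) serves three purposes:
* It cancels policy actions such as further retries, waits between retries, or waits for a bulkhead execution slot.
* The policy passes it as the CancellationToken parameter to any delegate it executes, to support cancellation during delegate execution.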
* In common with the Base Class Library implementation in Task.Run(…) and elsewhere, if the cancellation token is cancelled before execution begins, the user delegate is not executed at all.

// Try several times to retrieve from a URI, but support cancellation at any time.
CancellationToken cancellationToken = // ...
var policy = Policy
    .Handle<WebException>()
    .Or<HttpRequestException>()
    .WaitAndRetryAsync(new[] { 
        TimeSpan.FromSeconds(1), 
        TimeSpan.FromSeconds(2), 
        TimeSpan.FromSeconds(4) 
    });
var response = await policy.ExecuteAsync(ct => httpClient.GetAsync(uri, ct), cancellationToken);

A side note

Rx can produce a Retry-like effect as well. An implementation from Stack Overflow looks like this:

// Extension method
public static IObservable<T> RetryWithBackoffStrategy<T>(
    this IObservable<T> source, 
    int retryCount = 3,
    Func<int, TimeSpan> strategy = null,
    Func<Exception, bool> retryOnError = null,
    IScheduler scheduler = null)
{
    strategy = strategy ?? ExpontentialBackoff;
    scheduler = scheduler ?? RxApp.TaskpoolScheduler;

    if (retryOnError == null)
        retryOnError = e => true;

    int attempt = 0;

    return Observable.Defer(() =>
    {
        return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
            .Select(item => new Tuple<bool, T, Exception>(true, item, null))
            .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
    })
    .Retry(retryCount)
    .SelectMany(t => t.Item1
        ? Observable.Return(t.Item2)
        : Observable.Throw<T>(t.Item3));
}

Usage:

Observable.Defer(() => SomeApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )

Written with StackEdit.
