我有以下政策:
var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).WaitAndRetry(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetry: (exception, calculatedWaitDuration, retryCount, context) =>
{
Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException)).CircuitBreaker(maxExceptionsBeforeBreaking, TimeSpan.FromSeconds(circuitBreakDurationSeconds), onBreak, onReset);
var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions, onBulkheadRejected);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
.Fallback(
fallbackValue: false,
onFallback: (b, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.Fallback(
fallbackAction: (context) => { return false; },
onFallback: (e, context) =>
{
Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));
public bool CallApi(ChangeMapModel changeMessage)
{
var httpClient = new HttpClient();
var endPoint = changeMessage.EndPoint;
var headers = endPoint.Headers;
if (headers != null)
{
foreach (var header in headers)
{
if (header.Contains(':'))
{
var splitHeader = header.Split(':');
httpClient.DefaultRequestHeaders.Add(splitHeader[0], splitHeader[1]);
}
}
}
var res = httpClient.PostAsync(endPoint.Uri, null);
var response = res.Result;
response.EnsureSuccessStatusCode();
return true;
}
我是这样执行政策的:
policyWrap.Execute((context) => CallApi(changeMessage), new Context(endPoint));
问题是,当一个动作在开路上执行时,我在断路器回调中没有得到命中。
我希望通过策略放置一个API调用,要处理的异常类型为HttpRequestException
。政策定义有问题吗?为什么不叫断路器后备?
我创建了以下最小、完整、可验证的示例来帮助探索问题:
注意:不一定是成品;只是发布代码的一些次要mods和额外的注释,以帮助探索问题。
using Polly;
using Polly.CircuitBreaker;
using System;
using System.Net.Http;
using System.Threading.Tasks;
public class Program
{
public static void Main()
{
int maxRetryCount = 6;
double circuitBreakDurationSeconds = 0.2 /* experiment with effect of shorter or longer here, eg: change to = 1, and the fallbackForCircuitBreaker is correctly invoked */ ;
int maxExceptionsBeforeBreaking = 4; /* experiment with effect of fewer here, eg change to = 1, and the fallbackForCircuitBreaker is correctly invoked */
int maxParallelizations = 2;
int maxQueuingActions = 2;
var retryPolicy = Policy.Handle<Exception>(e => (e is HttpRequestException || (/*!(e is BrokenCircuitException) &&*/ e.InnerException is HttpRequestException))) // experiment with introducing the extra (!(e is BrokenCircuitException) && ) clause here, if necessary/desired, depending on goal
.WaitAndRetry(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(50 * Math.Pow(2, attempt)),
onRetry: (ex, calculatedWaitDuration, retryCount, context) =>
{
Console.WriteLine(String.Format("Retry => Count: {0}, Wait duration: {1}, Policy Wrap: {2}, Policy: {3}, Endpoint: {4}, Exception: {5}", retryCount, calculatedWaitDuration, context.PolicyWrapKey, context.PolicyKey, context.OperationKey, ex.Message));
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException || e.InnerException is HttpRequestException))
.CircuitBreaker(maxExceptionsBeforeBreaking,
TimeSpan.FromSeconds(circuitBreakDurationSeconds),
onBreak: (ex, breakDuration) => {
Console.WriteLine(String.Format("Circuit breaking for {0} ms due to {1}", breakDuration.TotalMilliseconds, ex.Message));
},
onReset: () => {
Console.WriteLine("Circuit closed again.");
},
onHalfOpen: () => { Console.WriteLine("Half open."); });
var sharedBulkhead = Policy.Bulkhead(maxParallelizations, maxQueuingActions);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
/* .OrInner<BrokenCircuitException>() */ // Consider this if necessary.
/* .Or<Exception>(e => circuitBreaker.State != CircuitState.Closed) */ // This check will also detect the circuit in anything but healthy state, regardless of the final exception thrown.
.Fallback(
fallbackValue: false,
onFallback: (b, context) =>
{
Console.WriteLine(String.Format("Operation attempted on broken circuit => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey));
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.Fallback<bool>(
fallbackAction: (context) => { return false; },
onFallback: (e, context) =>
{
Console.WriteLine(String.Format("An unexpected error occured => Policy Wrap: {0}, Policy: {1}, Endpoint: {2}, Exception: {3}", context.PolicyWrapKey, context.PolicyKey, context.OperationKey, e.Exception.Message));
}
);
var resilienceStrategy = Policy.Wrap(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.Wrap(fallbackForCircuitBreaker.Wrap(resilienceStrategy));
bool outcome = policyWrap.Execute((context) => CallApi("http://www.doesnotexistattimeofwriting.com/"), new Context("some endpoint info"));
}
public static bool CallApi(string uri)
{
using (var httpClient = new HttpClient() { Timeout = TimeSpan.FromSeconds(1) }) // Consider HttpClient lifetimes and disposal; this pattern is for minimum change from original posted code, not a recommendation.
{
Task<HttpResponseMessage> res = httpClient.GetAsync(uri);
var response = res.Result; // Consider async/await rather than blocking on the returned Task.
response.EnsureSuccessStatusCode();
return true;
}
}
}
不止一个因素可能会导致Fallbackforc断路器
无法调用:
电路断开持续时间秒
可以设置为短于各种尝试和重试之间的等待所花费的总时间。如果是这样,电路可能会恢复到半开路状态。在半断开状态或闭合状态下,导致电路断开的异常会按原样重新启动BrokenCircuiteException
仅在(完全)断路阻止尝试调用时才会引发。
因此,如果您的电路在重试耗尽时间时恢复到半开状态,则返回到包装回退策略的异常将是Http刚需异常
,而不是BrokenCircuitExc0019
。
CircuitBreakerException
包含导致电路中断的异常,作为其InnerException
。因此,对e.InnerException is HttpRequestException
进行过于贪婪/松散的检查也可能捕获CircuitBreakerException
具有InnerException is HttpRequestException
。根据你的目标,这可能是你想要的,也可能不是。
我相信这不会发生在最初发布的代码中,因为它是以特定的方式构造的。阻止HttpClient返回的任务
。DoSomethingAsync(…)
已导致聚合异常-
CircuitBreakerException -> AggregateException -> HttpRequestException
所以这不属于发布代码中的一个深度检查。但是,请注意,CircuitBreakerException
包含导致电路中断的异常作为其内部异常
。这可能会导致handle子句只检查e。InnerException是HttpRequestException
到
不自觉地
(如果不是您的目标,则意外地)重试
断路器异常,如果:
(a)代码更改为
async
/wait
,这将删除Aggregate Exc0019
,因此导致嵌套只有一个深度
(b) 代码更改为Polly的
。汉德莱纳
建议
/*注释掉*///并在上面的代码中附加解释
,建议如何调整发布的代码,以便Fallbackforce断路器
按预期调用。
还有两个想法:
在HttpClient上阻塞。DoSomethingAsync()
调用。结果
可能会影响性能,如果与其他异步代码混合,则可能会导致死锁,并带来整个聚合异常
——以及内部异常
痛苦。
(故意将第3点和第4点保持简短,正如其他地方广泛讨论的那样。)
问题1:如果网络中断,断路器已达到exceptionsAllowedBeforeBreaking编号,进入open状态并等待DurationOfBreaking周期,则电路将为新请求打开,但已发送的请求将抛出异常? 问题2:如果期望的行为是重试那些有异常的请求,而不是断路器抛出异常,那么除了断路器策略之外,还需要实现重试策略。我对此的理解是,将发生问题1中的行为,然后将尝试重试。 a.如果存在网络
我正面临使用Spring Cloud Resilience 4j的断路器实现的问题。 在一些教程之后,我尝试在项目中添加必要的依赖项。此外,尝试添加配置,但电路仍然没有打开,并且没有调用回退方法。 对于用例,我正在从我的服务调用外部 API,如果该外部 API 关闭,那么在几次调用后,我需要启用断路器。 请从不同的文件中找到代码片段。 我是断路器模式的新手。我们将非常感谢您的帮助。 pom.xml
我在结合设置Polly的断路器时遇到了问题。 具体来说,和用于ASP.NET Core Web API Controller,其链接如下: > 重试策略:如果出现暂时错误,则对每个请求重试3次。 cicuit断路器策略:如果所有请求中出现五个瞬态错误,则生效。 问题所在 配置Polly重试策略和断路器策略,并对自定义HttpClient, CarController 是在的Polly策略中指定的。
我在我的spring boot应用程序中使用Hystrix实现断路器,我的代码如下所示: 我看到每次失败时都会调用fallback()。但3次故障后电路不开。在3次失败之后,我原以为它会直接调用并跳过。但这并没有发生。有人能告诉我我在这里做错了什么吗? 谢谢,B Jagan 下面是实际代码。我用这个玩得更远了。当我在RegistrationHystrix.RegisterSeller()方法中直接
我使用的是Resilience4J断路器,我需要忽略一些自定义异常,所以我需要更改默认配置。我在使用微服务,所以我有一个连接到数据库的微服务,它有一些基本的请求,比如get by id,我还有一个使用这些请求的边缘服务。例如,我需要,如果id不存在,微服务会抛出一个自定义异常,在这种情况下断路器不会打开。 数据库的微服务: 获取请求 > 服务 public SalesRepDTO getSales
注:本节未经校验,如有问题欢迎提issue 为什么使用它们? 线路断路器用于提供稳定性并防止在分布式系统中的级联故障。它们应该结合在远程系统之间的接口使用明智的超时,以防止单个组件的故障拖垮所有组件。 作为一个例子,我们有一个web 应用程序与远程的第三方web服务进行交互。假如第三方已用完了他们的容量,他们的数据库也在高荷载作用下熔化。假设数据库在这种情况下失败,第三方 web 服务用了很长的时