前言
Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于传入的请求进行排队处理,避免线程池的不足.
我们日常开发中可能常做的给某web服务器配置连接数以及,请求队列大小,那么今天我们看看如何在通过中间件形式实现一个并发量以及队列长度限制.
Queue策略
添加Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services) { services.AddQueuePolicy(options => { //最大并发请求数 options.MaxConcurrentRequests = 2; //请求队列长度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //添加并发限制中间件 app.UseConcurrencyLimiter(); app.Run(async context => { Task.Delay(100).Wait(); // 100ms sync-over-async await context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); }
通过上面简单的配置,我们就可以将他引入到我们的代码中,从而做并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, QueuePolicy>(); return services; }
QueuePolicy采用的是SemaphoreSlim信号量设计,SemaphoreSlim、Semaphore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定 最大任务数量,当线程请求访问资源,信号量递减,而当他们释放时,信号量计数又递增。
/// <summary> /// 构造方法(初始化Queue策略) /// </summary> /// <param name="options"></param> public QueuePolicy(IOptions<QueuePolicyOptions> options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim来限制任务最大个数 _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); }
ConcurrencyLimiterMiddleware中间件
/// <summary> /// Invokes the logic of the middleware. /// </summary> /// <param name="context">The <see cref="HttpContext"/>.</param> /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns> public async Task Invoke(HttpContext context) { var waitInQueueTask = _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets. bool result; if (waitInQueueTask.IsCompleted) { ConcurrencyLimiterEventSource.Log.QueueSkipped(); result = waitInQueueTask.Result; } else { using (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await waitInQueueTask; } } if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); } }
每次当我们请求的时候首先会调用_queuePolicy.TryEnterAsync(),进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+最大并发请求数),如果当前数量超出了,那么我直接抛出,送你个503状态;
if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); }
问题来了,我这边如果说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.
await _serverSemaphore.WaitAsync();异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止
lock (_totalRequestsLock) { if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests) { return false; } TotalRequests++; } //html" target="_blank">异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止 await _serverSemaphore.WaitAsync(); return true; }
返回成功后那么中间件这边再进行处理,_queuePolicy.OnExit();通过该调用进行调用_serverSemaphore.Release();释放信号灯,再对总请求数递减
Stack策略
再来看看另一种方法,栈策略,他是怎么做的呢?一起来看看.再附加上如何使用的代码.
public void ConfigureServices(IServiceCollection services) { services.AddStackPolicy(options => { //最大并发请求数 options.MaxConcurrentRequests = 2; //请求队列长度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); }
通过上面的配置,我们便可以对我们的应用程序执行出相应的策略.下面再来看看他是怎么实现的呢
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, StackPolicy>(); return services; }
可以看到这次是通过StackPolicy类做的策略.来一起来看看主要的方法
/// <summary> /// 构造方法(初始化参数) /// </summary> /// <param name="options"></param> public StackPolicy(IOptions<QueuePolicyOptions> options) { //栈分配 _buffer = new List<ResettableBooleanCompletionSource>(); //队列大小 _maxQueueCapacity = options.Value.RequestQueueLimit; //最大并发请求数 _maxConcurrentRequests = options.Value.MaxConcurrentRequests; //剩余可用空间 _freeServerSpots = options.Value.MaxConcurrentRequests; }
当我们通过中间件请求调用,_queuePolicy.TryEnterAsync()时,首先会判断我们是否还有访问请求次数,如果_freeServerSpots>0,那么则直接给我们返回true,让中间件直接去执行下一步,如果当前队列=我们设置的队列大小的话,那我们需要取消先前请求;每次取消都是先取消之前的保留后面的请求;
public ValueTask<bool> TryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果队列满了,取消先前的请求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count) { _buffer[_head] = tcs; } else { _buffer.Add(tcs); } _queueLength++; // increment _head for next time _head++; if (_head == _maxQueueCapacity) { _head = 0; } return tcs.GetValueTask(); } }
当我们请求后调用_queuePolicy.OnExit();出栈,再将请求长度递减
public void OnExit() { lock (_bufferLock) { if (_queueLength == 0) { _freeServerSpots++; if (_freeServerSpots > _maxConcurrentRequests) { _freeServerSpots--; throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync"); } return; } // step backwards and launch a new task if (_head == 0) { _head = _maxQueueCapacity - 1; } else { _head--; } //退出,出栈 _buffer[_head].Complete(true); _queueLength--; } }
总结
基于栈结构的特点,在实际应用中,通常只会对栈执行以下两种操作:
队列存储结构的实现有以下两种方式:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
实施限制 授权,实质上是关于限制。 基于某些检查,用户可能受到限制。 限制可以在以下两个地方之一中应用: 在RADIUS服务器上 在NAS 当访问请求数据包发送到RADIUS服务器时,在身份验证过程中确定限制。 Accounting-Request数据包不会也不能确定限制。 当在RADIUS服务器上应用限制时,服务器返回一个Access-Reject数据包,该数据包应该包含一个Reply-Mess
问题内容: 最近,一位通讯员提到了Python 2.6中的新增功能,指出典型的浮点实现本质上是实数的有理近似。出于好奇,我不得不尝试π: 由于Arima,我没有看到更准确的结果使我感到有些惊讶: 例如,此代码: 产生以下输出: 当然,考虑到64位浮点数提供的精度,结果是正确的,但是这使我提出了一个问题:我如何才能找到有关的实现局限性的更多信息?感谢您的指导。 附加链接:Stern-Brocot树和
本文向大家介绍Django权限机制实现代码详解,包括了Django权限机制实现代码详解的使用技巧和注意事项,需要的朋友参考一下 本文研究的主要是Django权限机制的相关内容,具体如下。 1. Django权限机制概述 权限机制能够约束用户行为,控制页面的显示内容,也能使API更加安全和灵活;用好权限机制,能让系统更加强大和健壮。因此,基于Django的开发,理清Django权限机制是非常必要的。
有没有使用Qpromise库限制promise并发的方法? 这个问题有点与如何限制Qpromise并发有关? 但问题是我正试图这样做: 真正的用例是: 从DB获取帖子 循环DB中的每个帖子,如 对于每个帖子做task1,task2,task3(检索社交计数器,检索评论计数等) 在DB中保存新的文章数据。 但问题是node同时执行所有帖子的所有任务,比如同时向facebook询问500篇帖子的“喜欢
本文向大家介绍docker CPU限制的实现,包括了docker CPU限制的实现的使用技巧和注意事项,需要的朋友参考一下 1、--cpu=<value> 1)指定一个容器可以使用多少可用的CPU资源,但无法让容器始终在一个或某几个CPU上运行 2)例如:如果主机有2个CPU,设置--cpus="1.5",则可以报称容器醉倒 容纳一半的CPU,相当于设置--cpu-
本文向大家介绍golang实现并发数控制的方法,包括了golang实现并发数控制的方法的使用技巧和注意事项,需要的朋友参考一下 golang并发 谈到golang这门语言,很自然的想起了他的的并发goroutine。这也是这门语言引以为豪的功能点。并发处理,在某种程度上,可以提高我们对机器的使用率,提升系统业务处理能力。但是并不是并发量越大越好,太大了,硬件环境就会吃不消,反而会影响到系统整体性能