1.14 限制器
限制器
Hprose 3.0 提供了两种限制器插件。一个是并发限制器(ConcurrentLimiter
),一个是速率限制器(RateLimiter
)。
这两个看似相似,实际上它们的功能是完全不同的。
并发限制器(ConcurrentLimiter
)是用来控制并发执行的请求个数的。
速率限制器(RateLimiter
)是用来限制每秒钟提交的请求数或限制每秒钟提交请求的字节数的。
这两个插件既可以用于客户端,也可以用于服务端。
并发限制器
TypeScript 版本的算法不涉及到多线程访问共享变量的问题,所以实现起来比较简单,下面是该插件实现的代码:
export class ConcurrentLimiter {
private counter: number = 0;
private tasks: Deferred<void>[] = [];
constructor(public readonly maxConcurrentRequests: number, public readonly timeout: number = 0) { }
public async acquire(): Promise<void> {
if (++this.counter <= this.maxConcurrentRequests) return;
const task = defer<void>();
this.tasks.push(task);
if (this.timeout > 0) {
const timeoutId = setTimeout(() => {
const index = this.tasks.indexOf(task);
if (index > -1) {
--this.counter;
this.tasks.splice(index, 1);
}
task.reject(new TimeoutError());
}, this.timeout);
task.promise.then(() => {
clearTimeout(timeoutId);
}, () => {
clearTimeout(timeoutId);
});
}
return task.promise;
}
public release(): void {
--this.counter;
const task = this.tasks.shift();
if (task) task.resolve();
}
public handler = async (name: string, args: any[], context: Context, next: NextInvokeHandler): Promise<any> => {
await this.acquire();
try {
return await next(name, args, context);
}
finally {
this.release();
}
}
}
counter
是一个全局的请求计数器。当请求到来时,该计数器加一,如果其计数值小于等于最大并发请求数(maxConcurrentRequests
),就会对请求放行。当该计数器超过最大并发请求数(maxConcurrentRequests
)时,则将请求挂起放入队列。每个已放行的请求,在执行完之后,计数器会减一,并从队列中取出一个挂起的请求放行,直到队列为空。
如果对挂起的请求设置了超时时间,那么当挂起的请求超过了超时时间,那么该请求会被从挂起请求队列中移除,并且抛出超时异常。
而 C# 版本相比 TypeScript 版本,则多了多线程并发的考虑,下面是 C# 版本的代码:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Hprose.RPC.Plugins.Limiter {
public class ConcurrentLimiter {
private volatile int counter = 0;
private readonly ConcurrentQueue<TaskCompletionSource<bool>> tasks = new ConcurrentQueue<TaskCompletionSource<bool>>();
public int MaxConcurrentRequests { get; private set; }
public TimeSpan Timeout { get; private set; }
public ConcurrentLimiter(int maxConcurrentRequests, TimeSpan timeout = default) {
MaxConcurrentRequests = maxConcurrentRequests;
Timeout = timeout;
}
public async Task Acquire() {
if (Interlocked.Increment(ref counter) <= MaxConcurrentRequests) return;
var deferred = new TaskCompletionSource<bool>();
tasks.Enqueue(deferred);
if (Timeout > TimeSpan.Zero) {
using (CancellationTokenSource source = new CancellationTokenSource()) {
#if NET40
var timer = TaskEx.Delay(Timeout, source.Token);
var task = await TaskEx.WhenAny(timer, deferred.Task).ConfigureAwait(false);
#else
var timer = Task.Delay(Timeout, source.Token);
var task = await Task.WhenAny(timer, deferred.Task).ConfigureAwait(false);
#endif
source.Cancel();
if (task == timer) {
deferred.TrySetException(new TimeoutException());
}
}
}
await deferred.Task.ConfigureAwait(false);
}
public void Release() {
while (true) {
Interlocked.Decrement(ref counter);
if (tasks.TryDequeue(out var task)) {
if (task.TrySetResult(true)) return;
} else {
return;
}
}
}
public async Task<object> Handler(string name, object[] args, Context context, NextInvokeHandler next) {
await Acquire().ConfigureAwait(false);
try {
return await next(name, args, context).ConfigureAwait(false);
}
finally {
Release();
}
}
}
}
C# 版本的从算法上来说,跟 TypeScript 版本是一样的,只不过 counter
计数器在加一和减一时,使用了原子操作来保证计数的准确性。另外,因为要考虑多线程访问共享队列的问题,这里使用了 ConcurrentQueue
来保证安全性。最后一点不同就是对于超时的任务,在 TypeScript 版本中是在超时后直接从队列中移除的,但是在 C# 版本中,ConcurrentQueue
只能以先进先出的顺序进行访问,因此,C# 版本是在 Release
时才移除超时任务的。
开发者在实现其它语言版本的并发限制器时,也应该根据具体语言的特点来选择最合适的实现方案。
并发限制器在使用时应该注意,其设置的最大并发请求数(maxConcurrentRequests
)不应过小,尤其是当使用到推送、反向调用等插件时,因为这些插件会跟服务之间保持一个长时间不会返回结果的调用。另外,最大并发请求数设置过小,也会降低客户端并发请求的能力。
但是如果设置过大,或者没有设置的情况下(相当于无限大),并发调用的性能不一定会更高,反而可能会降低。在本人单机测试的情况下,客户端设置最大并发请求数(maxConcurrentRequests
)为 64 时,性能最优。
速率限制器
速率限制器的算法比并发限制器要复杂一些。但是代码并不多,下面是 TypeScript 版本的代码:
export class RateLimiter {
private readonly interval: number;
private next: number = Date.now();
constructor(public readonly permitsPerSecond: number, public readonly maxPermits: number = Infinity, public readonly timeout: number = 0) {
this.interval = 1000 / permitsPerSecond;
}
public async acquire(tokens: number = 1): Promise<number> {
const now = Date.now();
const last = this.next;
let permits = (now - last) / this.interval - tokens;
if (permits > this.maxPermits) {
permits = this.maxPermits;
}
this.next = now - permits * this.interval;
const delay = last - now;
if (delay <= 0) return last;
if (this.timeout > 0 && delay > this.timeout) {
throw new TimeoutError();
}
return new Promise<number>(function (resolve) {
setTimeout(resolve, delay, last);
});
}
public ioHandler = async (request: Uint8Array, context: Context, next: NextIOHandler): Promise<Uint8Array> => {
await this.acquire(request.length);
return next(request, context);
}
public invokeHandler = async (name: string, args: any[], context: Context, next: NextInvokeHandler): Promise<any> => {
await this.acquire();
return next(name, args, context);
}
}
ioHandler
是用来限制每秒钟请求的字节数的。而 inovkeHandler
则是用来限制每秒钟请求个数的。在使用时,应该为它们创建各自独立的 RateLimiter
对象。因为对于 ioHandler
来说,ReteLimiter
构造函数中的 permitsPerSecond
和 maxPermits
表示的是请求的字节数,而对于 inovkeHandler
来说,它们表示的是请求的个数。
该插件的核心算法在 acquire
方法中。该算法根据上次请求时计算出的当前请求的预期时间跟当前请求的实际时间的时间差来决定是立即执行,还是延时执行。并且根据该时间差和所需的令牌数来计算出剩余的许可数,如果剩余许可数超过所设置的最大许可数,则将剩余许可数设为最大许可数,最后根据当前时间和剩余许可数,计算并保存下次请求的预期时间。如果设置了请求超时时间,那么当计算出的延时执行时间超过请求超时时间时,直接抛出超时异常。
该插件的 C# 版本在算法上跟 TypeScript 完全一致,只是多了对多线程并发的考虑。下面是 C# 版本的代码:
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Hprose.RPC.Plugins.Limiter {
public class RateLimiter {
private long next = DateTime.Now.Ticks;
private readonly double interval;
public long PermitsPerSecond { get; private set; }
public double MaxPermits { get; private set; }
public TimeSpan Timeout { get; private set; }
public RateLimiter(long permitsPerSecond, double maxPermits = double.PositiveInfinity, TimeSpan timeout = default) {
PermitsPerSecond = permitsPerSecond;
MaxPermits = maxPermits;
Timeout = timeout;
interval = (double)(new TimeSpan(0, 0, 1).Ticks) / permitsPerSecond;
}
public async Task<long> Acquire(long tokens = 1) {
var now = DateTime.Now.Ticks;
long last = Interlocked.Read(ref next);
double permits = (now - last) / interval - tokens;
if (permits > MaxPermits) {
permits = MaxPermits;
}
Interlocked.Exchange(ref next, now - (long)(permits * interval));
var delay = new TimeSpan(last - now);
if (delay <= TimeSpan.Zero) return last;
if (Timeout > TimeSpan.Zero && delay > Timeout) {
throw new TimeoutException();
}
#if NET40
await TaskEx.Delay(delay).ConfigureAwait(false);
#else
await Task.Delay(delay).ConfigureAwait(false);
#endif
return last;
}
public async Task<Stream> IOHandler(Stream request, Context context, NextIOHandler next) {
if (!request.CanSeek) {
request = await request.ToMemoryStream().ConfigureAwait(false);
}
await Acquire(request.Length).ConfigureAwait(false);
return await next(request, context).ConfigureAwait(false);
}
public async Task<object> InvokeHandler(string name, object[] args, Context context, NextInvokeHandler next) {
await Acquire().ConfigureAwait(false);
return await next(name, args, context).ConfigureAwait(false);
}
}
}
C# 版本对于下次请求预期时间(next
)的操作,需要使用原子操作来读取和保存。而 .NET Compact Framework 不支持对 long 类型的数据进行原子操作,因此该插件不支持 .NET Compact Framework 3.5。
原子操作指令速度非常快,所以该插件性能是很高的。
使用实例
下面我们以 C# 版本为例,来说明一下如何使用:
using Hprose.RPC;
using Hprose.RPC.Plugins.Limiter;
using System;
using System.Net;
using System.Threading.Tasks;
class MyService {
public int Sum(int x, int y) {
return x + y;
}
}
public interface IMyService {
Task<int> Sum(int x, int y);
}
class Program {
static async Task RunClient() {
var client = new Client("http://127.0.0.1:8080/");
client.Use(new ConcurrentLimiter(64).Handler).Use(new RateLimiter(2000).InvokeHandler);
var begin = DateTime.Now;
var proxy = client.UseService<IMyService>();
var n = 5000;
var tasks = new Task<int>[n];
for (int i = 0; i < n; ++i) {
tasks[i] = proxy.Sum(i, i);
}
await Task.WhenAll(tasks);
var end = DateTime.Now;
Console.WriteLine(end - begin);
}
static void Main(string[] args) {
HttpListener server = new HttpListener();
server.Prefixes.Add("http://127.0.0.1:8080/");
server.Start();
var service = new Service();
service.AddInstanceMethods(new MyService()).Bind(server);
RunClient().Wait();
server.Stop();
}
}
该程序执行时间大约是 2.5 秒钟,因为 RateLimiter
限制了请求速率为每秒钟 2000 个请求,一共有 5000 个请求,这与计算得出的结果是相符的,如果我们把该例子中的 new RateLimiter(2000)
换成 new RateLimiter(5000)
,执行时间差不多就会变为 1 秒钟左右。说明请求的发送速度确实被 RateLimiter
限制了。
但是请求的速率不仅仅是由 RateLimiter
来决定的,比如我们如果把 new RateLimiter(2000)
换成 new RateLimiter(20000)
,我们会发现执行时间并不会缩短到 0.25 秒左右。因为客户端请求的速率本身也会受制于带宽、传输协议等各种因素的制约,如果设置的速率已经超过客户端本身所能达到的速率的最大值的话,那么 RateLimiter
的限制就无效了。
对于不同的传输协议,客户端请求的速率差别也很大。如果把上面例子中的 HTTP 服务器和客户端换成 TCP 服务器和客户端,那么客户端请求速率差不多会有将近 10 倍的提升。
但是在这个例子中,之所以没有使用 TCP 传输,而使用 HTTP 传输,是为了验证 ConcurrentLimiter
插件是否正常工作。
比如把上面例子中的 .Use(new ConcurrentLimiter(64).Handler)
这部分语句拿掉的话,再执行该程序,我们会发现程序会报告下面的错误:
Unhandled Exception: System.AggregateException: One or more errors occurred. (An error occurred while sending the request.) ---> System.Net.Http.HttpRequestException: An error occurred while sending the request. ---> System.IO.IOException: Unable to read data from the transport connection: 远程主机强迫关闭了一个现有的连接。. ---> System.Net.Sockets.SocketException: 远程主机强迫关闭了一个现有的连接。
--- End of inner exception stack trace ---
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error)
at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.GetResult(Int16 token)
at System.Net.Http.HttpConnection.FillAsync()
at System.Net.Http.HttpConnection.ReadNextResponseHeaderLineAsync(Boolean foldedHeadersAllowed)
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, CancellationToken cancellationToken)
--- End of inner exception stack trace ---
at System.Net.Http.HttpConnection.SendAsyncCore(HttpRequestMessage request, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithNtConnectionAuthAsync(HttpConnection connection, HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.HttpConnectionPool.SendWithRetryAsync(HttpRequestMessage request, Boolean doRequestAuth, CancellationToken cancellationToken)
at System.Net.Http.RedirectHandler.SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
at System.Net.Http.HttpClient.FinishSendAsyncBuffered(Task`1 sendTask, HttpRequestMessage request, CancellationTokenSource cts, Boolean disposeCts)
at Hprose.RPC.HttpTransport.Transport(Stream request, Context context)
at Hprose.RPC.Client.Transport(Stream request, Context context)
at Hprose.RPC.Client.Call(String fullname, Object[] args, Context context)
at Hprose.RPC.Client.InvokeAsync[T](String fullname, Object[] args, ClientContext context)
at Program.RunClient() in D:\Git\hprose\hprose-dotnet\examples\Limiter\Program.cs:line 29
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
at Program.Main(String[] args) in D:\Git\hprose\hprose-dotnet\examples\Limiter\Program.cs:line 40
原因很容易理解,那就是在不限制并发请求数的情况下,已发送的请求如果尚未返回响应,那么这些请求就会一直占用着连接。新的请求如果要发送,就需要开启新的连接,但是 HttpClient
内置的连接池是有上限的,一旦开启的连接数超过上限,连接就会被断开,也就出现了上面的错误。而使用了 ConcurrentLimiter
的情况下,最大连接数就被限制在 ConcurrentLimiter
所设置的参数范围内,当请求数超过最大连接数后,后面的请求会在前面的请求返回响应之后,再复用这些已经开启的连接,而不会开启新的连接,这样就不会出现上面的错误了,而且在一定程度上还会提高并发请求的性能,因为使用已有的连接比新建连接更加高效。
该实例如果换成 TCP 传输的话,即使不设置 ConcurrentLimiter
也不会出现类似于上面的错误。因为 hprose 的 TCP 客户端跟服务器通讯时,只需要建立一个连接,所有的请求和响应都是通过这一个连接进行发送和接收的。Hprose 的 TCP 连接是全双工通讯的,因此,即使只有一个连接,也可以实现多路复用的并发请求调用。但就算使用 TCP 传输,如果设置了 ConcurrentLimiter
插件,并将并发请求数设置为一个合理数值的情况下,客户端的性能仍然比不设置的情况下有微弱的提升。
对于其它语言的来说,该插件的使用方式是类似的,这里就不在单独举例说明了。