我需要向一个外部api发送数据,但是这个API对每个endpoint的请求有一个限制(即:每分钟60个请求)。
数据来自 Kafka,然后每条消息都会转到 redis(因为我可以发送包含 200 个项目的请求)。因此,我使用简单的缓存来帮助我,我可以保证如果我的服务器出现故障,我不会丢失任何消息。
问题是,有些时候,Kafka开始向许多消息发送消息,然后redis开始增长(超过100万条消息发送到API),并且我们不能在消息传入时太快地发出请求。然后,我们有一个很大的延迟。
我的第一个代码很简单:执行器服务执行器 = 执行器.new固定线程池(1);
当消息很少并且延迟最小时,这非常有效。
因此,我做的第一件事是将executor更改为:ExecutorService executor=executor。新缓存线程池()
因此,我可以要求新线程,因为我需要更快地向外部api发出请求,但我遇到了每分钟请求数限制的问题。
有一些endpoint,我可以每分钟发出300个请求,其他500个,其他30个,依此类推。
我做的代码不是很好,这是为了我工作的公司,所以,我真的需要把它做得更好。
因此,每次我要请求外部api时,我都会调用makeRequest方法,这个方法是同步的,我知道我可以使用同步列表,但我认为同步方法在这种情况下更好。
// This is an inner class
private static class IntegrationType {
final Queue<Long> requests; // This queue is used to store the timestamp of the requests
final int maxRequestsPerMinute; // How many requests I can make per minute
public IntegrationType(final int maxRequestsPerMinute) {
this.maxRequestsPerMinute = maxRequestsPerMinute;
this.requests = new LinkedList<>();
}
synchronized void makeRequest() {
final long current = System.currentTimeMillis();
requests.add(current);
if (requests.size() >= maxRequestsPerMinute) {
long first = requests.poll(); // gets the first request
// The difference between the current request and the first request of the queue
final int differenceInSeconds = (int) (current - first) / 1000;
// if the difference is less than the maximum allowed
if (differenceInSeconds <= 60) {
// seconds to sleep.
final int secondsToSleep = 60 - differenceInSeconds;
sleep(secondsToSleep);
}
}
}
void sleep( int seconds){
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
那么,有一个数据结构,我可以用吗?我应该考虑哪些因素?
提前致谢。
我实现了一些不同于@gthanop建议的东西。
我发现,限制可能会改变。因此,我可能需要增加或缩小阻止列表。另一个原因是,不太容易使我们当前的代码适应这种情况。另一个,我们可能会使用多个实例,所以我们需要一个分布式锁。
所以,我实现一些更容易的东西,但不如@ghtanop的答案有效。
这是我的代码(改编,因为我无法显示公司代码):
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
public class Teste {
private static enum ExternalApi {
A, B, C;
}
private static class RequestManager {
private long firstRequest; // First request in one minute
// how many request have we made
private int requestsCount = 0;
// A timer thread, it will execute at every minute, it will refresh the request count and the first request time
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
RequestManager() {
final long initialDelay = 0L;
final long fixedRate = 60;
executor.scheduleAtFixedRate(() -> {
System.out.println("Clearing the current count!");
requestsCount = 0;
firstRequest = System.currentTimeMillis();
}, initialDelay, fixedRate, TimeUnit.SECONDS);
}
void incrementRequest() {
requestsCount++;
}
long getFirstRequest() {
return firstRequest;
}
boolean requestsExceeded(final int requestLimit) {
return requestsCount >= requestLimit;
}
}
public static class RequestHelper {
private static final byte SECONDS_IN_MINUTE = 60;
private static final short MILLISECONDS_IN_SECOND = 1000;
private static final byte ZERO_SECONDS = 0;
// Table to support the time, and count of the requests
private final Map<Integer, RequestManager> requests;
// Table that contains the limits of each type of request
private final Map<Integer, Integer> requestLimits;
/**
* We need an array of semaphores, because, we might lock the requests for ONE, but not for TWO
*/
private final Semaphore[] semaphores;
private RequestHelper(){
// one semaphore for type
semaphores = new Semaphore[ExternalApi.values().length];
requests = new ConcurrentHashMap<>();
requestLimits = new HashMap<>();
for (final ExternalApi type : ExternalApi.values()) {
// Binary semaphore, must be fair, because we are updating things.
semaphores[type.ordinal()] = new Semaphore(1, true);
}
}
/**
* When my token expire, I must update this, because the limits might change.
* @param limits the new api limits
*/
protected void updateLimits(final Map<ExternalApi, Integer> limits) {
limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
}
/**
* Increments the counter for the type of the request,
* Using the mutual exclusion lock, we can handle and block other threads that are trying to
* do a request to the api.
* If the incoming requests are going to exceed the maximum, we will make the thread sleep for N seconds ( 60 - time since first request)
* since we are using a Binary Semaphore, it will block incoming requests until the thread that is sleeping, wakeup and release the semaphore lock.
*
* @param type of the integration, Supp, List, PET etc ...
*/
protected final void addRequest(final ExternalApi type) {
// the index of this request
final int requestIndex = type.ordinal();
// we get the permit for the semaphore of the type
final Semaphore semaphore = semaphores[requestIndex];
// Try to acquire a permit, if no permit is available, it will block until one is available.
semaphore.acquireUninterruptibly();
///gets the requestManager for the type
final RequestManager requestManager = getRequest(requestIndex);
// increments the number of requests
requestManager.incrementRequest();
if (requestManager.requestsExceeded(requestLimits.get(type.ordinal()))) {
// the difference in seconds between a minute - the time that we needed to reach the maximum of requests
final int secondsToSleep = SECONDS_IN_MINUTE - (int) (System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND;
// We reached the maximum in less than a minute
if (secondsToSleep > ZERO_SECONDS) {
System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d before make a new request!\n", requestLimits.get(type.ordinal()), type.name(), secondsToSleep);
sleep(secondsToSleep * MILLISECONDS_IN_SECOND);
}
}
// releases the semaphore
semaphore.release();
}
private final void sleep(final long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Gets the first Request Manager, if it is the first request, it will create the
* RequestManager object
* @param index
* @return a RequestManager instance
*/
private RequestManager getRequest(final int index) {
RequestManager request = requests.get(index);
if(request == null) {
request = new RequestManager();
requests.put(index, request);
}
return request;
}
}
public static void main(String[] args) {
final RequestHelper requestHelper = new RequestHelper();
final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);
// update the limits
requestHelper.updateLimits(apiLimits);
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
executor.scheduleWithFixedDelay(() -> {
System.out.println("A new request is going to happen");
requestHelper.addRequest(ExternalApi.A);
sleep(65);
}, 0, 100, TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(() -> {
System.out.println("B new request is going to happen");
requestHelper.addRequest(ExternalApi.B);
sleep(50);
}, 0, 200, TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(() -> {
System.out.println("C new request is going to happen");
requestHelper.addRequest(ExternalApi.C);
sleep(30);
}, 0, 300, TimeUnit.MILLISECONDS);
}
private static final void sleep(final long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如果正确理解了您的问题,则可以将阻止队列
与计划卓越服务结合使用,
如下所示。
BlockingQueue
s具有方法put
,如果有可用空间,该方法将只在队列中添加给定元素,否则方法调用将等待(直到有可用空间)。它们还有一个方法take
,该方法只在队列中有元素时才从队列中移除元素,否则方法调用将等待(直到至少有一个元素要获取)。
具体来说,您可以使用链接块队列
或数组阻止队列,
它们可以在任何给定时间以固定大小的元素来保存。这个固定的大小意味着你可以提交任意数量的
请求,但你只会接受
请求并每秒处理一次(例如,每分钟发出60个请求)。
要实例化具有固定大小的LinkedBlockingQueue
,只需使用相应的构造函数(它接受大小作为参数)LinkedBlockingQueue
将根据其文档以FIFO顺序获取elements。
若要实例化具有固定大小的< code>ArrayBlockingQueue,请使用接受大小以及名为< code>fair的< code>boolean标志的构造函数。如果该标志为< code >真,那么队列也将按FIFO顺序< code >取元素。
然后,您可以有一个< code > ScheduledExecutorService (而不是在循环中等待),您可以提交一个< code>Runnable,它将从队列中< code>take,与外部API进行通信,然后等待通信之间所需的延迟。
下面是上面的一个简单演示示例:
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Main {
public static class RequestSubmitter implements Runnable {
private final BlockingQueue<Request> q;
public RequestSubmitter(final BlockingQueue<Request> q) {
this.q = Objects.requireNonNull(q);
}
@Override
public void run() {
try {
q.put(new Request()); //Will block until available capacity.
}
catch (final InterruptedException ix) {
System.err.println("Interrupted!"); //Not expected to happen under normal use.
}
}
}
public static class Request {
public void make() {
try {
//Let's simulate the communication with the external API:
TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
}
catch (final InterruptedException ix) {
//Let's say here we failed to communicate with the external API...
}
}
}
public static class RequestImplementor implements Runnable {
private final BlockingQueue<Request> q;
public RequestImplementor(final BlockingQueue<Request> q) {
this.q = Objects.requireNonNull(q);
}
@Override
public void run() {
try {
q.take().make(); //Will block until there is at least one element to take.
System.out.println("Request made.");
}
catch (final InterruptedException ix) {
//Here the 'taking' from the 'q' is interrupted.
}
}
}
public static void main(final String[] args) throws InterruptedException {
/*The following initialization parameters specify that we
can communicate with the external API 60 times per 1 minute.*/
final int maxRequestsPerTime = 60;
final TimeUnit timeUnit = TimeUnit.MINUTES;
final long timeAmount = 1;
final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
//final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);
//Submit some RequestSubmitters to the pool...
final ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i = 0; i < 50_000; ++i)
pool.submit(new RequestSubmitter(q));
System.out.println("Serving...");
//Find out the period between communications with the external API:
final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
//We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.
//The most important line probably:
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
}
}
请注意,我使用了计划与固定延迟
,而不是计划AtFixedRate
。您可以在他们的文档中看到,第一个将等待提交的Runnable
调用结束之间的延迟来启动下一个,而第二个将不会等待,只需重新提交Runnable
每个时间段
单位。但是我们不知道与外部 API 通信需要多长时间,因此,如果我们将AtFixedRate安排
为每分钟一次
,但请求需要一分多钟才能完成,该怎么办?...然后,在第一个请求尚未完成时,将提交新的请求。这就是为什么我使用计划与固定的Delay
而不是计划AtFixedRate
。但还有更多:我使用了单线程调度执行器服务。这是否意味着如果第一个调用未完成,则无法启动第二个调用?...好吧,如果你看一下执行器#new的实现单线程计划演示者()
,可能会发生第二次调用,因为单线程核心池的大小,并不意味着池是固定大小的。
我使用schduleWithFixedDelay
的另一个原因是请求的下限溢位。例如,队列为空怎么办?然后调度也应该等待,不要再次提交Runnable
。
另一方面,如果我们使用 scheduleWithFixedDelay
,比如说调度之间的延迟为 1/60f
秒,并且在一分钟内提交了超过 60 个请求,那么这肯定会使我们对外部 API 的吞吐量下降,因为使用 scheduleWithFixedDelay
,我们可以保证最多向外部 API 发出 60 个请求。它可以小于此,但我们不希望它如此。我们希望每次都达到极限。如果您不关心这一点,那么您已经可以使用上述实现。
但是,假设您确实希望每次都尽可能接近限制,在这种情况下,据我所知,您可以使用自定义调度程序来实现这一点,这将是一种比第一种更简洁的解决方案,但更精确的时间。
归根结底,在上述实现中,您需要确保与外部API的通信以尽可能快地满足请求。
最后,我应该警告您,如果我建议的BlockingQueue
实现不是按FIFO顺序放入,我无法找到会发生什么。我的意思是,如果两个请求几乎同时到达,而队列已满,该怎么办?他们都会等待,但第一个到达的人会等待并首先得到
ed?我不知道。如果您不关心外部API发出的一些请求是否无序,那么请不要担心,并使用到目前为止的代码。然而,如果您真的在意,并且您能够在每个请求中输入序列号,那么您可以在put
ed,还是第二个先得到
BlockingQueue
之后使用PriorityQueue>/code>,甚至可以尝试使用
发布相关代码。至少我已经尽力了,我希望我能抛出一些好主意。我并不是说这篇帖子是你所有问题的完整解决方案,但它是一些需要考虑的因素。
我有一个Web服务器,它只支持一个非常简单的API -计算在最后一小时,每分钟和每秒收到的请求数。该服务器在世界上非常受欢迎,每秒接收数千个请求。 目标是找到如何准确地将这3个值返回到每个请求? 请求一直在到来,因此每个请求的一小时、一分钟和一秒的窗口是不同的。如何管理每个请求的不同窗口,以便每个请求的计数都是正确的?
问题内容: 如何使用aiohttp在客户端设置每秒最大请求数(限制请求数)? 问题答案: 我在这里找到了一种可能的解决方案:http : //compiletoi.net/fast-scraping-in-python-with- asyncio.html 同时执行3个请求很酷,但是同时执行5000个则不太好。如果您尝试同时执行太多请求,则连接可能会开始关闭,甚至可能被网站禁止。 为避免这种情况,
问题内容: 如何在特定时间段内调用Ajax请求?我应该使用Timer插件还是jQuery为此提供插件? 问题答案: 您可以使用内置的javascript setInterval。 或者如果您是更简洁的类型…
我正在尝试绘制每分钟调用 API 的次数。使用Coda Hale指标和石墨,我可以看到一个连续的计数(即如果API被调用5次,它将在时间上显示一条恒定的线,以获得值5),或者我可以看到每分钟的指数加权移动平均线。但我想看到的只是每分钟调用 API 的次数。我尝试过使用Coda Hale的计数器,计时器和仪表,也尝试过使用石墨渲染函数,但无法获得我想要的东西。 以前有人这样做过吗?如果这是一个基本问
我有一个Spring boot应用程序,使用测微计抛出开放的公制统计数据。 对于我的每个HTTPendpoint,我可以看到以下指标,我相信它跟踪给定endpoint的请求数: http\U server\u requests\u seconds\u count 我的问题是,如何在Grafana查询中使用它来表示每分钟调用我的endpoint的请求数? 我试过了 http\u client\u r
我的特定用例是,我想使用redis速率限制器在spring cloud gateway实现速率限制,例如,对于特定路由,每分钟4个请求。 但它并没有像预期的那样工作,一旦一个请求完成,它就会自动填充存储桶。使用了spring cloud论坛提供的相同实现,但无法限制我对特定时间窗口的请求: 通过将RequiredRate设置为所需的请求数、requestedTokens设置为以秒为单位的时间跨度、