当前位置: 首页 > 知识库问答 >
问题:

用于管理api每分钟最大请求数的数据结构

吴伟志
2023-03-14

我需要向一个外部api发送数据,但是这个API对每个endpoint的请求有一个限制(即:每分钟60个请求)。

数据来自 Kafka,然后每条消息都会转到 redis(因为我可以发送包含 200 个项目的请求)。因此,我使用简单的缓存来帮助我,我可以保证如果我的服务器出现故障,我不会丢失任何消息。

问题是,有些时候,Kafka开始向许多消息发送消息,然后redis开始增长(超过100万条消息发送到API),并且我们不能在消息传入时太快地发出请求。然后,我们有一个很大的延迟。

我的第一个代码很简单:执行器服务执行器 = 执行器.new固定线程池(1);
当消息很少并且延迟最小时,这非常有效。

因此,我做的第一件事是将executor更改为:ExecutorService executor=executor。新缓存线程池()
因此,我可以要求新线程,因为我需要更快地向外部api发出请求,但我遇到了每分钟请求数限制的问题。

有一些endpoint,我可以每分钟发出300个请求,其他500个,其他30个,依此类推。

我做的代码不是很好,这是为了我工作的公司,所以,我真的需要把它做得更好。

因此,每次我要请求外部api时,我都会调用makeRequest方法,这个方法是同步的,我知道我可以使用同步列表,但我认为同步方法在这种情况下更好。

// This is an inner class
private static class IntegrationType {

    final Queue<Long> requests; // This queue is used to store the timestamp of the requests
    final int maxRequestsPerMinute; // How many requests I can make per minute

    public IntegrationType(final int maxRequestsPerMinute) {
        this.maxRequestsPerMinute = maxRequestsPerMinute;
        this.requests = new LinkedList<>();
    }

    synchronized void makeRequest() {
        final long current = System.currentTimeMillis();
        requests.add(current);
        if (requests.size() >= maxRequestsPerMinute) {
            long first = requests.poll(); // gets the first request

            // The difference between the current request and the first request of the queue
            final int differenceInSeconds = (int) (current - first) / 1000;
           
            // if the difference is less than the maximum allowed
            if (differenceInSeconds <= 60) {
                // seconds to sleep.
                final int secondsToSleep = 60 - differenceInSeconds;
                sleep(secondsToSleep);
            }
        }
    }

     void sleep( int seconds){
        try {
            Thread.sleep(seconds * 1000);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
     }
}

那么,有一个数据结构,我可以用吗?我应该考虑哪些因素?

提前致谢。

共有2个答案

满博
2023-03-14

我实现了一些不同于@gthanop建议的东西。

我发现,限制可能会改变。因此,我可能需要增加或缩小阻止列表。另一个原因是,不太容易使我们当前的代码适应这种情况。另一个,我们可能会使用多个实例,所以我们需要一个分布式锁。

所以,我实现一些更容易的东西,但不如@ghtanop的答案有效。

这是我的代码(改编,因为我无法显示公司代码):

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

public class Teste {
    
    private static enum ExternalApi {    
        A, B, C;
    }

    private static class RequestManager {

        private long firstRequest; // First request in one minute
    
        // how many request have we made
        private int requestsCount = 0;
    
        // A timer thread, it will execute at every minute, it will refresh the request count and the first request time
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    
        RequestManager() {
            final long initialDelay = 0L;
            final long fixedRate = 60;
    
            executor.scheduleAtFixedRate(() -> {
                System.out.println("Clearing the current count!");
                requestsCount = 0;
                firstRequest = System.currentTimeMillis();
            }, initialDelay, fixedRate, TimeUnit.SECONDS);
        }
    
        void incrementRequest() {
            requestsCount++;
        }
    
        long getFirstRequest() {
            return firstRequest;
        }
    
    
        boolean requestsExceeded(final int requestLimit) {
            return requestsCount >= requestLimit;
        }
    
    }

    public static class RequestHelper {

        private static final byte SECONDS_IN_MINUTE = 60;
        private static final short MILLISECONDS_IN_SECOND = 1000;
        private static final byte ZERO_SECONDS = 0;
    
        // Table to support the time, and count of the requests
        private final Map<Integer, RequestManager> requests;
    
        // Table that contains the limits of each type of request
        private final Map<Integer, Integer> requestLimits;
    
        /**
         * We need an array of semaphores, because, we might lock the requests for ONE, but not for TWO
         */
        private final Semaphore[] semaphores;
    
        private RequestHelper(){
    
            // one semaphore for type
            semaphores = new Semaphore[ExternalApi.values().length];
            requests = new ConcurrentHashMap<>();
            requestLimits = new HashMap<>();
    
            for (final ExternalApi type : ExternalApi.values()) {

                // Binary semaphore, must be fair, because we are updating things.
                semaphores[type.ordinal()] = new Semaphore(1, true);
            }
        }
    
        /**
         * When my token expire, I must update this, because the limits might change.
         * @param limits the new api limits
         */
        protected void updateLimits(final Map<ExternalApi, Integer> limits) {
            limits.forEach((key, value) -> requestLimits.put(key.ordinal(), value));
        }
    
    
        /**
         * Increments the counter for the type of the request,
         * Using the mutual exclusion lock, we can handle and block other threads that are trying to
         * do a request to the api.
         * If the incoming requests are going to exceed the maximum, we will make the thread sleep for N seconds ( 60 - time since first request)
         * since we are using a Binary Semaphore, it will block incoming requests until the thread that is sleeping, wakeup and release the semaphore lock.
         *
         * @param type of the integration, Supp, List, PET etc ...
         */
        protected final void addRequest(final ExternalApi type) {
    
            // the index of this request
            final int requestIndex = type.ordinal();
    
            // we get the permit for the semaphore of the type
            final Semaphore semaphore = semaphores[requestIndex];
    
            // Try to acquire a permit, if no permit is available, it will block until one is available.
            semaphore.acquireUninterruptibly();
    
            ///gets the requestManager for the type
            final RequestManager requestManager = getRequest(requestIndex);
    
            // increments the number of requests
            requestManager.incrementRequest();
    
            if (requestManager.requestsExceeded(requestLimits.get(type.ordinal()))) {
    
                // the difference in seconds between a minute - the time that we needed to reach the maximum of requests
                final int secondsToSleep = SECONDS_IN_MINUTE - (int) (System.currentTimeMillis() - requestManager.getFirstRequest()) / MILLISECONDS_IN_SECOND;
    
                // We reached the maximum in less than a minute
                if (secondsToSleep > ZERO_SECONDS) {
                    System.out.printf("We reached the maximum of: %d per minute by: %s. We must wait for: %d before make a new request!\n", requestLimits.get(type.ordinal()), type.name(), secondsToSleep);
                    sleep(secondsToSleep * MILLISECONDS_IN_SECOND);
                }
            }
            // releases the semaphore
            semaphore.release();
        }
    
    
        private final void sleep(final long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Gets the first Request Manager, if it is the first request, it will create the
         * RequestManager object
         * @param index
         * @return a RequestManager instance
         */
        private RequestManager getRequest(final int index) {
            RequestManager request = requests.get(index);
            if(request == null) {
                request = new RequestManager();
                requests.put(index, request);
            }
            return request;
        }
    }

    public static void main(String[] args) {
        
        final RequestHelper requestHelper = new RequestHelper();
        
        final Map<ExternalApi, Integer> apiLimits = Map.of(ExternalApi.A, 30, ExternalApi.B, 60, ExternalApi.C, 90);
        
        // update the limits
        requestHelper.updateLimits(apiLimits);

        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
        executor.scheduleWithFixedDelay(() -> {
            System.out.println("A new request is going to happen");
            requestHelper.addRequest(ExternalApi.A);
            sleep(65);
        }, 0, 100, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("B new request is going to happen");
            requestHelper.addRequest(ExternalApi.B);
            sleep(50);
        }, 0, 200, TimeUnit.MILLISECONDS);

        executor.scheduleWithFixedDelay(() -> {
            System.out.println("C new request is going to happen");
            requestHelper.addRequest(ExternalApi.C);
            sleep(30);
        }, 0, 300, TimeUnit.MILLISECONDS);

    }
    
    
    private static final void sleep(final long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    } 
}
孔鸿哲
2023-03-14

如果正确理解了您的问题,则可以将阻止队列计划卓越服务结合使用,如下所示。

BlockingQueues具有方法put,如果有可用空间,该方法将只在队列中添加给定元素,否则方法调用将等待(直到有可用空间)。它们还有一个方法take,该方法只在队列中有元素时才从队列中移除元素,否则方法调用将等待(直到至少有一个元素要获取)。

具体来说,您可以使用链接块队列数组阻止队列,它们可以在任何给定时间以固定大小的元素来保存。这个固定的大小意味着你可以提交任意数量的请求,但你只会接受请求并每秒处理一次(例如,每分钟发出60个请求)。

实例化具有固定大小的LinkedBlockingQueue,只需使用相应的构造函数(它接受大小作为参数LinkedBlockingQueue将根据其文档以FIFO顺序获取elements。

若要实例化具有固定大小的< code>ArrayBlockingQueue,请使用接受大小以及名为< code>fair的< code>boolean标志的构造函数。如果该标志为< code >真,那么队列也将按FIFO顺序< code >取元素。

然后,您可以有一个< code > ScheduledExecutorService (而不是在循环中等待),您可以提交一个< code>Runnable,它将从队列中< code>take,与外部API进行通信,然后等待通信之间所需的延迟。

下面是上面的一个简单演示示例:

import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {
    
    public static class RequestSubmitter implements Runnable {
        private final BlockingQueue<Request> q;
        
        public RequestSubmitter(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }
        
        @Override
        public void run() {
            try {
                q.put(new Request()); //Will block until available capacity.
            }
            catch (final InterruptedException ix) {
                System.err.println("Interrupted!"); //Not expected to happen under normal use.
            }
        }
    }
    
    public static class Request {
        public void make() {
            try {
                //Let's simulate the communication with the external API:
                TimeUnit.MILLISECONDS.sleep((long) (Math.random() * 100));
            }
            catch (final InterruptedException ix) {
                //Let's say here we failed to communicate with the external API...
            }
        }
    }
    
    public static class RequestImplementor implements Runnable {
        private final BlockingQueue<Request> q;
        
        public RequestImplementor(final BlockingQueue<Request> q) {
            this.q = Objects.requireNonNull(q);
        }
        
        @Override
        public void run() {
            try {
                q.take().make(); //Will block until there is at least one element to take.
                System.out.println("Request made.");
            }
            catch (final InterruptedException ix) {
                //Here the 'taking' from the 'q' is interrupted.
            }
        }
    }
    
    public static void main(final String[] args) throws InterruptedException {
        
        /*The following initialization parameters specify that we
        can communicate with the external API 60 times per 1 minute.*/
        final int maxRequestsPerTime = 60;
        final TimeUnit timeUnit = TimeUnit.MINUTES;
        final long timeAmount = 1;
        
        final BlockingQueue<Request> q = new ArrayBlockingQueue<>(maxRequestsPerTime, true);
        //final BlockingQueue<Request> q = new LinkedBlockingQueue<>(maxRequestsPerTime);
        
        //Submit some RequestSubmitters to the pool...
        final ExecutorService pool = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 50_000; ++i)
            pool.submit(new RequestSubmitter(q));
        
        System.out.println("Serving...");
        
        //Find out the period between communications with the external API:
        final long delayMicroseconds = TimeUnit.MICROSECONDS.convert(timeAmount, timeUnit) / maxRequestsPerTime;
        //We could do the same with NANOSECONDS for more accuracy, but that would be overkill I think.
        
        //The most important line probably:
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RequestImplementor(q), 0L, delayMicroseconds, TimeUnit.MICROSECONDS);
    }
}

请注意,我使用了计划与固定延迟,而不是计划AtFixedRate。您可以在他们的文档中看到,第一个将等待提交的Runnable调用结束之间的延迟来启动下一个,而第二个将不会等待,只需重新提交Runnable每个时间段单位。但是我们不知道与外部 API 通信需要多长时间,因此,如果我们将AtFixedRate安排为每分钟一次,但请求需要一分多钟才能完成,该怎么办?...然后,在第一个请求尚未完成时,将提交新的请求。这就是为什么我使用计划与固定的Delay而不是计划AtFixedRate。但还有更多:我使用了单线程调度执行器服务。这是否意味着如果第一个调用未完成,则无法启动第二个调用?...好吧,如果你看一下执行器#new的实现单线程计划演示者(),可能会发生第二次调用,因为单线程核心池的大小,并不意味着池是固定大小的。

我使用schduleWithFixedDelay的另一个原因是请求的下限溢位。例如,队列为空怎么办?然后调度也应该等待,不要再次提交Runnable

另一方面,如果我们使用 scheduleWithFixedDelay,比如说调度之间的延迟为 1/60f 秒,并且在一分钟内提交了超过 60 个请求,那么这肯定会使我们对外部 API 的吞吐量下降,因为使用 scheduleWithFixedDelay,我们可以保证最多向外部 API 发出 60 个请求。它可以小于此,但我们不希望它如此。我们希望每次都达到极限。如果您不关心这一点,那么您已经可以使用上述实现。

但是,假设您确实希望每次都尽可能接近限制,在这种情况下,据我所知,您可以使用自定义调度程序来实现这一点,这将是一种比第一种更简洁的解决方案,但更精确的时间。

归根结底,在上述实现中,您需要确保与外部API的通信以尽可能快地满足请求。

最后,我应该警告您,如果我建议的BlockingQueue实现不是按FIFO顺序放入,我无法找到会发生什么。我的意思是,如果两个请求几乎同时到达,而队列已满,该怎么办?他们都会等待,但第一个到达的人会等待并首先得到puted,还是第二个先得到 put ed?我不知道。如果您不关心外部API发出的一些请求是否无序,那么请不要担心,并使用到目前为止的代码。然而,如果您真的在意,并且您能够在每个请求中输入序列号,那么您可以在BlockingQueue之后使用PriorityQueue>/code>,甚至可以尝试使用 PriorityBlocking队列PriorityQueue 发布相关代码。至少我已经尽力了,我希望我能抛出一些好主意。我并不是说这篇帖子是你所有问题的完整解决方案,但它是一些需要考虑的因素。

 类似资料:
  • 问题内容: 如何使用aiohttp在客户端设置每秒最大请求数(限制请求数)? 问题答案: 我在这里找到了一种可能的解决方案:http : //compiletoi.net/fast-scraping-in-python-with- asyncio.html 同时执行3个请求很酷,但是同时执行5000个则不太好。如果您尝试同时执行太多请求,则连接可能会开始关闭,甚至可能被网站禁止。 为避免这种情况,

  • 我有一个Web服务器,它只支持一个非常简单的API -计算在最后一小时,每分钟和每秒收到的请求数。该服务器在世界上非常受欢迎,每秒接收数千个请求。 目标是找到如何准确地将这3个值返回到每个请求? 请求一直在到来,因此每个请求的一小时、一分钟和一秒的窗口是不同的。如何管理每个请求的不同窗口,以便每个请求的计数都是正确的?

  • 问题内容: 如何在特定时间段内调用Ajax请求?我应该使用Timer插件还是jQuery为此提供插件? 问题答案: 您可以使用内置的javascript setInterval。 或者如果您是更简洁的类型…

  • 我正在尝试绘制每分钟调用 API 的次数。使用Coda Hale指标和石墨,我可以看到一个连续的计数(即如果API被调用5次,它将在时间上显示一条恒定的线,以获得值5),或者我可以看到每分钟的指数加权移动平均线。但我想看到的只是每分钟调用 API 的次数。我尝试过使用Coda Hale的计数器,计时器和仪表,也尝试过使用石墨渲染函数,但无法获得我想要的东西。 以前有人这样做过吗?如果这是一个基本问

  • 我有一个Spring boot应用程序,使用测微计抛出开放的公制统计数据。 对于我的每个HTTPendpoint,我可以看到以下指标,我相信它跟踪给定endpoint的请求数: http\U server\u requests\u seconds\u count 我的问题是,如何在Grafana查询中使用它来表示每分钟调用我的endpoint的请求数? 我试过了 http\u client\u r

  • 我的特定用例是,我想使用redis速率限制器在spring cloud gateway实现速率限制,例如,对于特定路由,每分钟4个请求。 但它并没有像预期的那样工作,一旦一个请求完成,它就会自动填充存储桶。使用了spring cloud论坛提供的相同实现,但无法限制我对特定时间窗口的请求: 通过将RequiredRate设置为所需的请求数、requestedTokens设置为以秒为单位的时间跨度、