当前位置: 首页 > 知识库问答 >
问题:

Vertx延迟批处理

段干德泽
2023-03-14

如何在Vertx中处理延迟作业列表(实际上是数百个HTTP GET请求,到禁止快速请求主机的有限API)?现在,我正在使用此代码,它被阻止,因为Vertx一次启动所有请求。希望在每个请求之间有5秒的延迟来处理每个请求。

public void getInstrumnetDailyInfo(Instrument instrument,
                                   Handler<AsyncResult<OptionInstrument>> handler) {
  webClient
    .get("/Loader")
    .addQueryParam("i", instrument.getId())
    .timeout(30000)
    .send(
      ar -> {
        if (ar.succeeded()) {
          String html = ar.result().bodyAsString();
          Integer thatData = processHTML(html);
            instrument.setThatData(thatData);
            handler.handle(Future.succeededFuture(instrument));
        } else {
          // error
          handler.handle(Future.failedFuture("error " +ar.cause()));
        }
      });
}

public void start(){
  List<Instrument> instruments = loadInstrumentsList();
  instruments.forEach(
    instrument -> {
      webClient.getInstrumnetDailyInfo(instrument,
            async -> {
              if(async.succeeded()){
                instrumentMap.put(instrument.getId(), instrument);
              }else {
                log.warn("getInstrumnetDailyInfo: ", async.cause());
              }
            });
        });
}

共有2个答案

钱志强
2023-03-14

您可以使用任何现成的速率限制器功能,并将其调整为异步使用。

Guava的RateLimitor示例:

    // Make permits available at a rate of one every 5 seconds
    private RateLimiter limiter = RateLimiter.create(1 / 5.0);

    // A vert.x future that completes when it obtains a throttle permit
    public Future<Double> throttle() {
        return vertx.executeBlocking(p -> p.complete(limiter.acquire()), true);
    }

然后

   throttle()
       .compose(d -> {
           System.out.printf("Waited %.2f before running job\n", d);
           return runJob(); // runJob returns a Future result
       });
戚英逸
2023-03-14

您可以考虑使用计时器来触发事件(而不是在启动时触发所有事件)。

Vertx有两种变体,

>

  • . setTimer()在延迟后触发特定事件

    <代码>顶点。setTimer(间隔,新处理程序


    2<代码>。setPeriodic(),每次经过指定的时间段后都会激发。

    vertx.setPeriodic(interval, new Handler<Long>() {});
    

    您正在寻找的似乎是设置周期。

    您可以从文档中获得更多信息

    对于更复杂的Vertx调度用例,您可以查看Chime或其他调度程序或此模块

  •  类似资料:
    • 我有一个Spring批处理工作,分两步。第一个下载文件,第二个处理文件。问题是,在第一步运行之前,第二步不知道文件的名称。 作业已自动实例化这些步骤,以便在需要时运行。我想不出任何方法来确保第一步运行后该步骤会初始化。 以下是代码: 你可以看到获取我在第一步的Tasklet中设置的局部变量。

    • 我们在RHEL 7.0 VM上部署了一个Java/spring/Tomcat应用程序,它使用AlejandRorivera/Embedded-RabbitMQ,一旦部署了war,它就启动Rabbitmq服务器,并连接到它。我们有多个队列用来处理和过滤事件。 流程如下所示: 我们接收到的事件->发布事件队列->侦听器类筛选事件->发布到另一个队列进行处理->我们发布到另一个队列进行日志记录。 问题是

    • 我有一系列应用程序使用来自SQS队列的消息。如果由于某种原因,这些消费者中的一个出现故障,并且停止使用消息,我希望得到通知。做这件事最好的方法是什么? 请注意,其中一些队列每2-3天只能将一条消息放入队列,因此等待队列中的消息数触发通知对我来说不是一个好的选择。 我正在寻找的是可以监视一个SQS队列并说“这个消息已经在这里一个小时了,什么都没有处理它……让某人知道。”

    • top看到单个CPU 100%时,就是垂直扩展的时候了。如果需要让CPU使用率最大化,可以配置Redis实例数对应CPU数, Redis实例数对应端口数(8核Cpu, 8个实例, 8个端口), 以提高并发。单机测试时, 单条数据在200字节, 测试的结果为8~9万tps。(未实测)。 另外,对于命令的复杂度一定要关注。

    • 问题内容: 这是我的情况: 我有一个包含用户列表的页面。我通过Web界面创建一个新用户,并将其保存到服务器。服务器在elasticsearch中为文档建立索引并成功返回。然后,我被重定向到不包含新用户的列表页面,因为它可能需要1秒钟的时间才能使文档在Elasticsearch中可供搜索 elasticsearch中的近实时搜索。 elasticsearch指南说您可以手动刷新索引,但说在生产中不要

    • 问题内容: 我正在尝试使用新的React Lazy和Suspense创建后备加载组件。这很好用,但后备时间仅显示几毫秒。有没有办法增加额外的延迟或最短时间,因此我可以在渲染下一个组件之前显示该组件的动画? 现在懒导入 等待组件: 我可以做这样的事情吗? 问题答案: 函数应该返回对象的承诺,该对象由具有默认导出功能的模块返回。不会返回承诺,也不能那样使用。尽管任意承诺可以: 如果目标是提供 最小的