当前位置: 首页 > 知识库问答 >
问题:

如何处理失败的期货

蓬森
2023-03-14

我正在与我无法控制的遗留库集成。

定义了以下接口:

interface Factory {
    Future<Void> subscribe(Context c);
}

这个“subscribe”方法被不同的线程频繁调用。我关心“Future.get()”的结果的唯一情况是当它失败时,所以我可以获取并处理异常。这不一定发生在调用线程中。另外,在“Future.get()”上阻塞调用线程对我来说是非常昂贵的,因为即使成功也可能需要几秒钟才能完成。

所以我的任务是以某种方式“后处理”所有这些期货,过滤失败的期货。基本上,我可以看到两种可能的方法

方法#1:

在获得Future的实例时,向外部执行程序提交一个单独的Runnable,它将执行所需的操作:

    executor.submit(
        new Runnable(){
            @Override
            public void run() {
                try {
                    future.get();
                } catch(Exception e){
                    // process the exception
                }
            }
        }
    );

这种方法的缺点是我们仍然会阻塞一个线程很长一段时间。正如我所说,这个代码片段会被相当频繁地执行。

方法#2:

在获取 Future 的实例时,将其放置到某个集合中,并专用于一个单独的单个线程,该线程将定期运行此集合的元素进行处理:

    while(true){
        Iterator<Future<Void>> iterator = collection.iterator();
        while(iterator.hasNext()){
            Future<Void> future = iterator.next();
            if(future.isDone()){
                try {
                    future.get();
                } catch(Exception e){
                    // process the exception
                } finally {
                    iterator.remove();
                }
            }
        }

        TimeUnit.MILLISECONDS.sleep(1000); // sleep
    }

你怎么认为?有没有更好的办法解决这个问题?

共有1个答案

饶德元
2023-03-14

由于您没有参与创造未来,因此最佳选择不可用,请使用自定义的未来从未来本身触发处理。

因此,在您的情况下,我建议使用一种可能看起来像是两个选项混合的模式。将 Futures 添加到(线程安全)队列,并将 Runnables 提交给处理循环中所有项目的执行器。因此,您可以通过配置执行器来限制线程的数量,即没有 Future那样多的线程,但仍然可以有多个线程,而不必始终保持这些后处理线程处于活动状态。

为了避免在重新排队未完成的项目时出现无限循环,请使用本地集合将挂起的项目与重新排队的项目分开:

static BlockingQueue<Future<?>> PENDING = …;
static int MAX_ITEMS_PER_JOB = …;
…
/*scheduling code …*/new Runnable() {
  public void run()
  {
    ArrayList<Future<?>> myLocalItems=new ArrayList<>();
    PENDING.drainTo(myLocalItems, MAX_ITEMS_PER_JOB);
    for(Future<?> f:myLocalItems) {
      if(!f.isDone()) PENDING.offer(f); // re-queue
      try {
        f.get();
      } catch(ExecutionException ex) {
        handleException(ex.getCause());
      }
    }
  }
};

因此,这个< code>Runnable将检查和处理有限数量的< code>Future并返回,因此,如果有大量项目处于待定状态,则适合多次提交以进行并行处理,但如果有较少数量的项目处于待定状态,则不会造成损害,因为如果无事可做,作业将不会停留。它们甚至适用于使用< code > ScheduledExecutorService 的< code > scheduleWithFixedDelay 。

 类似资料:
  • 我想处理不同的失败案例(返回为尝试)。 示例代码 如果是或只需打印消息,对于所有其他异常,打印堆栈跟踪。 然而,ex永远只是一个可丢弃的,因此是一个没有结果的类型测试(根据IntelliJ,类型Throwable的值不能也是StatsException.type) 更糟糕的是,我会遇到编译错误: 以惯用的方式处理不同的失败案例的最佳方式是什么?

  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087

  • 类项目: hbm文件: 方法如下:

  • 我在服务器1、服务器2和服务器3上运行zookeeper设置,同样kafka也在服务器1、服务器2和服务器3上运行。 安装程序正在kubernetes中运行。 问题陈述: > 如果一个动物园管理员设置关闭,整个设置都会关闭,因为kafka依赖于动物园管理员。我说得对吗? 如果Q1正确-是否有任何方法来进行设置,例如如果一个动物园管理员服务器将停机,那么kafka应该按原样运行? 如何在kubern

  • 我们正在Netty之上实现SSL。但是当前的设计有一个缺陷。如果失败,客户端将重试连接到服务器。这是网络或服务器负载过重问题所需要的。但是错误的客户端凭据会导致持续的失败。 有一些解决办法: 客户端-服务器连接可以故障转移到未加密模式(从管道中删除SslHandler)。 客户端可以死亡并在知道它是SSL异常时抛出异常。 不幸的是,我不知道如何使用Netty实现这一点。几个问题: < li >如何

  • 方法一:利用BIOS Boot Block引导块 现在用Award BIOS的主板都有一个BIOS引导块,当你升级BIOS时,这一小部分引导块可以不被覆盖(BootBlock Write 跳线设置为“Disable”,并且在运行Flash程序时,不选择“UpdateBIOS Including Boot Block”方式)。这个BIOS引导块只支持软驱和ISA显示卡,所以很多人在升级BIOS失败后