当前位置: 首页 > 知识库问答 >
问题:

Scala Futures -内置超时?

祁烨
2023-03-14

有一个方面的未来,我不完全理解从官方教程参考。http://docs.scala-lang.org/overviews/core/futures.html

scala中的期货是否有某种内置的超时机制?假设下面的示例是一个5GB的文本文件...“Implicits.global”的隐含范围最终会导致onFailure以非阻塞方式触发还是可以定义?如果没有某种默认的超时,这是否意味着成功或失败都不会触发?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}

共有3个答案

郑卜鹰
2023-03-14

我刚刚为一位同事创建了一个 TimeoutFuture 类:

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}
val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}
  • 假设 Play! 框架(但适应起来很容易)
  • 每段代码都在同一个执行文本中运行,这可能并不理想。
冯茂实
2023-03-14

所有这些答案都需要额外的相关性。我决定使用java.util编写一个版本。定时器,这是将来运行函数的有效方式,在这种情况下触发超时。

更多详情请点击此处

利用这一点和Scala的promise,我们可以做出如下带有超时的未来:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run() : Unit = {
            p.tryFailure(new TimeoutException())
        }
      }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}
微生运浩
2023-03-14

只有当您使用阻塞来获取Future的结果时,您才会出现超时行为。如果您想使用非阻塞回调on完成on成功onFailure,那么您必须滚动自己的超时处理。Akka为参与者之间的请求/响应()消息传递内置了超时处理,但不确定您是否要开始使用Akka。FWIW,在Akka中,为了超时处理,它们通过Future.firstCompletedOf将两个Futures组合在一起,一个表示实际的异步任务,另一个表示超时。如果超时计时器(通过HashedWheelTimer)首先弹出,则异步回调失败。

一个非常简单的滚动自己的示例可能会像这样。首先,一个用于调度超时的对象:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise:Promise[_], after:Duration) = {
    timer.newTimeout(new TimerTask{
      def run(timeout:Timeout){              
        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}

然后一个函数来获取Future并向其添加超时行为:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}

请注意,我在这里使用的HashedWheelTimer来自Netty。

 类似资料:
  • 当微服务处理业务逻辑时间过长,网关会报超时错误,默认等待时间是5秒。 可在网关指定spring.cloud.gateway.httpclient.response-timeout参数设置超时时间,单位毫秒 # 设置响应超时10秒 spring.cloud.gateway.httpclient.response-timeout=10000 更多配置参见:org.springframework.cl

  • <?php $http = HttpRequest::newSession(); $response = $http->timeout(3000, 1000) // 总时间不得超过3秒,连接时间不得超过1秒 ->get('https://www.baidu.com/'); $content = $response->body(); // 网页源码

  • 问题内容: 我在完成时遇到问题,这是我尝试过的操作: 这就是我得到的: 这解决了问题: 问题答案: 如果要设置Jedis connection ,则应使用为此专门设计的构造函数进行设置: 您正在做的是从设置Redis设置的超时。这样做,意味着redis将在几秒钟后关闭空闲的客户端连接。这就是为什么您在Jedis中遇到例外的原因。

  • 我在完成,以下是我尝试过的: 以下是我得到的: 这就解决了问题:

  • JTA事务意外回滚(可能是由于超时);嵌套的异常是 由于JPA,我得到了一个超时异常:如何增加事务所需的时间? 我应该在哪里包括参数来解决这个问题? 我用的是Tomcat7。

  • 和我的CSS: