当前位置: 首页 > 知识库问答 >
问题:

如何取消(或者只是中断)一个scala.concurrent.Future?

李新霁
2023-03-14

我有一个Java库,可以执行长时间运行的阻塞操作。该库旨在响应用户取消请求。库集成点是可调用接口。

我需要将此库从 Actor 中集成到我的应用程序中。我最初的想法是做这样的事情:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val callable: java.util.concurrent.Callable[Void] = ???
val fut = Future {
  callable.call()
}
fut.onSuccess {
  case _ => // continue on success path
}
fut.onFailure {
  case throwable => // handle exceptions
}

我认为这段代码可以正常工作,因为它不会阻止参与者。但是我不知道如何提供取消操作的方法。假设当可调用对象正在处理时,参与者收到一条消息,指示它应该取消可调用对象中正在处理的操作,并且库通过中断处理线程来响应取消请求。

从参与者内部提交一个可调用函数,然后在稍后取消该操作的最佳实践是什么?

更新

需要明确的是,该库公开了java.util.concurrent.Callable接口的一个实例。Callable本身不提供取消方法。但是可调用对象的实现方式是,由于中断线程,它会对取消做出响应。在java中,这将通过将可调用对象提交给执行器来完成。这将返回一个java.util.concurrent.Future.正是这个Future对象提供了取消方法。

Java我会做以下事情:

ExecutorService executor = ...
Callable c = ...
Future f = executor.submit(c);
...
// do more stuff
...
// If I need to cancel the callable task I just do this:
f.cancel(true);

java.util.concurrent.Future和scala.concurrent.Future之间似乎存在脱节,java版本提供了cancel方法,而scala版本没有。

在Scala中,我会这样做:

// When the Akka Actor receives a message to process a 
// long running/blocking task I would schedule it to run
// on a different thread like this:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val callable: java.util.concurrent.Callable[Void] = ???
val fut = Future {
  callable.call()
}
fut.onSuccess {
  case _ => // continue on success path
}
fut.onFailure {
  case throwable => // handle exceptions
}

// But now, if/when the actor receives a message to cancel
// the task because it is taking too long to finish (even
// though it is running in the background) there is no way
// that I know of to cancel or interrupt the 
// scala.concurrent.Future.

有没有一种惯用的scala方法可以取消scala.concurrent.Future?

共有1个答案

晏永康
2023-03-14

据我所知,你的库公开了一个具有< code>call和一些< code>cancel方法的接口,对吗?我想你可以随时打电话取消。下面的例子应该可以让你开始。

class InterruptableThingy extends Actor {
  implicit val ctx = context.system.dispatchers.lookup("dedicated-dispatcher")      

  var counter = 0
  var tasks = Map.empty[Int, HasCancelMethod]

  def receive = {
    case "doThing" =>
      counter += 1
      val id = counter

      val thing = ???
      Future { thing.call() } onSuccess {} // ...

      tasks(id) = thing
      sender() ! id

    case Interrupt(id) => 
      tasks(id).cancel()
      tasks -= id
  }
}
case class Interrupt(taskId: Int)

请注意,我们正在为阻塞期货使用专用调度程序。这是一个非常好的模式,因为您可以将专用调度程序配置为适合您的阻塞工作负载(并且不会消耗默认调度程序中的资源)。调度器在这里的文档中有更详细的解释:http://doc.akka.io/docs/akka/2.3.3/scala/dispatchers.html

 类似资料:
  • 那么我如何确保我的队列中只有一个消费者呢?

  • 我有两个组id相同的消费者服务器订阅了相同的主题。kafka服务器仅使用一个分区运行。据我所知,消息应该在这两个消费者服务器中随机使用。但现在似乎总是同一个消费者服务器A消费消息,另一个不消费消息。如果我停止消费者服务器A,另一个将正常工作。我所期望的是,他们可以随机消费信息。

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了

  • 我想知道一个使用者如何从多个分区使用消息,具体来说,从不同的分区读取消息的顺序是什么? 我看了一眼源代码(Consumer,Fetcher),但我不能完全理解。 这是我以为会发生的: 分区是顺序读取的。也就是说:在继续下一个分区之前,一个分区中的所有消息都将被读取。如果我们达到< code>max.poll.records而没有消耗整个分区,则下一次读取将继续读取当前分区,直到耗尽为止,然后继续下

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 无论是ES6promise还是蓝鸟promise、Qpromise等。 如何测试给定对象是否为Promise?