当前位置: 首页 > 知识库问答 >
问题:

如何在Akka的当前演员调度程序上运行futures

慕永年
2023-03-14

Akka的文件警告:

在actor内部使用未来的回调时,如onComplete、onSuccess和onFailure,您需要小心避免关闭包含actor的引用,即不要从回调中调用方法或访问封闭actor上的可变状态

在我看来,如果我能让想要访问可变状态的未来在同一调度程序上运行,该调度程序安排了处理actor消息的线程的相互排除,那么这个问题就可以避免。这可能吗?(为什么不呢?

context.dispatcher提供的ExecutionContext没有绑定到参与者消息调度程序,但是如果绑定了呢?

class MyActorWithSafeFutures {
  implicit def safeDispatcher = context.dispatcherOnMessageThread

  var successCount = 0
  var failureCount = 0

  override def receive: Receive = {
    case MakeExternalRequest(req) =>
      val response: Future[Response] = someClient.makeRequest(req)
      response.onComplete {
        case Success(_) => successCount += 1 
        case Failure(_) => failureCount += 1 
      }
      response pipeTo sender()
    }
  }
}

在阿卡有没有办法做到这一点?

(我知道我可以将上面的示例转换为类似于<code>self!IncrementSuccess</code>的操作,但这个问题是关于从未来改变参与者状态,而不是通过消息。)

看起来我可以自己实现,使用如下代码:

class MyActorWithSafeFutures {
  implicit val executionContext: ExecutionContextExecutor = new ExecutionContextExecutor {
    override def execute(runnable: Runnable): Unit = {
      self ! runnable
    }

    override def reportFailure(cause: Throwable): Unit = {
      throw new Error("Unhandled throwable", cause)
    }
  }

  override def receive: Receive = {
    case runnable: Runnable => runnable.run()
    ... other cases here
  }
}

这有用吗?为什么阿卡不提供这个——有什么我没有看到的巨大缺点吗?

(请参阅https://github.com/jducoeur/Requester以有限的方式执行此操作的库-仅用于Asks,而不是所有Future回调。)

共有3个答案

舒浩邈
2023-03-14

这并没有直接回答您的问题,而是提供了一个使用Akka代理的替代解决方案:

  class MyActorWithSafeFutures extends Actor {

    var successCount = Agent(0)
    var failureCount = Agent(0)

    def doSomethingWithPossiblyStaleCounts() = {
      val (s, f) = (successCount.get(), failureCount.get())
      statisticsCollector ! Ratio(f/s+f)
    }

    def doSomethingWithCurrentCounts() = {
      val (successF, failureF) = (successCount.future(), failureCount.future())
      val ratio : Future[Ratio] = for {
        s <- successF
        f <- failureF
      } yield Ratio(f/s+f)
      ratio pipeTo statisticsCollector
    }

    override def receive: Receive = {
      case MakeExternalRequest(req) =>

        val response: Future[Response] = someClient.makeRequest(req)
        response.onComplete {
          case Success(_) => successCount.send(_ + 1)
          case Failure(_) => failureCount.send(_ + 1)
        }
        response pipeTo sender()
    }
  }

需要注意的是,如果要对使用@volatile时会产生的计数进行操作,则需要在未来进行操作,请参见doSomethingWithCurrentCounts()

如果您可以使用最终一致的值(可能会为代理计划待处理的更新),那么像doSometinghwithPossiblyStaleCounts()这样的内容就可以了。

冯野
2023-03-14

您可以将PinnedDispatcher用于< code > myactorwithsafeffutures actor类,该类将为给定类的每个实例创建一个线程池,并使用< code>context.dispatcher作为< code>Future的执行上下文。

要做到这一点,您必须在<code>application.conf</code>中放入类似的内容:

akka {
  ...
}
my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

并创建您的演员:

actorSystem.actorOf(
  Props(
    classOf[MyActorWithSafeFutures]
  ).withDispatcher("my-pinned-dispatcher"),       
  "myActorWithSafeFutures"
)

尽管您试图实现的目标完全违背了演员模型的目的。应该封装参与者状态,参与者状态的更改应该由传入消息驱动。

公孙盛
2023-03-14

您的actor正在dispatcher的一个线程下执行它的< code>receive,您想要剥离一个与这个特定线程紧密相关的< code>Future?在这种情况下,系统不能重用这个线程来运行不同的actor,因为这意味着当您想要执行< code>Future时,该线程不可用。如果碰巧使用相同的线程来执行< code>someClient,您可能会与自己死锁。所以这个线程不能再自由地用来运行其他Actor了——它必须属于< code > mysafeector 。

并且不允许其他线程自由运行MySafeActor-如果是,两个不同的线程可能会同时尝试更新successCount并且会丢失数据(例如,如果值为0,而两个线程都尝试执行successCount=1,则该值可能最终为1,而不是2)。因此,为了安全地执行此操作,MySafeActor必须有一个用于自身及其未来的线程。因此,您最终得到了<code>MySafeActor</code>和<code>未来</code>紧密但无形的耦合。这两者不能同时运行,可能会互相死锁。(一个写得不好的参与者仍然有可能对自己造成死锁,但使用该参与者的“虚拟互斥”的所有代码都在一个地方,这一事实更容易看到潜在的问题)。

您可以使用传统的多线程技术——互斥或类似技术——来允许< code>Future和< code>MySafeActor并发运行。但是您真正想要的是将< code>successCount封装在一个可以同时安全使用的东西中——某种...演员?

TL;DR:< code > Future 和< code>Actor: 1)可能不会并发运行,在这种情况下,您可能会死锁2)可能会并发运行,在这种情况下,您将破坏数据3)以并发安全的方式访问状态,在这种情况下,您将重新实现Actor。

 类似资料:
  • 我想有可能让演员睡一会儿。演员们应该自己决定他们要睡多久。由于thread.sleep()不是一种推荐的方法,所以我想到在Akka中使用调度程序。因此,我定义一个actor是指另一个actor可以注册被唤醒。 但发送方从未接收到振铃消息。所以我的问题是 是否建议在执行元内部使用计划程序进行计划? 发送方为什么从未收到振铃消息? 如果不可能这样做,建议采用什么方法解决问题?

  • 我正在将现有应用程序从Akka Classic移植到Akka Typed。最初,您可以使用上下文获取对参与者的引用。actorSelection()。resolveOne() 我知道在Akka Type中不再支持这一点,我们应该使用来注册演员以供发现。 但是,我只想将消息发送到本地参与者,即存在于集群中每个节点上的本地单例。我有它的本地路径,但没有对它的直接引用。这是因为它是由Akka管理系统创建

  • 我正在运行Akka2.0.2微内核,希望为不受信任的远程参与者实现一个身份验证方案。 首先想到的是设置一个身份验证执行元,该执行元在身份验证成功时返回对工作执行元的引用。 也就是说,我要阻止远程参与者在没有身份验证的情况下访问我的微内核参与者系统中的参与者。 在actorOf()中不给工作执行元一个名字是不够的,因为它会得到一个容易猜测的自动生成的名字。有没有一种方法可以禁用Actor的远程查找,

  • 问题内容: 我有一个不是actor的java对象,它使用actorSelection(Path)从一个actor系统中选择actor。系统中可能不存在所选参与者。 在Java Api中,ActorSelection不存在ask(),因此我无法向actor选择发送和标识消息并使用响应的发送者。 我试图通过演员选择将消息发送给演员,然后对死信做出反应来解决该问题。但是我没有任何死信。 如何通过Acto

  • 我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?