当前位置: 首页 > 知识库问答 >
问题:

在Akka演员中使用future.sequence的替代方法

欧阳博文
2023-03-14
    null
case RetrieveData(userId, `type`, id, lang, paging, timeRange, platform) => {
      val sen = sender

      val result: Future[Seq[Map[String, Any]]] = if (paging.getOrElse(Paging(0, 0)) == Paging(0, 0)) Future.successful(Seq.empty)
      else {
        val start = System.currentTimeMillis()

        val profileF = profileActor ? Get(userId)

        Future.sequence(Seq(profileF, getSymbols(`type`, id), getData(paging, timeRange, platform)).map { result =>
          logger.info(s"Got ${result.size} news in ${System.currentTimeMillis() - start} ms")
          result
        }.recover { case ex: Throwable =>
          logger.error(s"Failure on getting data: ${ex.getMessage}", ex)
          Seq.empty
        }
      }

      result.pipeTo(sen)
    }

函数getAndProcessData包含并行执行3个Future.Sequence。

现在,随着我对Akka的阅读越来越多,我发现使用ask会创建另一个actor Listener。问题是:

  1. 正如我们广泛使用的ask,它是否会导致系统中使用多个线程,有时可能会导致线程饥饿?
  2. 使用future.map也往往意味着不同的线程。我读到过一个线程演员的幻觉,它可以很容易地通过混合未来打破。
  3. 此外,这是否会对性能产生不良影响?
  4. 我们是否需要将sender存储在temp变量send中,因为我们正在使用pipeto?我们能不能只做管道(发送者)。另外,几乎每个接收回调中声明sen是否浪费了很多资源?我希望它的引用将在操作完成后被删除。
  5. 有没有机会以更好的方式设计这样一个系统,meadning说我们不用map或者问那么多?我看了一些例子,当您只是将replyTo引用传递给某个actor时,使用tell而不是ask。此外,在某些情况下,发送消息给self而不是回复原始发件人可以代替使用Future.map。但是,我们想要并行执行3个异步操作并将格式化的数据返回给发送者,如何设计它呢?我们需要完成所有这3个操作才能格式化数据。

我尽量不包括很多例子,希望你能理解我们的顾虑和问题。很多问题,但我真的很想了解它是如何工作的,简单明了

提前致谢

共有1个答案

顾喜
2023-03-14

如果您想并行执行3件事情,您将需要创建3个future值,这些值可能会使用3个线程,这是无法避免的。

我不确定map的问题是什么,但这段代码中只有一个调用,这是不必要的。

以下是清理代码以避免创建不必要的future值(未经测试!)的一种方法:

case RetrieveData(userId, `type`, id, lang, paging, timeRange, platform) =>
  if (paging.forall(_ == Paging(0, 0))) {
    sender ! Seq.empty
  } else {
    val sen = sender
    val start = System.currentTimeMillis()

    val resF = Seq(
      profileActor ? Get(userId),
      getSymbols(`type`, id),
      getData(paging, timeRange, platform),
    )

    Future.sequence(resF).onComplete {
      case Success(result) =>
        val dur = System.currentTimeMillis() - start
        logger.info(s"Got ${result.size} news in $dur ms")

        sen ! result
      case Failure(ex)
        logger.error(s"Failure on getting data: ${ex.getMessage}", ex)

        sen ! Seq.empty
    }
  }
 类似资料:
  • 我正在使用Scala/Akka编写一个TCP客户端服务器程序。服务器端的一些参与者需要处理来自客户端的TCP消息。我使用了(复制)代码,基本上解析接收到的TCP消息,在接收到分隔符时,消息被发送给其他人。 由于不止一个actor使用此逻辑,所以我在baseTCP actor中对其进行了抽象,并从该actor继承了其他actor。我想在这个基本actor中添加一些常见的代码,比如处理bound/co

  • 问题内容: 我正在研究akka演员(JAVA),最近才知道有3种方法(可能更多)来了解演员的存在。 发送识别消息 : resolveOne方法 : DeatchWatch :创建另一个actor调用 getContext()。watch(actorToWatch的ActorRef); 并检查是否收到 终止 消息。这只能用于已经创建的actor。 1,2告诉演员和3个监视器的存在。我想知道这三个的用

  • 我很想知道调整大小,或者在本例中增加单个节点系统上的actor池中actor的数量是否真的会影响性能。 我有一个带超线程的四核系统。在任何给定的点上,系统可以运行8个线程。假设执行元执行的大多数操作都是CPU绑定的,那么将池中的执行元数量从20个增加到40个会有什么收获呢?

  • 在Play 2.2中,我创建了GlobalActor制作类 我想将游戏框架升级到 2.5。Play.current在play 2.5中被弃用,所以我使用注入器修改了这个类,但注入器始终为Null。我需要如何使这门课在 Play 2.5 中工作?

  • 让我们假设一个使用Akka Typed实现的应用程序有一个持久执行元。这个持久执行元作为其操作的一部分创建了瞬态(或非持久)子执行元,每个子执行元都有一个唯一的ID,这些ID是持久状态的一部分。持久执行元还需要一些与其子级通信的方式,但我们不希望持久化子级的,因为它们实际上不是状态的一部分。在恢复时,持久参与者应该基于恢复的状态重新创建它的子级。这听起来并不像是一个很不寻常的用例,我正在试图弄清楚

  • 我经常发现自己使用一个“主”角色,为子任务创建许多子角色。当子任务完成时,主角也应该停止自己。所以当时,我观察子角色并停止主角色context.children.is。 我经常使用这种模式,但因为我从未读过这方面的文章。我不确定,这是一个好主意还是失败的演员有问题。。。? 我已经读过Akka 2中的关机模式,但是这种方法在Java中似乎比我的解决方案更复杂? 以下是我针对具有两个子任务的主要参与者