当前位置: 首页 > 知识库问答 >
问题:

Akka中的Actor询问超时-持久性

糜鸿风
2023-03-14

我尝试了一个带有MongoDb支持的akka-persistence的“Hello-World”示例,使用这个开源https://github.com/scullxbones/akka-persistence-mongo/tree/master/rxmongo/src。下面是我的代码。但是,当我运行应用程序时,我得到了ask Timeout:

Akka.Pattern.AskTimeoutException:[Akka://Example/User/SampleActor#1876558089]]在[2000毫秒]之后询问超时。发件人[null]发送了类型为“actors.command”的消息。

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.persistence.{PersistentActor, RecoveryCompleted}
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

object Main extends App {
  implicit val timeout = Timeout(2 seconds)
  val system = ActorSystem("example")

  var actor = system.actorOf(SampleActor.props(), "sampleActor")
  Await.result(actor ? Command("first"), Duration.Inf)
  Await.result(actor ? Command("second"), Duration.Inf)

  system.stop(actor)
  system.terminate()
}

case class Command(value: String)

case class Event(value: String)

case class SampleState(counter: Int, lastValue: Option[String])

class SampleActor extends PersistentActor {
  override def persistenceId = "id-1"

  var state = SampleState(0, None)

  def updateState(event: Event): Unit = {
    state = state.copy(counter = state.counter + 1, lastValue = Some(event.value))
  }

  override val receiveCommand: Receive = {
    case Command(value) =>
      persist(Event(value))(updateState)
  }

  override def receiveRecover: Receive = {
    case event: Event =>
      updateState(event)
    case RecoveryCompleted =>
      println("Recovery completed")
  }
}

object SampleActor {
  def props(): Props = Props(new SampleActor())
}

下面是我的application.conf:

contrib {
    persistence {
      mongodb {
        mongo {
          mongouri = "mongodb://localhost:27017/akka-persistence"
          driver = "akka.contrib.persistence.mongodb.RxMongoPersistenceExtension"
        }
        rxmongo {
          failover {
            initialDelay = 750ms
            retries = 10
            growth = con
            factor = 1.5
          }
        }
      }
    }

  }

如果我使用tell(!)而不是ask(?),什么都没有发生,数据库没有创建,也没有命令持久化。

谢谢!

共有1个答案

冯德佑
2023-03-14

在application.conf中,您应该分配日志和快照插件:

akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"

并且您应该回复发件人,因此receive变为:

override val receiveCommand: Receive = {
   case Command(value) =>
     persist(Event(value)) { persistedEvent =>
        updateState(persistedEvent)
        sender ! SomeResponse
     }
}
 类似资料:
  • A处理请求的方式是使用嵌套映射 这是可行的(也就是说,只有当actor C完成处理时才会进行最终处理),但不用说,它看起来很可怕。我试过使用平面地图,但不能使它工作。有什么好点子吗?)谢谢

  • 我正在寻找从经典Akka持久化迁移到Akka持久化类型。在这里找到的Lagom留档:1说“注意:从Lagom持久化(经典)迁移到Akka持久化类型时的唯一限制是需要完全关闭集群。即使所有持久数据都是兼容的,Lagom持久化(经典)和Akka持久化类型也不能共存。” 有人知道这是否适用于服务器可能知道的所有持久实体吗?例如,我使用的服务有3个独立的持久实体。我需要一次迁移所有3个,还是可以一次迁移一

  • 我熟悉此处定义的Akka测试方法: http://doc.akka.io/docs/akka/snapshot/scala/testing.html 我了解如何使用TestKit、TestActorRef、TestProbe等。我还了解我的核心域逻辑应该与Akka隔离提取和测试。 我的问题是关于在Akka Persistence参与者(即PersistentActor和PersistentView

  • 是否有人在akka上因持续系统而面临断路器超时。我相信当akka无法重新加载保存的事件快照时,就会发生这种情况。我试图通过调整conf和添加。然而,这也不起作用。 我将与一起用作 非常感谢您的帮助! 在异常块之后,我也收到了这个警告。

  • 清理快照存储区中的旧快照很容易:在每次成功的快照之后,参与者都会收到一个,其中包含指示其序列号的元数据,该信息可以用于构造快照,然后被馈送到。 但是,对于持久化消息,没有与等效的方法。因此,不可能知道日志中“last-ish”消息的序号是什么。可以保留持久消息计数的本地缓存并对其进行快照,以用于调用,但这太繁琐了。 附言。当然,选择用于传递到的序号比上面提到的要复杂一点:即使有一种方法可以从日记中