我尝试了一个带有MongoDb支持的akka-persistence的“Hello-World”示例,使用这个开源https://github.com/scullxbones/akka-persistence-mongo/tree/master/rxmongo/src。下面是我的代码。但是,当我运行应用程序时,我得到了ask Timeout:
Akka.Pattern.AskTimeoutException:[Akka://Example/User/SampleActor#1876558089]]在[2000毫秒]之后询问超时。发件人[null]发送了类型为“actors.command”的消息。
import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.persistence.{PersistentActor, RecoveryCompleted}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
object Main extends App {
implicit val timeout = Timeout(2 seconds)
val system = ActorSystem("example")
var actor = system.actorOf(SampleActor.props(), "sampleActor")
Await.result(actor ? Command("first"), Duration.Inf)
Await.result(actor ? Command("second"), Duration.Inf)
system.stop(actor)
system.terminate()
}
case class Command(value: String)
case class Event(value: String)
case class SampleState(counter: Int, lastValue: Option[String])
class SampleActor extends PersistentActor {
override def persistenceId = "id-1"
var state = SampleState(0, None)
def updateState(event: Event): Unit = {
state = state.copy(counter = state.counter + 1, lastValue = Some(event.value))
}
override val receiveCommand: Receive = {
case Command(value) =>
persist(Event(value))(updateState)
}
override def receiveRecover: Receive = {
case event: Event =>
updateState(event)
case RecoveryCompleted =>
println("Recovery completed")
}
}
object SampleActor {
def props(): Props = Props(new SampleActor())
}
下面是我的application.conf:
contrib {
persistence {
mongodb {
mongo {
mongouri = "mongodb://localhost:27017/akka-persistence"
driver = "akka.contrib.persistence.mongodb.RxMongoPersistenceExtension"
}
rxmongo {
failover {
initialDelay = 750ms
retries = 10
growth = con
factor = 1.5
}
}
}
}
}
如果我使用tell(!)而不是ask(?),什么都没有发生,数据库没有创建,也没有命令持久化。
谢谢!
在application.conf中,您应该分配日志和快照插件:
akka.persistence.journal.plugin = "akka-contrib-mongodb-persistence-journal"
akka.persistence.snapshot-store.plugin = "akka-contrib-mongodb-persistence-snapshot"
并且您应该回复发件人,因此receive变为:
override val receiveCommand: Receive = {
case Command(value) =>
persist(Event(value)) { persistedEvent =>
updateState(persistedEvent)
sender ! SomeResponse
}
}
A处理请求的方式是使用嵌套映射 这是可行的(也就是说,只有当actor C完成处理时才会进行最终处理),但不用说,它看起来很可怕。我试过使用平面地图,但不能使它工作。有什么好点子吗?)谢谢
我正在寻找从经典Akka持久化迁移到Akka持久化类型。在这里找到的Lagom留档:1说“注意:从Lagom持久化(经典)迁移到Akka持久化类型时的唯一限制是需要完全关闭集群。即使所有持久数据都是兼容的,Lagom持久化(经典)和Akka持久化类型也不能共存。” 有人知道这是否适用于服务器可能知道的所有持久实体吗?例如,我使用的服务有3个独立的持久实体。我需要一次迁移所有3个,还是可以一次迁移一
我熟悉此处定义的Akka测试方法: http://doc.akka.io/docs/akka/snapshot/scala/testing.html 我了解如何使用TestKit、TestActorRef、TestProbe等。我还了解我的核心域逻辑应该与Akka隔离提取和测试。 我的问题是关于在Akka Persistence参与者(即PersistentActor和PersistentView
是否有人在akka上因持续系统而面临断路器超时。我相信当akka无法重新加载保存的事件快照时,就会发生这种情况。我试图通过调整conf和添加。然而,这也不起作用。 我将与一起用作 非常感谢您的帮助! 在异常块之后,我也收到了这个警告。
清理快照存储区中的旧快照很容易:在每次成功的快照之后,参与者都会收到一个,其中包含指示其序列号的元数据,该信息可以用于构造快照,然后被馈送到。 但是,对于持久化消息,没有与等效的方法。因此,不可能知道日志中“last-ish”消息的序号是什么。可以保留持久消息计数的本地缓存并对其进行快照,以用于调用,但这太繁琐了。 附言。当然,选择用于传递到的序号比上面提到的要复杂一点:即使有一种方法可以从日记中