当前位置: 首页 > 知识库问答 >
问题:

如何使用Akka持久性保存流数据

詹唯
2023-03-14

我使用StreamRefs在集群中的参与者之间建立流式连接。目前,在writing节点中,我手动将传入消息保存到日志文件中,但我想知道是否可以将其替换为persistentSink,用于写入,以及persistentSource,用于从Akka Persistence journal启动actor时读取。我一直在考虑用Persistent actor的Persistent{evt替换日志文件接收器=

当前执行情况:

object Writer {
  case class WriteSinkRequest(userId: String) 
  case class WriteSinkReady(userId: String, sinkRef: SinkRef[ByteString])
  case class ReadSourceRequest(userId: String)
  case class ReadSourceReady(userId: String, sourceRef: SourceRef[ByteString])
}

class Writer extends Actor {

    // code omitted

    val logsDir = "logs"

    val path = Files.createDirectories(FileSystems.getDefault.getPath(logsDir))

    def logFile(id: String) = {
        path.resolve(id)
    }

    def logFileSink(logId: String): Sink[ByteString, Future[IOResult]] = FileIO.toPath(logFile(logId), Set(CREATE, WRITE, APPEND))
    def logFileSource(logId: String): Source[ByteString, Future[IOResult]] = FileIO.fromPath(logFile(logId))

    override def receive: Receive = {
        case WriteSinkRequest(userId) => 
            // obtain the source you want to offer:
            val sink = logFileSink(userId)
            // materialize the SinkRef (the remote is like a source of data for us):
            val ref: Future[SinkRef[ByteString]] = StreamRefs.sinkRef[ByteString]().to(sink).run()
            // wrap the SinkRef in some domain message, such that the sender knows what source it is
            val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
            // reply to sender
            reply.pipeTo(sender())

        case ReadSourceRequest(userId) =>
            val source = logFileSource(userId)
            val ref: Future[SourceRef[ByteString]] = source.runWith(StreamRefs.sourceRef())
            val reply: Future[ReadSourceReady] = ref.map(ReadSourceReady(userId, _))
            reply pipeTo sender()

    }
}

另外,是否可以不创建“保存到日志”接收器,而是创建流:要写入的传入数据~


共有1个答案

沃阳飙
2023-03-14

以背压方式将数据流传输给持久参与者的一个想法是使用Sink。actorRefWithAck:让参与者在保留消息时发送确认消息。这将类似于以下内容:

// ...
case class WriteSinkReady(userId: String, sinkRef: SinkRef[MyMsg])    
// ...

def receive = {
  case WriteSinkRequest(userId) =>
    val persistentActor: ActorRef = ??? // a persistent actor that handles MyMsg messages
                                        // as well as the messages used in persistentSink

    val persistentSink: Sink[MyMsg, NotUsed] = Sink.actorRefWithAck[MyMsg](
      persistentActor,
      /* additional parameters: see the docs */
    )

    val ref: Future[SinkRef[MyMsg]] = StreamRefs.sinkRef[MyMsg]().to(persistentSink).run()
    val reply: Future[WriteSinkReady] = ref.map(WriteSinkReady(userId, _))
    reply.pipeTo(sender())

  case ReadSourceRequest(userId) =>
    // ...
}

上面的示例使用名为MyMsg的自定义case类,而不是ByteString

在发送者中,假设是演员:

def receive = {
  case WriteSinkReady(userId, sinkRef) =>
    source.runWith(sinkRef) // source is a Source[MyMsg, _]

  // ...
}

发送方中的物化流将向持久参与者发送消息。

 类似资料:
  • 问题内容: 我创建了一个这样的对象: 我想保存该对象。我怎样才能做到这一点? 问题答案: 你可以使用标准库中的模块。这是你的示例的基本应用: 你还可以定义自己的简单实用程序,如下所示,该实用程序打开文件并向其中写入单个对象: 更新资料 由于这是一个非常受欢迎的答案,因此,我想谈谈一些高级用法主题。 实际使用该cPickle模块几乎总是可取的,而不是因为该模块是用C编写的并且速度更快。它们之间有一些

  • 我正在寻找从经典Akka持久化迁移到Akka持久化类型。在这里找到的Lagom留档:1说“注意:从Lagom持久化(经典)迁移到Akka持久化类型时的唯一限制是需要完全关闭集群。即使所有持久数据都是兼容的,Lagom持久化(经典)和Akka持久化类型也不能共存。” 有人知道这是否适用于服务器可能知道的所有持久实体吗?例如,我使用的服务有3个独立的持久实体。我需要一次迁移所有3个,还是可以一次迁移一

  • 我已经开始使用推荐的,并不再使用。其中一件我无法粘合的事情是使用持久的cookie存储。我只想在我的连接上附加一个定制的cookie处理程序/管理器来存储cookie。Android文档并没有太大帮助,因为它将有关cookie的主题概括为两行。 我之前一直在使用LoopJ的PersistentCookieStore,效果很好。 关于如何在Android中设置一个持久的cookie存储,我可以附加到

  • 问题内容: 我最近开始玩Play!Java框架1.2.3版(最新)。在测试框架时,尝试在名为的Hibernate实体中保留对象时遇到了一个奇怪的问题。地图对象映射长到Hibernate的实体,我呼吁,随着申报 我的问题如下:按照我的注释创建正确的表。但是,当对象持久化时,其中的数据就不会! 这是我用于实体的代码。首先是: 这里是: 这是我用来测试设置的课程: 表演!框架会自动为HTTP请求创建一个

  • 问题内容: 我最近开始玩Play!Java框架1.2.3版(最新)。在测试框架时,尝试在名为的Hibernate实体中保留对象时遇到了一个奇怪的问题。地图对象映射长到Hibernate的实体,我呼吁,随着申报 我的问题如下:按照我的注释创建正确的表。但是,当对象持久化时,其中的数据就不会! 这是我用于实体的代码。首先是: 这里是: 这是我用来测试设置的课程: 表演!框架会自动为HTTP请求创建一个

  • 我熟悉此处定义的Akka测试方法: http://doc.akka.io/docs/akka/snapshot/scala/testing.html 我了解如何使用TestKit、TestActorRef、TestProbe等。我还了解我的核心域逻辑应该与Akka隔离提取和测试。 我的问题是关于在Akka Persistence参与者(即PersistentActor和PersistentView