当前位置: 首页 > 知识库问答 >
问题:

如何从Akka事件流构建Akka流源?

通和裕
2023-03-14

MyActor接收到Start消息时,它将运行Akka流,并将接收到的每个项发布到Akka事件流

class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {

  override def receive: Receive = {
    case Start =>
      someSource
        .toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
        .run()
  }
}

我怎么能那么做?

以防万一它可能会添加更多选项,请注意,另一个代码块是play framework的Websocket处理程序。

共有1个答案

茹正初
2023-03-14

这似乎是一个XY问题。如果发布服务器和订阅服务器最终解耦,如果发布服务器比订阅服务器更快地生成数据,会发生什么?

说到这里,这里有一个方法来做你要求的事情:

/** Produce a source by subscribing to the Akka actorsystem event bus for a
  * specific event type.
  * 
  * @param bufferSize max number of events to buffer up in the source
  * @param overflowStrategy what to do if the event buffer fills up
  */
def itemSource[Item : ClassTag](
  bufferSize: Int = 1000,
  overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
  implicit system: ActorSystem
): Source[Item, NotUsed] = Source
  .lazily { () =>
    val (actorRef, itemSource) = Source
      .actorRef[Item](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize,
        overflowStrategy
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)

    itemSource
  }
  .mapMaterializedValue(_ => NotUsed)

 类似资料:
  • 我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?

  • 在Akka Streams的早期版本中,返回了一个的,可以具体化为。 在Akka Streams 2.4中,我看到返回一个——我不清楚如何使用它。我需要应用于流的转换必须使整个可用,所以我不能只

  • 我需要遍历一个形状像树的API。例如,目录结构或讨论线程。它可以通过以下流程进行建模: 如何遍历这些数据?我的工作如下: 然而,由于我使用的是带有缓冲区的流,所以流永远不会完成。 上游完成且缓冲元件已排空时完成 流缓冲器 我多次阅读了图表周期、活跃度和死锁部分,但仍在努力寻找答案。 这将创建一个活动锁: 编辑:我添加了一个git repo来测试你的解决方案https://github.com/Ma

  • 学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。 实例 如果传入流是 我想把它转换成 到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。 更新:我发现了,但它看起来更关心背压,而不仅仅是一直批处理。

  • 阅读akka-stream的留档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置我的问题的上下文。 为了让事情变得简单,我使用了这个流的形状和一个非常简单的源和汇。像这样的-- 现在,我的担忧来了。终端中打印的事件顺序根本不正常。我不知道该怎么解决。这是我得到的结果-- 输出中缺少第一条消息。消息似乎是在打印之前发送的。 我尝试通过使用(我在上面的代码中对此进