当前位置: 首页 > 知识库问答 >
问题:

如何在PlayFramework中使用Akka Streams SourceQueue

公冶阳德
2023-03-14

我想使用SourceQueue将元素动态推送到Akka流源中。Play controller需要一个源,以便能够使用chuncked方法对结果进行流式传输。
由于Play使用自己的Akka流接收器,所以我无法使用接收器来实现源队列,因为源在chunced方法使用之前就会被消耗(除非我使用下面的hack)。

如果我使用reactive-streams发布器预物化源队列,我就可以使它正常工作,但这是一种“肮脏的攻击”:

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }

有没有更简单的方法来使用Akka Streams SourceQueue和PlayFramework?

谢谢

共有1个答案

贲培
2023-03-14

解决方案是在源上使用MapMaterializedValue来获得其队列物化的未来:

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    }
    Ok.chunked(queueSource)

  }

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }
 类似资料:
  • 问题内容: 我有一个“问题”实体,其中有一个“答案”,其中包含“替代品”列表,如下所示: 我想制作一个表格以供用户填充问题列表。我读了太多的材料和SO问题,但是我不知道如何正确地处理表单。我知道可以使用DynamicForms以其他方式执行此操作,这不是我想要的。我的想法是可以通过以下方式完成: 但是,当我尝试使用Answer对象并将其“替代”使用时,我的脸上会大爆炸: 我想念更多有关它和其他相关

  • 问题内容: 我在PLAY FRAMEWORK中有项目,但其中包含几个子模块。 每个子模块都具有以下文件夹结构: 应用程式: 控制器 楷模 意见 conf: submodulename.routes build.sbt 我想将所有java类实体都保留在folder: models中 。 我应如何配置播放框架和/或hibernate实体管理器以扫描此文件夹中的实体。 我有示例模型类,如下所示: 但是在

  • 本文向大家介绍playframework 基本用法,包括了playframework 基本用法的使用技巧和注意事项,需要的朋友参考一下 示例 典型的单例课程: 另一类,要求访问第一个。 最后使用最后一个控制器。注意,由于我们没有将FastFoodService标记为单例,因此每次注入它都会创建一个新实例。            

  • 问题内容: 我无法正常上传多个文件。当我选择x个文件时,它成功完成,但是第一个文件被上传了x次,而其他文件则根本没有被上传。有人能指出我做错了吗? 形成: 处理文件: 问题答案: 如果有人感兴趣的话,可以像这样工作: 如果可能的话,很高兴获得带有Blob对象数组的可行解决方案,而不必要求request.args.get(“ __ UPLOADS”)。

  • 我使用的项目使用JOOQ及其生成器从Postgres数据库生成类。数据库进化由播放进化管理。我们还使用git作为版本控制系统。当从git获取新的evolutions/jooq代码时,这组工具会导致某种自引用问题: 由于缺少从这些表中生成的类而导致编译问题,因此无法应用由其他团队成员创建的新表的演变。 我在考虑以下解决方案: 应用sbt的演变-创建sbt任务-到目前为止未能实现, 堆栈: PlayF

  • 问题内容: 我对Playframwork已过时的问题有一个烦人的问题,我想将conde移入建议的方式,但是实际上我无法完成此任务,文档没有意义,而且我也不知道如何解决此问题,我花了几天时间数天试图使它没有运气! https://www.playframework.com/documentation/2.5.x/GlobalSettings 我只想运行初始数据库方法 这是java文件中的内部方法,我