当前位置: 首页 > 知识库问答 >
问题:

如何将akka http与akka流绑定?

訾淇
2023-03-14
trait ImagesRoute {

  val log = LoggerFactory.getLogger(this.getClass)

  implicit def actorRefFactory: ActorRefFactory
  implicit def materializer: ActorMaterializer

  val source =
    Source
      .actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
      .via(Flow[Image].mapAsync(1)(ImageRepository.add))
      .toMat(Sink.asPublisher(true))(Keep.both)

  val route = {
    pathPrefix("images") {
      pathEnd {
        post {
          entity(as[Image]) { image =>

            val (ref, publisher) = source.run()

            val addFuture = Source.fromPublisher(publisher)

            val future = addFuture.runWith(Sink.head[Option[Image]])

            ref ! image

            onComplete(future.mapTo[Option[Image]]) {
              case Success(img) =>
                complete(Created, img)

              case Failure(e) =>
                log.error("Error adding image resource", e)
                complete(InternalServerError, e.getMessage)
            }
          }
        }
      }
    }
  }
}

我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。

有什么想法吗?

共有1个答案

司空凌
2023-03-14

如果您只希望实体提供1个图像,那么您不需要从ActorRef创建也不需要sink.aspublisher,您可以简单地使用源.single:

def imageToComplete(img : Option[Image]) : StandardRoute = 
  img.map(i => complete(Created, i))
     .getOrElse {
       log error ("Error adding image resource", e)
       complete(InternalServerError, e.getMessage
     }

...

entity(as[Image]) { image =>

  val future : Future[StandardRoute] = 
    Source.single(image)
          .via(Flow[Image].mapAsync(1)(ImageRepository.add))
          .runWith(Sink.head[Option[Image]])
          .map(imageToComplete)

  onComplete(future)
}

进一步简化您的代码,您只处理1个图像的事实意味着流是不必要的,因为只需要1个元素就不需要背压:

val future : Future[StandardRoute] = ImageRepository.add(image)
                                                    .map(imageToComplete)

onComplete(future)

在您指出的注释中

val byteStrToImage : Flow[ByteString, Image, _] = ???

val imageToByteStr : Flow[Image, Source[ByteString], _] = ???

def imageOptToSource(img : Option[Image]) : Source[Image,_] =
  Source fromIterator img.toIterator

val route = path("images") {
  post {
    extractRequestEntity { reqEntity =>

      val stream = reqEntity.via(byteStrToImage)
                            .via(Flow[Image].mapAsync(1)(ImageRepository.add))
                            .via(Flow.flatMapConcat(imageOptToSource))
                            .via(Flow.flatMapConcat(imageToByteStr))

      complete(HttpResponse(status=Created,entity = stream))
    }
  }
}    
 类似资料:
  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 我的流有一个CPU绑定和IO绑定阶段的均匀混合(每个IO阶段后面都有一个CPU阶段)。我想要做的是将IO操作放在一个不同于流其余部分的分派程序上。 在一个传统的基于actor的Akka应用程序中,我可以将IO actor放在一个固定的线程池调度器上,它有很多线程,而将CPU绑定的actor放在一个fork join池上,它有少量的线程(一些是核数的倍数,理想情况下是1)。这将减少CPU绑定参与者在

  • 我正试图用slf4j log4j建立一个项目,但这该死的东西就是不起作用。。。我不断得到例外: 我查了一下,班级org.apache.log4j。级别是在我的项目下"Maven依赖 我已经尝试了以下选项: slf4j api log4j slf4j api slf4j-log4j12 slf4j api log4j slf4j api slf4j-log4j12 log4j 这些选项都不起作用:(

  • 我的服务代码如下所示, 在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示, 我不确定如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。 然而,我只得到最后一个未来的结果如下, 亲切地问候Meeraj

  • 我有一些用akka写的演员,我想通过ServiceMix让他们互动。很难,我对这些技术是如何交互的有点困惑。这是我目前所理解的: < li>akka让我写一些演员: < ul > < li >生产者发送消息 < li >消费者接收消息 < li >发送和接收的非类型化编辑器 每个参与者将在一个固定的endpoint上可用,在ServiceMix中定义为route 现在我的问题是: 谁自动在jett

  • 我正在尝试使用swagger记录akka-超文本传输协议API 我拥有的是: 这会生成一个json,我可以在swagger UI中查看它。但是,我不能使用生成的示例,因为缺少auth选项。 我没有找到任何使用swagger-akka-http的例子,只有一些使用 config的例子 在< code>yaml中,可能是这样的: 但是,我没有。除了通过注释之外,我也不能控制生成的。 在IIUC中,提及