当前位置: 首页 > 知识库问答 >
问题:

Akka StreamsscalaDSL和Op-Rabbit

乐正玺
2023-03-14

我已经开始使用Akka Streams和Op Rabbit,我有点困惑。

我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。

我已经能够使用GraphDSL. Builder做这样的事情,但似乎无法让它与AckedSource/Flow/Sink一起工作

图表如下所示:

                        | --> flow1 --> |
source--> partition --> |               | --> flow3 --> sink
                        | --> flow2 --> |

我不确定是否拆分什么时候是我应该使用的,因为我总是需要正好2个流。

这是一个不进行分区且不使用GraphDSL. Builder的示例:

def splitExample(source: AckedSource[String, SubscriptionRef],
                 queueName: String)
                (implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
  val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
    .map[AckTup[String]](tup => {
      val (p,m) = tup
      (p, new String(m.data))
    })

  val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
    .map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"flow1 processing $s")
      tup
     })

  val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
    .map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"flow2 processing $s")
      tup
    })

  source
    .map(Message.queue(_, queueName))
    .via(AckedFlow(toStringFlow))
    // partition if string.length < 10
    .via(AckedFlow(printFlow1))
    .via(AckedFlow(printFlow2))
    .to(AckedSink.ack)
}

这是我似乎无法使用的代码:

import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
    import GraphDSL.Implicits._
    GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
    import GraphDSL.Implicits._
    source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
//      source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
    ClosedShape

}}

编译器无法解析~

所以我的问题是:

>

  • 是否有一个使用scala dsl构建Acked/Source/Flow/Sink图表的示例项目?

    是否有一个分区和合并的示例项目与我在这里尝试做的类似?


  • 共有2个答案

    陶飞鸿
    2023-03-14

    根据Stefano Bonetti的出色指导,这里有一个可能的解决方案:

    graph:    
                            |--> short --|
      rabbitMq --> before --|            |--> after
                            |--> long  --|
    

    解决方案

    val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
      val (p,m) = tup
      (p, new String(m.data))
    })
    
    val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"short: $s")
      tup
    })
    val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"long: $s")
      tup
    })
    val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
      val (p, s) = tup
      println(s"all $s")
      tup
    })
    
    def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
                        , queueName: String
                        , splitLength: Int)(implicit actorSystem: ActorSystem):  Graph[ClosedShape, Future[Done]] = {
     GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
       val toShort = 0
       val toLong = 1
    
       // junctions
       val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
                                                               val (p, s) = tup
                                                               if (s.length < splitLength) toShort else toLong
                                                             }
       ))
       val merge = builder.add(Merge[AckTup[String]](2))
    
       //graph
       val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
       beforeSplit ~> split
       // must do short, then long since the split goes in that order
       split ~> AckedFlow(short).wrappedRepr ~> merge
       split ~> AckedFlow(long).wrappedRepr ~> merge
       // after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
       merge ~> AckedFlow(after).acked ~> s
    
      ClosedShape
    }}
    

    正如Stefano Bonetti所说,关键是使用与AckedFlow关联的. wrappeRepr,然后使用. ack组合器作为最后一步。

    杜君浩
    2023-03-14

    在处理确认流时,请记住以下定义。

    1. AckedSource[Out, Mat]Source[AckTup[Out], Mat]]
    2. 的包装器
    3. AckedFlow[In, Out, Mat]Flow[AckTup[In], AckTup[Out], Mat]
    4. 的包装器
    5. AckedSink[In, Mat]Sink[AckTup[In], Mat]
    6. 的包装器
    7. AckTup[T](Promise[Unit], T)
    8. 的别名
    9. 经典流组合器将对AckTup
    10. T部分进行操作
    11. . ack组合器将完成AckedFlow
    12. Promise[Unit]

    GraphDSL边缘运算符(~

    你有两条出路:

    1. 您可以定义自己的~

     类似资料:
    • Note: Functions taking Tensor arguments can also take anything accepted by tf.convert_to_tensor. Contents Constants, Sequences, and Random Values Constant Value Tensors tf.zeros(shape, dtype=tf.float3

    • 问题内容: 使用进行基准测试时,会看到以下结果。 根据我的理解: 是迭代次数。 是一次迭代完成所需的大概时间 但即使阅读文档,我无法找出什么和意味着什么。 我的猜测是allocs / op与垃圾回收和内存分配有关(越少越好)。 任何人都可以很好地解释这些值的含义。也很高兴知道为什么要减少这些步骤以及减少它们的主要步骤(我意识到这是针对测试的,但是在某些情况下可能会有一些通用的提示) 问题答案: 表

    • PREREQUISITES: Some familiarity with C++. Must have downloaded TensorFlow source, and be able to build it. If you'd like to incorporate an operation that isn't covered by the existing library, you can

    • If you'd like to create an op that isn't covered by the existing TensorFlow library, we recommend that you first try writing the op in Python as a composition of existing Python ops or functions. If t

    • 预备知识: 对 C++ 有一定了解. 已经下载 TensorFlow 源代码并有能力编译它. 如果现有的库没有涵盖你想要的操作, 你可以自己定制一个. 为了使定制的 Op 能够兼容原有的库 , 你必须做以下工作: 在一个 C++ 文件中注册新 Op. Op 的注册与实现是相互独立的. 在其注册时描述了 Op 该如何执行. 例如, 注册 Op 时定义了 Op 的名字, 并指定了它的输入和输出. 使用

    • 预备知识: 对 C++ 有一定了解. 已经下载 TensorFlow 源代码并有能力编译它. 如果现有的库没有涵盖你想要的操作, 你可以自己定制一个. 为了使定制的 Op 能够兼容原有的库 , 你必须做以下工作: 在一个 C++ 文件中注册新 Op. Op 的注册与实现是相互独立的. 在其注册时描述了 Op 该如何执行. 例如, 注册 Op 时定义了 Op 的名字, 并指定了它的输入和输出. 使用