我已经开始使用Akka Streams和Op Rabbit,我有点困惑。
我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。
我已经能够使用GraphDSL. Builder做这样的事情,但似乎无法让它与AckedSource/Flow/Sink一起工作
图表如下所示:
| --> flow1 --> |
source--> partition --> | | --> flow3 --> sink
| --> flow2 --> |
我不确定是否拆分什么时候是我应该使用的,因为我总是需要正好2个流。
这是一个不进行分区且不使用GraphDSL. Builder的示例:
def splitExample(source: AckedSource[String, SubscriptionRef],
queueName: String)
(implicit actorSystem: ActorSystem): RunnableGraph[SubscriptionRef] = {
val toStringFlow: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]]
.map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val printFlow1: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow1 processing $s")
tup
})
val printFlow2: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]]
.map[AckTup[String]](tup => {
val (p, s) = tup
println(s"flow2 processing $s")
tup
})
source
.map(Message.queue(_, queueName))
.via(AckedFlow(toStringFlow))
// partition if string.length < 10
.via(AckedFlow(printFlow1))
.via(AckedFlow(printFlow2))
.to(AckedSink.ack)
}
这是我似乎无法使用的代码:
import GraphDSL.Implicits._
def buildModelAcked(source: AckedSource[String, SubscriptionRef] , queueName: String)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
import GraphDSL.Implicits._
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
import GraphDSL.Implicits._
source.map(Message.queue(_, queueName)) ~> AckedFlow(toStringFlow) ~> AckedSink.ack
// source.map(Message.queue(_, queueName)).via(AckedFlow(toStringFlow)).to(AckedSink.ack)
ClosedShape
}}
编译器无法解析~
所以我的问题是:
>
是否有一个使用scala dsl构建Acked/Source/Flow/Sink图表的示例项目?
是否有一个分区和合并的示例项目与我在这里尝试做的类似?
根据Stefano Bonetti的出色指导,这里有一个可能的解决方案:
graph:
|--> short --|
rabbitMq --> before --| |--> after
|--> long --|
解决方案:
val before: Flow[AckTup[Message], AckTup[String], NotUsed] = Flow[AckTup[Message]].map[AckTup[String]](tup => {
val (p,m) = tup
(p, new String(m.data))
})
val short: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"short: $s")
tup
})
val long: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"long: $s")
tup
})
val after: Flow[AckTup[String], AckTup[String], NotUsed] = Flow[AckTup[String]].map[AckTup[String]](tup => {
val (p, s) = tup
println(s"all $s")
tup
})
def buildSplitGraph(source: AckedSource[String, SubscriptionRef]
, queueName: String
, splitLength: Int)(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
val toShort = 0
val toLong = 1
// junctions
val split = builder.add(Partition[AckTup[String]](2, (tup: AckTup[String]) => {
val (p, s) = tup
if (s.length < splitLength) toShort else toLong
}
))
val merge = builder.add(Merge[AckTup[String]](2))
//graph
val beforeSplit = source.map(Message.queue(_, queueName)).wrappedRepr ~> AckedFlow(before).wrappedRepr
beforeSplit ~> split
// must do short, then long since the split goes in that order
split ~> AckedFlow(short).wrappedRepr ~> merge
split ~> AckedFlow(long).wrappedRepr ~> merge
// after the last AckedFlow, be sure to '.acked' so that the message will be removed from the queue
merge ~> AckedFlow(after).acked ~> s
ClosedShape
}}
正如Stefano Bonetti所说,关键是使用与AckedFlow
关联的. wrappeRepr
,然后使用. ack
组合器作为最后一步。
在处理确认流时,请记住以下定义。
AckedSource[Out, Mat]
是Source[AckTup[Out], Mat]]
AckedFlow[In, Out, Mat]
是Flow[AckTup[In], AckTup[Out], Mat]
AckedSink[In, Mat]
是Sink[AckTup[In], Mat]
AckTup[T]
是(Promise[Unit], T)
AckTup
T
部分进行操作
. ack
组合器将完成AckedFlow
Promise[Unit]
GraphDSL边缘运算符(~
你有两条出路:
您可以定义自己的~
Note: Functions taking Tensor arguments can also take anything accepted by tf.convert_to_tensor. Contents Constants, Sequences, and Random Values Constant Value Tensors tf.zeros(shape, dtype=tf.float3
问题内容: 使用进行基准测试时,会看到以下结果。 根据我的理解: 是迭代次数。 是一次迭代完成所需的大概时间 但即使阅读文档,我无法找出什么和意味着什么。 我的猜测是allocs / op与垃圾回收和内存分配有关(越少越好)。 任何人都可以很好地解释这些值的含义。也很高兴知道为什么要减少这些步骤以及减少它们的主要步骤(我意识到这是针对测试的,但是在某些情况下可能会有一些通用的提示) 问题答案: 表
PREREQUISITES: Some familiarity with C++. Must have downloaded TensorFlow source, and be able to build it. If you'd like to incorporate an operation that isn't covered by the existing library, you can
If you'd like to create an op that isn't covered by the existing TensorFlow library, we recommend that you first try writing the op in Python as a composition of existing Python ops or functions. If t
预备知识: 对 C++ 有一定了解. 已经下载 TensorFlow 源代码并有能力编译它. 如果现有的库没有涵盖你想要的操作, 你可以自己定制一个. 为了使定制的 Op 能够兼容原有的库 , 你必须做以下工作: 在一个 C++ 文件中注册新 Op. Op 的注册与实现是相互独立的. 在其注册时描述了 Op 该如何执行. 例如, 注册 Op 时定义了 Op 的名字, 并指定了它的输入和输出. 使用
预备知识: 对 C++ 有一定了解. 已经下载 TensorFlow 源代码并有能力编译它. 如果现有的库没有涵盖你想要的操作, 你可以自己定制一个. 为了使定制的 Op 能够兼容原有的库 , 你必须做以下工作: 在一个 C++ 文件中注册新 Op. Op 的注册与实现是相互独立的. 在其注册时描述了 Op 该如何执行. 例如, 注册 Op 时定义了 Op 的名字, 并指定了它的输入和输出. 使用