我们有一个Akka应用程序,它使用Kafka主题,并将收到的消息发送给Akka参与者。我不确定我编程的方式是否充分利用了Akka Streams内置背压机制的所有优点。
以下是我的配置。。。
val control : Consumer.DrainingControl[Done]
Consumer
.sourceWitOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
.map(consumerRecord =>
val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
myActor ! Update(myAvro)
)
.via(Commiter.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
.toMat(Sink.ignore)(Consumer.DrainControl.apply)
.run()
这做了我所期望的商业案例,myActor收到命令更新(MyAvro)
我更讨厌背压的技术概念,据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。
当《阿卡Kafka流》提交Kafka主题偏移时,我还好奇什么?命令发送到MyActor邮箱的那一刻?如果是这样的话,那么我如何处理像ask模式这样的场景,在ask模式完成之前,Kafka偏移量不应该提交。
我看到了一些处理手动偏移控制的工厂方法,例如,在我的业务逻辑中,可以手动提交偏移量吗?
作为替代配置,我可以使用这样的东西。
val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
Consumer
.plainSource(consumerSettings, Subscriptions.topics("myTopic"))
.toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
.run()
现在我可以进入水槽了。actorRef,我认为背压机制有机会控制背压,当然这个代码不会工作,因为我不知道如何在这个星座下访问“myAvro”。
谢谢你的回答...
这种说法不正确:
... 据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。
对于背压,Sink
s没有什么特别之处。作为流量控制机制的背压将自动用于流中存在异步边界的任何地方。这可能在接收器中,但也可能在流中的任何其他地方。
在你的情况下,你正在连接你的流与演员交谈。这是你的异步边界,但你的方式是使用
映射
,在映射内你使用
与演员交谈。所以没有背压,因为:
map
不是一个异步操作符,它内部调用的任何东西都不能参与背压机制。因此,从Akka流的角度来看,没有引入异步边界
是fire and forget,对于演员在执行任何背压时有多忙,没有提供反馈
就像Levi提到的,您可以做的是从
tell
更改为ask
交互,并使接收参与者在其工作完成时做出响应。然后您可以像Levi描述的那样使用mapAsync
。map
和mapAsync
之间的区别是mapAsync
的语义学是这样的,只有当返回Future
完成时,它才会向下游发出。即使并行度
为1,背压仍然有效。如果您的Kafka记录的速度比您的参与者可以处理的要快得多,mapAsync
在等待Future
完成时会逆向上行。
在这种情况下,我认为增加
。根据您的参与者处理消息的方式,增加并行性
没有意义,因为所有这些消息都将添加到参与者的收件箱中,因此您不会通过这样做真正加快任何速度。如果交互是REST调用,那么它可以提高整体吞吐量
mapAsync
的并行度
可能会导致吞吐量增加。并行度
值有效地限制了在反压生效之前允许的未完成Future
s的最大数量。
在第一个流中,基本上没有反压。偏移提交将在消息发送到myActor
后很快发生。
对于反压,您需要等待目标参与者的响应,正如您所说,询问模式是实现这一目标的典型方式。由于来自参与者外部的参与者的询问(出于所有意图和目的,流都在参与者之外:阶段由参与者执行是一个实现细节)会导致Future
,这表明需要mapAsync
。
def askUpdate(m: MyAvro): Future[Response] = ??? // get actorref from cluster sharding, send ask, etc.
然后将原始流中的映射替换为
.mapAsync(parallelism) { consumerRecord =>
askUpdate(consumeRecord.value().asInstanceOf[MyAvro])
}
mapAsync
将“正在运行”的未来限制为parallelism
。如果存在并行性
未来(当然是由它产生的),它将产生反压力。如果衍生的未来以失败告终(对于ask本身来说,这通常是一个超时),它将失败;成功未来的结果(关于传入订单)将被传递(通常是akka.Done
,尤其是当流中只剩下offset commit和Sink.ignore
)时)。
类型:雷达图 1.值显示在 name 下面 2.支持值颜色样式的配置 找了一圈文档了 只能实现下图效果,还是找不到 值样式 配置 一定要自己亲自实现过的!!! (问了十个同事,基本都说用 formatter 然后就实现不了....)
我对编程很陌生,我已经自学了将近一个月了,有谁能给我解释一下我代码中错误的原因吗?在“Total(moneyConv(moneySum*moneyRate))”中出错。行,表示实际和形式的论点在长度上是不同的。我检查了我所有的参数,我觉得很好。多谢!
问题内容: 在回答这个问题时,我和其他一些人实际上认为是错误的,因为认为以下方法可行: 说一个有 背后的原因是什么 而有一个 要么 要么 是造成尺寸 退化的 原因吗? 问题答案: 是对象dtype的2D数组,每行的第一个元素是一个列表。 是对象dtype的一维数组,其每个元素都是列表。 当您这样做时,NumPy不会对list的每个元素进行元素比较。它从中创建尽可能高维的数组,生成1D数组,然后广播
例如下面的代码。它有一个随机类。然而,它总是在任何地方产生相同的输出。在这种情况下,哪一项是种子? 来源:link
这个问题在我的项目中经常出现。作为一个例子,假设我有两个接口,一个从API检索信息,另一个解析这些信息。 现在,我可能需要有不同的API,因此我将有许多的实现,但每个实现都需要自己的。 这看起来与Bridge设计模式所建议的非常相似,但是该模式允许任何APIClient使用任何APIParser(我说的对吗?) 那么,有没有更好的解决方案呢?或者也许这很好,不需要重构它。 另外,也许parse不是