当前位置: 首页 > 知识库问答 >
问题:

这种Akka Kafka Stream配置是否受益于Akka Streams的背压机制?

岳正浩
2023-03-14

我们有一个Akka应用程序,它使用Kafka主题,并将收到的消息发送给Akka参与者。我不确定我编程的方式是否充分利用了Akka Streams内置背压机制的所有优点。

以下是我的配置。。。

val control : Consumer.DrainingControl[Done]
Consumer
 .sourceWitOffsetContext(consumerSettings, Subscriptions.topics("myTopic"))
 .map(consumerRecord =>
     val myAvro = consumerRecord.value().asInstanceOf[MyAvro]
     
     val myActor = AkkaSystem.sharding.getMyActor(myAvro.getId)
     
     myActor ! Update(myAvro)          
 )
 .via(Commiter.flowWithOffsetContext(CommitterSettings(AkkaSystem.system.toClassic)))
 .toMat(Sink.ignore)(Consumer.DrainControl.apply)
 .run()

这做了我所期望的商业案例,myActor收到命令更新(MyAvro)

我更讨厌背压的技术概念,据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。

当《阿卡Kafka流》提交Kafka主题偏移时,我还好奇什么?命令发送到MyActor邮箱的那一刻?如果是这样的话,那么我如何处理像ask模式这样的场景,在ask模式完成之前,Kafka偏移量不应该提交。

我看到了一些处理手动偏移控制的工厂方法,例如,在我的业务逻辑中,可以手动提交偏移量吗?

作为替代配置,我可以使用这样的东西。

val myActor: ActorRef[MyActor.Command] = AkkaSystem.sharding.getMyActor
val (control, result) =
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("myTopic"))
    .toMat(Sink.actorRef(AkkaSystem.sharding.getMyActor(myAvro.getId), null))(Keep.both)
    .run()

现在我可以进入水槽了。actorRef,我认为背压机制有机会控制背压,当然这个代码不会工作,因为我不知道如何在这个星座下访问“myAvro”。

谢谢你的回答...

共有2个答案

易元青
2023-03-14

这种说法不正确:

... 据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。

对于背压,Sinks没有什么特别之处。作为流量控制机制的背压将自动用于流中存在异步边界的任何地方。这可能在接收器中,但也可能在流中的任何其他地方。

在你的情况下,你正在连接你的流与演员交谈。这是你的异步边界,但你的方式是使用映射,在映射内你使用 与演员交谈。所以没有背压,因为:

  1. map不是一个异步操作符,它内部调用的任何东西都不能参与背压机制。因此,从Akka流的角度来看,没有引入异步边界
  2. 是fire and forget,对于演员在执行任何背压时有多忙,没有提供反馈

就像Levi提到的,您可以做的是从tell更改为ask交互,并使接收参与者在其工作完成时做出响应。然后您可以像Levi描述的那样使用mapAsyncmapmapAsync之间的区别是mapAsync的语义学是这样的,只有当返回Future完成时,它才会向下游发出。即使并行度为1,背压仍然有效。如果您的Kafka记录的速度比您的参与者可以处理的要快得多,mapAsync在等待Future完成时会逆向上行。 在这种情况下,我认为增加 并行性没有意义,因为所有这些消息都将添加到参与者的收件箱中,因此您不会通过这样做真正加快任何速度。如果交互是REST调用,那么它可以提高整体吞吐量 。根据您的参与者处理消息的方式,增加mapAsync并行度可能会导致吞吐量增加。并行度值有效地限制了在反压生效之前允许的未完成Futures的最大数量。

贝礼骞
2023-03-14

在第一个流中,基本上没有反压。偏移提交将在消息发送到myActor后很快发生。

对于反压,您需要等待目标参与者的响应,正如您所说,询问模式是实现这一目标的典型方式。由于来自参与者外部的参与者的询问(出于所有意图和目的,流都在参与者之外:阶段由参与者执行是一个实现细节)会导致Future,这表明需要mapAsync

def askUpdate(m: MyAvro): Future[Response] = ???  // get actorref from cluster sharding, send ask, etc.

然后将原始流中的映射替换为

.mapAsync(parallelism) { consumerRecord =>
  askUpdate(consumeRecord.value().asInstanceOf[MyAvro])
}

mapAsync将“正在运行”的未来限制为parallelism。如果存在并行性未来(当然是由它产生的),它将产生反压力。如果衍生的未来以失败告终(对于ask本身来说,这通常是一个超时),它将失败;成功未来的结果(关于传入订单)将被传递(通常是akka.Done,尤其是当流中只剩下offset commit和Sink.ignore)时)。

 类似资料:
  • 类型:雷达图 1.值显示在 name 下面 2.支持值颜色样式的配置 找了一圈文档了 只能实现下图效果,还是找不到 值样式 配置 一定要自己亲自实现过的!!! (问了十个同事,基本都说用 formatter 然后就实现不了....)

  • 我对编程很陌生,我已经自学了将近一个月了,有谁能给我解释一下我代码中错误的原因吗?在“Total(moneyConv(moneySum*moneyRate))”中出错。行,表示实际和形式的论点在长度上是不同的。我检查了我所有的参数,我觉得很好。多谢!

  • 问题内容: 在回答这个问题时,我和其他一些人实际上认为是错误的,因为认为以下方法可行: 说一个有 背后的原因是什么 而有一个 要么 要么 是造成尺寸 退化的 原因吗? 问题答案: 是对象dtype的2D数组,每行的第一个元素是一个列表。 是对象dtype的一维数组,其每个元素都是列表。 当您这样做时,NumPy不会对list的每个元素进行元素比较。它从中创建尽可能高维的数组,生成1D数组,然后广播

  • 例如下面的代码。它有一个随机类。然而,它总是在任何地方产生相同的输出。在这种情况下,哪一项是种子? 来源:link

  • 这个问题在我的项目中经常出现。作为一个例子,假设我有两个接口,一个从API检索信息,另一个解析这些信息。 现在,我可能需要有不同的API,因此我将有许多的实现,但每个实现都需要自己的。 这看起来与Bridge设计模式所建议的非常相似,但是该模式允许任何APIClient使用任何APIParser(我说的对吗?) 那么,有没有更好的解决方案呢?或者也许这很好,不需要重构它。 另外,也许parse不是