当前位置: 首页 > 知识库问答 >
问题:

如何创建具有背压和控制的Akka流

闻人冷勋
2023-03-14

我需要创建一个具有以下接口的函数:

import akka.kafka.scaladsl.Consumer.Control

object ItemConversionFlow {

def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
    // Implementation goes here
}

我的问题是,我不知道如何定义符合上述接口的流。

当我做这种事的时候

val flow = Flow[Item]
    .map(item => doConversion(item)
    .filter(_.isDefined)
    .map(_.get)

结果类型为Flow[Item,OtherItem,NotUsed]。到目前为止,我还没有在Akka文档中找到任何东西。还有akka上的功能。流动scaladsl。流只提供“未使用”而不是控制。如果有人能给我指出正确的方向那就太好了。

一些背景:我需要设置几个只在转换部分区分的管道。这些管道是主流的子流,可能由于某种原因而停止(相应的消息到达某个Kafka主题)。因此我需要控制部分。其想法是创建一个图形模板,我只是在其中插入所提到的流作为参数(一个返回它的工厂)。对于特定情况,我们有一个有效的解决方案。一般来说,我需要这种流动。

共有1个答案

姬和豫
2023-03-14

实际上你有背压。然而,想想你真正需要的背压是什么。。。您没有使用异步阶段来增加吞吐量。。。例如背压可以避免快速生产商过度增长的用户https://doc.akka.io/docs/akka/2.5/stream/stream-rate.html.在您的示例中,您的流将根据doConversion完成所需的时间向发布者请求新元素。

如果您想获取流的结果,请使用toMat或viaMat。例如,如果您的流发出Item并将它们转换为其他项目:

val str = Source.fromIterator(() => List(Item(Some(1))).toIterator)
  .map(item => doConversion(item))
  .filter(_.isDefined)
  .map(_.get)
  .toMat(Sink.fold(List[OtherItem]())((a, b) => {
      // Examine the result of your stream
      b :: a
    }))(Keep.right)
  .run()

str将是未来[列表[其他项目]]。尝试将其推断到您的情况。

或者将toMat与KillSwitches一起使用,“创建[[FlowShape]]的新[[Graph]],该[[FlowShape]]具体化到外部交换机,允许外部完成该唯一具体化*。不同的具体化会导致不同的独立开关。”

  def build(config: StreamConfig): Flow[Item, OtherItem, UniqueKillSwitch] = {
    Flow[Item]
      .map(item => doConversion(item))
      .filter(_.isDefined)
      .map(_.get)
      .viaMat(KillSwitches.single)(Keep.right)
  }


  val stream = 
    Source.fromIterator(() => List(Item(Some(1))).toIterator)
    .viaMat(build(StreamConfig(1)))(Keep.right)
    .toMat(Sink.ignore)(Keep.both).run

  // This stops the stream
  stream._1.shutdown()

  // When it finishes
  stream._2 onComplete(_ => println("Done"))
 类似资料:
  • out.mov的ffprompt-show_stream-show_format的输出如下: 我有一个“示例”片段,它显示了我想要的行为,带有以下流和信息: 我根本无法看出其中的差别。 输入、输出和工作模板可以在这里找到。 (点击链接时,您可能会看到的安全问题来自服务器证书是自签名的。您可以接受临时异常。顺便说一句:输出文件荒谬的文件大小将是下一个需要解决的问题。可能是与压缩有关的问题。)

  • 走这条路对吗?如果没有,流量控制该怎么做?

  • 因此,一般的想法是使用来爬取页面并执行HTTP请求(想法和实现与本答案中描述的非常接近。这将创建一个,然后可以使用它(解封为多部分,拆分为各个部分,...)。 我现在的问题是的消耗可能需要一段时间(如果页面很大,解组需要一些时间,可能最后会有一些数据库请求来持久化一些数据,...)。因此,如果下游速度较慢,我希望为背压。默认情况下,下一个HTTP请求将在上一个请求完成后立即启动。 所以我的问题是:

  • 类: BrowserView 创建和控制视图 注意: BrowserView API目前为实验性质,可能会更改或删除。 进程:主进程​ BrowserView是 webview的替代标签,就像是子窗口一样让某个 BrowserWindow嵌入更多的Web内容. 例子 1 // 主进程中 2 const {BrowserView, BrowserWindow} = require('electron

  • 我们有一个Akka应用程序,它使用Kafka主题,并将收到的消息发送给Akka参与者。我不确定我编程的方式是否充分利用了Akka Streams内置背压机制的所有优点。 以下是我的配置。。。 这做了我所期望的商业案例,myActor收到命令更新(MyAvro) 我更讨厌背压的技术概念,据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。

  • 连接:Keep-Alive 主机:webhook.site 内容类型:application/x-www-form-urlencoded