我需要创建一个具有以下接口的函数:
import akka.kafka.scaladsl.Consumer.Control
object ItemConversionFlow {
def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
// Implementation goes here
}
我的问题是,我不知道如何定义符合上述接口的流。
当我做这种事的时候
val flow = Flow[Item]
.map(item => doConversion(item)
.filter(_.isDefined)
.map(_.get)
结果类型为Flow[Item,OtherItem,NotUsed]。到目前为止,我还没有在Akka文档中找到任何东西。还有akka上的功能。流动scaladsl。流只提供“未使用”而不是控制。如果有人能给我指出正确的方向那就太好了。
一些背景:我需要设置几个只在转换部分区分的管道。这些管道是主流的子流,可能由于某种原因而停止(相应的消息到达某个Kafka主题)。因此我需要控制部分。其想法是创建一个图形模板,我只是在其中插入所提到的流作为参数(一个返回它的工厂)。对于特定情况,我们有一个有效的解决方案。一般来说,我需要这种流动。
实际上你有背压。然而,想想你真正需要的背压是什么。。。您没有使用异步阶段来增加吞吐量。。。例如背压可以避免快速生产商过度增长的用户https://doc.akka.io/docs/akka/2.5/stream/stream-rate.html.在您的示例中,您的流将根据doConversion完成所需的时间向发布者请求新元素。
如果您想获取流的结果,请使用toMat或viaMat。例如,如果您的流发出Item并将它们转换为其他项目:
val str = Source.fromIterator(() => List(Item(Some(1))).toIterator)
.map(item => doConversion(item))
.filter(_.isDefined)
.map(_.get)
.toMat(Sink.fold(List[OtherItem]())((a, b) => {
// Examine the result of your stream
b :: a
}))(Keep.right)
.run()
str将是未来[列表[其他项目]]。尝试将其推断到您的情况。
或者将toMat与KillSwitches一起使用,“创建[[FlowShape]]的新[[Graph]],该[[FlowShape]]具体化到外部交换机,允许外部完成该唯一具体化*。不同的具体化会导致不同的独立开关。”
def build(config: StreamConfig): Flow[Item, OtherItem, UniqueKillSwitch] = {
Flow[Item]
.map(item => doConversion(item))
.filter(_.isDefined)
.map(_.get)
.viaMat(KillSwitches.single)(Keep.right)
}
val stream =
Source.fromIterator(() => List(Item(Some(1))).toIterator)
.viaMat(build(StreamConfig(1)))(Keep.right)
.toMat(Sink.ignore)(Keep.both).run
// This stops the stream
stream._1.shutdown()
// When it finishes
stream._2 onComplete(_ => println("Done"))
out.mov的ffprompt-show_stream-show_format的输出如下: 我有一个“示例”片段,它显示了我想要的行为,带有以下流和信息: 我根本无法看出其中的差别。 输入、输出和工作模板可以在这里找到。 (点击链接时,您可能会看到的安全问题来自服务器证书是自签名的。您可以接受临时异常。顺便说一句:输出文件荒谬的文件大小将是下一个需要解决的问题。可能是与压缩有关的问题。)
走这条路对吗?如果没有,流量控制该怎么做?
因此,一般的想法是使用来爬取页面并执行HTTP请求(想法和实现与本答案中描述的非常接近。这将创建一个,然后可以使用它(解封为多部分,拆分为各个部分,...)。 我现在的问题是的消耗可能需要一段时间(如果页面很大,解组需要一些时间,可能最后会有一些数据库请求来持久化一些数据,...)。因此,如果下游速度较慢,我希望为背压。默认情况下,下一个HTTP请求将在上一个请求完成后立即启动。 所以我的问题是:
类: BrowserView 创建和控制视图 注意: BrowserView API目前为实验性质,可能会更改或删除。 进程:主进程 BrowserView是 webview的替代标签,就像是子窗口一样让某个 BrowserWindow嵌入更多的Web内容. 例子 1 // 主进程中 2 const {BrowserView, BrowserWindow} = require('electron
我们有一个Akka应用程序,它使用Kafka主题,并将收到的消息发送给Akka参与者。我不确定我编程的方式是否充分利用了Akka Streams内置背压机制的所有优点。 以下是我的配置。。。 这做了我所期望的商业案例,myActor收到命令更新(MyAvro) 我更讨厌背压的技术概念,据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。
连接:Keep-Alive 主机:webhook.site 内容类型:application/x-www-form-urlencoded