当前位置: 首页 > 知识库问答 >
问题:

同一数据流中的多个接收器

薛阳荣
2023-03-14

我有一条这样的小溪和两个水槽,但一次只使用一个:

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)

Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)

我们使用哪个接收器是可配置的,但是如果我并行使用两个接收器怎么办?我该怎么做?

我想到了水槽。合并,但它也需要合并策略,我不想以任何方式合并这些汇的结果。我真的不关心它们,所以我只想通过HTTP将相同的数据发送到某个endpoint,同时将它们发送到数据库。Sink combine与broadcast非常相似,但从头开始实现广播会降低代码的可读性,现在我只有简单的源代码、流和接收器,没有低级的图形阶段。

你知道一个正确的方法吗(我只用一个水槽就有背压和其他东西)?

共有2个答案

唐彦
2023-03-14

使用最简单形式的GraphDSL进行广播不应该降低易读性-事实上,人们甚至可能会争辩说~

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val bcast = builder.add(Broadcast[Int](2))

  Source.fromElements(1, 2, 3) ~> flow ~> bcast.in
  bcast.out(0) ~> sink1
  bcast.out(1) ~> sink2

  ClosedShape
})
graph.run()

廖琪
2023-03-14

您可以使用alsoTo(参见API文档):

Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
 类似资料:
  • 问题内容: 我试图在相同的数据上有两个轴。 数据是一对。情节是a ,我有两个和一个。 对于y值,所有数据均以米为单位,我希望有一个轴以米为单位显示该轴,以英尺为单位显示该数据。现在,这感觉很普通,但是我无法决定最明显的方法。一种有效的方法是复制数据并以英尺为单位设置y值,然后添加另一个并完成该操作。 但是我认为,子类化或向其注入一些功能以缩放值会更明智。还是我应该采用第一种方法? 你怎么看? 问题

  • 我试图在相同的数据上有两个轴。 数据是两个。该图是,我有两个和一个。 所有的数据是以米为y值,我想有一个轴显示它在米和一个轴显示它在英尺。现在这感觉像是一件常见的事情做,但我不能决定最明显的方式来做它。一种可行的方法是复制数据,并在脚中使用Y值,然后添加另一个并使用它。 但是我认为更明智的做法是将子类化,或者在中注入一些功能来缩放值。还是我应该采用第一种方法? 你觉得呢?

  • Flink社区! 我有一个关于在Flink中连接相同键上的多个流的问题(等连接)。我还是一个新手,正在为我的团队评估Flink,将我们的Spark批处理应用程序迁移到流处理。 注意:我看了FabianHüske的这篇关于加入处理的文章:窥视Apache Flink的引擎室。 为了简化问题,假设您有3个流,每个流都有唯一的记录,可以通过id字段进行键控。对于流中的每条记录,您将在其他流中找到相应的记

  • 我有一个Lambda函数,它将DynamoDB流消息推送到SNS。如果我把批量大小设为10,它可以合并两个不同流的记录吗? 例如:Stream1有一个由2个对象组成的数组,Stream2也有一个由2个对象组成的数组。如果我将批大小设置为10,lambda会创建2个批(每个流一个),还是将两个流的对象合并并将它们放在单个批下。假设两条流同时到达Lambda。

  • 我想了解接收器在火花流中是如何工作的。根据我的理解,将有一个接收器任务运行在执行器中,收集数据并保存为RDD的。当调用start()时,接收器开始读取。需要澄清以下内容。 null 想知道火花流和接收器的解剖。

  • 我有一个Kinesis生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,给定主题/流的具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。 最初,我对所有消费者和生产者使用相同的应用程序名称。但是,一旦我启动多个消费者,我就开始收到以下错误: 通用域名格式。amazonaws。服务。运动。模型InvalidArgumentExce