当前位置: 首页 > 知识库问答 >
问题:

单个flink管道的多个elasticsearch水槽

满勇军
2023-03-14

我的要求是将数据发送到不同的ES接收器(基于数据)。例如:如果数据包含特定信息,则将其发送到sink1,否则将其发送到sink2等(基本上是根据数据动态发送到任何一个接收器)。我还想分别为ES sink1、ES sink2、ES sink3等设置并行度。

                                ->  Es sink1 (parallelism 4)
Kafka -> Map(Transformations)   ->  ES sink2 (parallelism 2)
                                ->  Es sink3 (parallelism 2)

有什么简单的方法可以在flink中实现上述目标吗?

我的解决方案:(但并不满意)

我可以想出一个解决方案,但有中间Kafka主题,我写(topic1,topic2,topic3),然后有单独的管道为Essink1,Essink2和ESsink3。我想避免写这些中间Kafka主题。

kafka -> Map(Transformations) -> Kafka topics (Insert into topic1,topic2,topic3 based on the data)

Kafka topic1 -> Essink1(parallelism 4)

Kafka topic2 -> Essink2(parallelism 2)

Kafka topic3 -> Essink3(parallelism 2)

共有1个答案

贝钧
2023-03-14

您可以使用带有侧输出[2]的ProcessFunction[1]将流分为n个方向,然后将每个侧输出流连接到相应的接收器。然后在每个接收器上调用setParallelism()[3]。

[1]https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-processfunction
[2]https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[3]https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-level

 类似资料:
  • 我试图建立flume,这样每个代理可以有多个接收器,最终有多个通道和源(现在只看多个通道)。我有一个类似这样的配置文件和一个ruby模板。我不知道如何将功能添加到模板文件中,以便每个代理可以将一个事件发送到多个通道

  • 我们有一个Kafka主题,有源源不断的数据。为了处理它,我们有一个无状态的Flink管道,它使用该主题并写入另一个主题。 我们是不是漏掉了什么?我们误会什么了吗?有没有更好的解决办法? 谢了!

  • 我们有一个Spring Boot Restful API,它需要从两个不同的Elasticsearch实例(在不同的服务器上)获取数据,一个用于“共享”数据(上面有大约5个不同的索引),一个用于“私有”数据(有大约3个不同的索引)。目前只针对“私有”数据实例运行,一切都很好。但我们现在需要获取“共享”数据。 在我们的Spring Boot应用程序中,我们启用了如下Elasticsearch存储库

  • 我正在尝试将日志从单台机器上的不同目录收集到本地文件系统文件或 HDFS。 我已经注册了 2 个来源 r1、r2。两个源都指向单通道C1。有一个接收器连接到通道。K1 请找到下面的配置文件: 但是当我使用代理 a1 启动 Flume 时,只有一个源 (r2) 正在启动。水槽代理启动日志: 谢谢

  • 我看到关于为每个密钥添加水印支持的讨论很多。但是flink支持每个分区的水印吗? 当前-然后考虑所有水印(非空闲分区)的最小值。因此,窗口中最后挂起的记录也被卡住了。(使用periodicemit增加水印时) 任何关于这方面的信息都非常感谢!

  • 问题内容: 我试图在C的shell中实现多个管道。 在执行它并输入命令(例如)之后,shell只是挂在那里,不输出任何结果。我确保关闭所有管道。但它只是挂在那里。我以为那是问题所在。我删除了,执行后没有任何结果。我做错什么了?谢谢。 添加的代码: 问题答案: 我认为这里的问题是,您的等待和结账在创建子进程的同一循环内。在第一次迭代中,子进程将执行(将破坏子程序,并用您的第一个命令将其覆盖),然后父