当前位置: 首页 > 知识库问答 >
问题:

Flink工作流并行与自定义源

杨晓博
2023-03-14

我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。

我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。

然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示:

我有一个集群或2个worker设置,每个都有6个插槽(它们都有6个内核)。我将平行度设置为12。从执行图中,我看到源代码的并行度为1,而工作流的其余部分的并行度为12。

当我运行工作流时(我在专用文件夹中有大约15K个文件),我会使用htop监控我的工作人员的资源。大多数时候,所有核心都达到了100%的利用率,但大约每30分钟左右,就会有8-10个核心闲置2-3分钟左右。

我的问题如下:

>

我的工作流的其余部分使用parallelism 12执行,这看起来是正确的,因为通过检查任务管理器的日志,我可以从所有插槽中获得打印结果(例如,<代码>…[平面图-

  • 插槽1读取文件并将其转发到可用插槽(2到12)
  • 插槽1将一个文件转发给自身并停止读取,直到完成其工作
  • 完成后,插槽1读取更多文件并将其转发到可用的插槽

我相信我上面描述的是错误的,但我举个例子来更好地解释我的问题

共有2个答案

章禄
2023-03-14
  1. 单消费者设置将管道的总吞吐量限制在唯一消费者的性能范围内。此外,它还对所有插槽引入了重洗牌——在这种情况下,消费者读取的所有数据也会在这个消费者插槽上序列化,这是额外的CPU负载。相比之下,让消费者并行性等于map/平面映射并行性将允许链接源——
简培
2023-03-14

为了回答关于并行化阅读的具体问题,我将执行以下操作。。。

  1. 通过扩展RichSourceFunction来实现自定义源代码

通过这种方式,您可以将工作分散到12个子任务上,而无需子任务尝试处理同一个文件

 类似资料:
  • 我计划将 Cadence 或临时工作流用于架构,但我们计划在决定工作流时为用户提供很大的权力。在他们的用例中,节奏和时间都提到他们的SDK支持自定义DSL,但我看不到该功能。你能帮帮我吗?

  • 是否可以为Java8并行流指定自定义线程池?我到处都找不到它。 如果我不能为不同的模块使用不同的线程池,这就意味着我不能在大多数真实世界的情况下安全地使用并行流。 请尝试以下示例。有些CPU密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被打破,因此每一步需要1秒(通过线程Hibernate模拟)。问题是其他线程会被卡住,等待中断的任务完成。这是一个虚构的示例,但假设一个servlet

  • 问题内容: 是否可以为Java 8 并行流指定自定义线程池?我在任何地方都找不到。 假设我有一个服务器应用程序,并且想使用并行流。但是应用程序很大并且是多线程的,所以我想将其划分。我不希望一个模块中的一个模块中的某个模块运行缓慢,而另一个模块中的任务却运行缓慢。 如果不能为不同的模块使用不同的线程池,则意味着在大多数实际情况下,我不能安全地使用并行流。 请尝试以下示例。在单独的线程中执行一些CPU

  • 使用像Activiti这样的现代工作流引擎的主要驱动因素之一是它对云和多租户的支持。我们目前的内部工作流引擎缺乏这些功能。所以,我们正计划用Activiti取代它。 当前的想法是,我们将Activiti作为一个独立的应用程序运行。我们的应用程序(多个实例)将使用REST API与Activiti应用程序交互。

  • 问题内容: 请注意: 这个问题是基于旧的,现在称为“脚本化”管道格式的。使用“声明性管道”时,可以将并行块嵌套在阶段块内部(请参阅带有声明性管道1.2的并行阶段)。 我想知道Jenkins工作流/管道插件,特别是并行步骤应该如何工作。如何将它们与构建阶段混合使用。我了解一般模式: 但是,我想并行运行几个阶段(在具有多个执行程序的同一节点上),因此我尝试添加以下阶段: 这不能按预期方式工作。“执行任

  • 在Flink-Job中,我目前有两个流,一个是每分钟从Kafka主题更新的主数据流,另一个流(广播流)用于KeyedBroadcastProcessFunction的process元素函数中,用于对主流数据进行一些计算。 2)主数据可以有两个广播流吗? 3)由于流数据是完全不同的数据,广播,第三个数据流不经常变化,所以连接是不起作用的。它就像一个主数据,在计算中和主数据流一起使用,找不到任何解决方