当前位置: 首页 > 知识库问答 >
问题:

在flink中的链式操作符内分配负载

荀学文
2023-03-14

我有一个带有单个碎片的输入动觉流。我创建了一个简单的应用程序,它具有映射函数,但环境级并行度为8。使用线程档案器,当我检查创建的线程时,只有1个线程对map函数和源代码处于活动状态(如文档中所述,查询)。我意识到这是因为我正在环境级别设置并行性。在使用map函数后,是否可以在为其创建的所有并行实例之间分配负载?

共有1个答案

呼延宪
2023-03-14

一个简单的解决方法是在动觉消费者之后放置一个重新平衡()<代码>重新平衡()执行往返重新分区。这将在紧随其后的8个地图实例中重新分配从一个碎片读取的事件。

文档。

重新平衡将导致ser/de和网络洗牌。您必须权衡这一点与拥有8个活动管道而不是1个管道的好处。

考虑到你的工作图是这样的,

source -> map -> async -> filter -> keyBy + process function -> sink

稍后在管道中的keyBy将导致另一次网络混乱。如果您非常关心性能,那么可以在源代码之后立即执行keyBy(而不是在那里使用重新平衡),然后在异步操作符之后使用reinterpretAsKeyedStream来避免第二次网络混乱。这变得有点复杂的原因是async操作符对键控流一无所知——只有在async函数前后使用相同的键选择器时,这才有效。

 类似资料:
  • 我正在一个由15台机器组成的裸机集群上制作Flink流媒体应用程序的原型。我使用的是90个任务槽(15x6)的纱线模式。 该应用程序从单个Kafka主题读取数据。Kafka主题有15个分区,所以我也将源操作符的并行性设置为15。然而,我发现Flink在某些情况下会将2-4个消费者任务实例分配给同一个taskmanager。这会导致某些节点受到网络限制(Kafka主题是提供大量数据,而机器只有1G

  • 数据库提供的链式操作方法,可以有效的提高数据存取的代码清晰度和开发效率,并且支持所有的CURD操作(原生查询不支持链式操作)。 使用也比较简单,假如我们现在要查询一个User表的满足状态为1的前10条记录,并希望按照用户的创建时间排序 ,代码如下: Db::table('think_user') ->where('status',1) ->order('create_time')

  • 我正在阅读Apache Flink的文档:https://ci.apache.org/projects/flink/flink-docs-stable/concepts/runtime.html. 正如医生提到的, 对于分布式执行,Flink将运算符子任务链接到任务中。每个任务都由一个线程执行。将运算符链接到任务中是一种有用的优化:它减少了线程到线程切换和缓冲的开销,并在减少延迟的同时提高了整体吞

  • 本文向大家介绍php链式操作的实现方式分析,包括了php链式操作的实现方式分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了php链式操作的实现方式。分享给大家供大家参考,具体如下: 类似$db->where("id=1")->limit("5")->order("id desc"),链式操作的实现方式 先讲下方法的常规调用; 调用 缺点:实现多个方法需要多行调用; 链式操作,在方法返回

  • 本文向大家介绍jQuery链式操作实例分析,包括了jQuery链式操作实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了jQuery链式操作。分享给大家供大家参考,具体如下: 从过去的实例中,我们知道jQuery语句可以链接在一起,这不仅可以缩短代码长度,而且很多时候可以实现特殊的效果。 以上代码为整个<div>列表增加样式css1,然后再进行筛选,再为筛选的元素单独增加css2样式

  • buffer buffer() 操作符的函数签名: buffer([breakObservable]) buffer 本身意味着我们在等待而不会发出任何值,直到 breakObservable 发生。示例如下: let breakWhen$ = Rx.Observable.timer(1000); let stream$ = Rx.Observable.interval(200) .buffer(