当前位置: 首页 > 知识库问答 >
问题:

Flink keyby/window操作员任务执行地点和内部

姚鹤龄
2023-03-14

我是Flink的新人。我正在编写一个简单的Flink POC程序,在这里我能够获得预期的输出。但我无法获得钥匙和车窗操作的内部信息。以下是我的代码,

environment
.addSource(consumer)
.name("MyKafkaSource")
.setParallelism(2)
.flatMap(pojoMapper)
.name("MyPojoMapper")
.setParallelism(2)
.keyBy(new MyKeyExtractor())
.timeWindow(Time.seconds(60))
.apply(new SumFunction())
.name("MySumFunction")
.setParallelism(2)
.print()
.name("S3FileSink")
.setParallelism(2)

在部署Flink作业时,我在Flink UI中看到以下图表,

任务展示台

从上图中,我完全理解了它使用了2个任务和4个插槽,每个任务有2个并行性。第一个任务有源,pojo映射器第二个任务有sum函数,sink函数。

现在的问题是,

>

  • KeyBy和Window操作在哪里?在第一个任务还是第二个任务中?为什么它在上面的图像中不可见?有什么方法可以可视化吗?

    假设对于1个窗口(60秒间隔),我收到100个不同的键,每个键在1分钟内收到5条记录,那么在1个窗口间隔内在内部创建了多少个窗口对象?我假设创建了100个窗口对象,每个窗口对象将保存5条记录。我的假设是否正确?如果没有,请解释内部发生了什么?如果可能,请共享与此相关的任何文档。

  • 共有1个答案

    逄皓轩
    2023-03-14

    因为它们是通过数据转发连接连接的,所以源和平面图运算符被链接到同一个任务中,这同样适用于窗口和接收器。但是由于平面图和窗口是由keyBy连接的,因此需要进行网络洗牌。

    因此,您的作业总共有4个任务:2个source plus flatmap实例,2个window plus sink实例。这4个任务部署到2个任务槽中,每个槽都有一个源/平面图任务和一个窗口/接收器任务。

    keyBy在图上表示哈希的位置进行了描述。keyBy不是操作符,而是描述如何连接keyBy前后的操作符。

    这两行代码

    .timeWindow(Time.seconds(60))
    .apply(new SumFunction())
    

    一起描述窗口运算符,在图中显示为mySumFunction。窗口在第二个任务中。

    假设每个不同的键都有一个窗口,这100个窗口中的每一个都包含5条记录,这是正确的。

    要了解更多关于Flink的信息,我可以推荐Apache Flink培训、Apache Flink流处理以及在YouTube上搜索Flink Forward对话。

     类似资料:
    • 我正在尝试通过NIFI从oracle数据库中提取数据。在画布中,我将文件大小为0 KB的“GenerateFlowFile”处理器安排为每5分钟运行一次。这只是为了在成功时触发“executesql”处理器。对于“executeSQL”,我将DB连接池服务设置为DBCPConnectionPool。我输入SQL查询“select*FROM sometable”。我的DBCPConnectionPo

    • 本文向大家介绍Spring Task定时任务每天零点执行一次的操作,包括了Spring Task定时任务每天零点执行一次的操作的使用技巧和注意事项,需要的朋友参考一下 最近根据项目的需求,需要限制用户每天的发送短信数量。这样以来就需要写一个定时任务,每天去置零一次所有用户的发送短信统计数量。 首先,在application.xml文件中添加 <task:annotation-driven /> 接

    • 问题内容: 我正在运行一个Electron项目,一切都很好。但是现在,当我运行package.json中的任何脚本(包括npm start)时,它只是转义了一行而没有执行任何操作。 我的package.json: 我尝试更新NPM,但没有用。当我在其他项目中尝试过时,也不起作用。 提前致谢 问题答案: npm具有配置密钥。它的期望值是布尔值,默认设置为。 可能是由于疏忽而将其设置为。 要/ 的配置

    • 我有在Gradle的执行阶段创建和执行任务的需求。 目前,在构建任务期间,我确定并创建了多个添加到taskContainer的任务,使用: 在第一次执行时,我得到以下错误: >任务工件状态缓存(.../.gradle/1.5/taskartifacts)尚未被锁定。 我做错了什么,有没有更好的方法在执行过程中动态添加任务?

    • 我正在评估我当前的气流部署从Celery executor到Kubernetes(K8s)executor的迁移,以利用Pods提供的资源动态分配和任务隔离。 我很清楚,我们可以使用本机的KubernetesPodOperator通过K8s Executor在K8s集群上运行任务。但是,我找不到关于K8s executor与其他操作符(如bash和Athena)之间兼容性的信息。 这里的问题是,是

    • 我正在Eclipse(版本:Kepler发行版)中编辑一个Java源代码。在其他编辑器上,我不使用导航文本,使用Ctrl+右移动到下一个单词,使用Ctrl+Shift+右选择下一个单词。但是在eclipse上什么都没有发生,光标停留在同一个地方。 在窗口->首选项->常规->键上,绑定似乎是正确的,它说:命令:下一个单词;绑定:Ctrl+右;当:编辑文本;类别:文本编辑。 但不管用。