当前位置: 首页 > 知识库问答 >
问题:

Flink datastream-使用windows处理文件中的数据

江阳夏
2023-03-14

我正在学习Flink框架并从事流式作业,该作业将从csv文件中读取数据并使用DataStream API执行一些聚合。数据字段包括城市、州代码、人口值。数据将由州代码键控,并计算每个键的人口总和。输出将采用该格式:(州代码,人口总和)。

我当前的实现使用KeyedStream上的reduce函数来聚合总体。这给了我每个关键点的滚动减少。我只想打印每个键的最后一个和。我的理解是,为了在每个窗口中发出最后的结果,需要对数据进行窗口化。我不确定如何实现一个窗口赋值器来解释文件中的所有数据。任何帮助都将不胜感激。谢谢

(旁注:我在Flink文档中访问了跨事件时间处理。如果可以更新csv以添加最后一个更新文件,是否可以将其用作窗口,以便对文件中的所有数据进行会计处理)

共有1个答案

刘永望
2023-03-14

流reduce必须在处理每个传入记录后生成更新的结果,因为每个记录都可能是最后一条记录——无法知道流何时结束。

但是,如果您的输入实际上是有界的(就像从文件读取的情况一样),那么您可以在批处理执行模式下运行流作业,reduce只会产生最终结果,无需窗口化。

几个快速示例展示了如何在BATCH模式下运行:

$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/execution_mode/

 类似资料:
  • 在Windows中,如何访问批处理文件运行时传递的参数? 例如,假设我有一个名为< code>hello.bat的程序。当我在Windows命令行输入< code>hello -a时,如何让我的程序知道< code>-a是作为参数传入的?

  • 问题内容: 在Stack Overflow上使用Selenium WebDriver上传文件时,我已经看到很多问题和解决方案。但是它们都不适合以下情况。 有人给出了以下解决方案 但是我仍然找不到窗口句柄。我该如何处理? 我正在寻找上述方案的解决方案。 请在以下任何网站上进行检查。 问题答案: 如果使用Zamzar网站,它应该可以正常运行。你无需单击该元素。你只需在其中输入路径。具体来说,这绝对可以

  • 这是如何使用公共类frome的一个后续步骤。其他处理选项卡中的java文件?;使用来自的Usage类中的示例。java文件-有完整的文档吗?-处理2。x和3。x论坛,我有这个: /tmp/Sketch/Foo.java 这个例子运行得很好,但是如果我取消注释import peasy。组织 行,则编译失败: 当然,我确实在下安装了PeasyCam,如果我导入peasy.*它工作得很好 来自草图。 我

  • 本文向大家介绍使用Python的netrc文件处理,包括了使用Python的netrc文件处理的使用技巧和注意事项,需要的朋友参考一下 python中的netrc类用于从用户家庭环境的unix系统中的.netrc文件中读取数据。这些是隐藏文件,包含用户的登录凭据详细信息。这对于ftp,curl等工具成功读取.netrc文件并将其用于操作很有帮助。 以下程序显示了如何使用python的netrc模块

  • 我如何在我的java项目中包含PDE文件?有可能吗?

  • 问题内容: 我在Java应用程序中使用Hibernate访问我的数据库,它与MS- SQL和MySQL配合得很好。但是我必须以某种形式显示的某些数据必须来自文本文件,对于文本文件,我的意思是人类可读文件,它们可以是CSV,制表符分隔甚至是键,值对,每行,因为我的数据就这么简单,但是我的首选当然是XML文件。 我的问题是:我可以使用Hibernate通过HQL,Query,EntityManager