当前位置: 首页 > 知识库问答 >
问题:

如何在flink的readFile方法中提到FileInputFormat?

井斌斌
2023-03-14

我正在使用flink从定期附加数据的文件中连续读取数据。我尝试在flink中使用readFile方法。但对如何在该方法的参数中提及FileInputFormat感到困惑。我的文件格式是json。有人能帮我吗?。谢谢

共有1个答案

齐乐
2023-03-14

Flink的InputFormat不适合从并发写入的文件中读取。

考虑到这个需求,我假设您正在寻找一种将文件作为流使用并使用Flink的DataStream API处理它的方法。在这种情况下,您需要实现一个SourceFunction来跟踪文件的大小和进度,并持续读取文件。

但是,我不推荐这种设计。我宁愿定期启动一个新文件,并在它完成并启动下一个文件后将其移动到专用文件夹以供使用。

 类似资料:
  • 问题内容: 我正在尝试从以base64编码的客户端读取图像。如何使用nodejs进行阅读? 我的代码: 但是,我得到了这个错误: 问题答案: 最新和最好的方法: 或使用新的promises API :

  • 问题内容: 我将使用ReadFile: 我该如何解决? 问题答案: 使用包名称限定它:

  • 我有一个xslt 2.0文件,用于将csv文件转换为xml文件。xsl是从这里获取的:http://P2P . wrox . com/XSLT/40898-transform-CSV-file-XML . html # post 164344 现在我正在尝试通过Java变压器(使用Saxon9 xsl变压器工厂)执行此操作。由于csv文件作为参数传递到xsl中,因此我不需要在转换方法中的Sourc

  • 我试图在Flink的数据流上应用每窗口功能。以下是我的代码 下面是我的实现MyProcessWindow函数 然而,当我试图通过maven编译上述代码时,我得到了以下错误 知道我做错了什么吗?仅供参考,我正在使用ApacheFlink 1.5.1版,并在Mac上使用maven3编译Java代码。

  • 我是刚接触flink的,我正在尝试编写junit测试用例来测试KeyedBroadcastProcessFunction。下面是我的代码,我当前正在调用TestUtils类中的getDataStreamOutput方法,并在输入数据根据模式规则列表求值后将inputdata和patternrules传递给方法,如果输入数据满足条件,我将获得信号并调用sink函数,并在getDataStreamOu

  • 我正在探索一种方法来实现这一点,就像下面的SQL一样。 是一个将聚合到