当前位置: 首页 > 知识库问答 >
问题:

FLINK:是否可以在同一个 flink 作业中从 kafka 主题(文件名)读取数据,然后从 amazon s3 读取文件内容?

萧焱
2023-03-14

我有一个用例,我需要处理存储在s3中的文件中的数据,并将处理后的数据写入本地文件。s3 文件会不断添加到存储桶中。

每当一个文件被添加到bucket中时,完整的路径被发布到一个kafka主题。

我想在一份工作中实现以下目标:

  1. 从kafka(无界流)读取文件名
  2. 一个计算器,它接收文件名,从s3(第二个源)读取内容并创建数据流
  3. 处理数据流(为每行添加一些逻辑)
  4. 接收到文件

我设法完成了设计的第一、第三和第四部分。

有没有办法做到这一点?提前致谢。

共有2个答案

闾丘淇
2023-03-14

这在一般情况下是可以实现的,但正如David Anderson所建议的,目前还没有使用vanilla Flink连接器实现这一点的直接方法。

另一种方法是用Apache Beam编写管道,它已经支持这一点,并可以使用Flink作为运行时(这证明了这可以用现有的原语实现)。

我认为这是一个合法的用例,Flink最终应该支持开箱即用。

姜晨
2023-03-14

我不相信有任何直接的方法可以做到这一点。

要在一个作业中完成所有工作,您可以说服文件源使用自定义FileEnumerator,从Kafka获取路径。

一个更简单的替代方案是为要摄取的每个文件启动一个新的(有界的)作业。要读取的文件可以作为参数传入。

 类似资料:
  • 我是Flink大学的一年级新生,我想知道如何从hdfs读取数据。有谁能给我一些建议或简单的例子吗?谢谢大家。

  • 我一直在努力阅读java项目中的文本文件,我一整天都在寻找解决方案,我尝试了很多方法,但没有一个有效。其中一些:(另外,我必须使用文件和扫描程序类) 异常线程"main"java.nio.file.NoSuchFileExc0019: test\fileTest.txt 异常线程"main"java.lang.NullPointerExc0019 线程“main”java中出现异常。木卫一。Fil

  • 我一直在试图找到一个连接器,将数据从Redis读取到Flink。Flink的文档中包含了要写入Redis的连接器的描述。在我的Flink工作中,我需要从Redis读取数据。在使用ApacheFlink进行数据流传输时,Fabian提到可以从Redis读取数据。可用于此目的的接头是什么?

  • 美好的一天 我正在通过flink/kafka接收数据(流数据)。我连接的端口与我需要写回消息的端口相同 TCP/IP- 连接到 URL 和端口工作正常。我接收并处理写入主题的数据 现在我还需要写回我连接到的同一URL和端口。{由于 Url 和端口可以同时发送和接收数据} 我把它写到另一个端口 这个管用...问题是试图写入同一个端口。当我使用我从中读取的同一个端口时...flink作业失败 有什么想

  • 我必须使用Flink作为流引擎处理来自Kafka的数据流。为了对数据进行分析,我需要查询Cassandra中的一些表。做这件事最好的方法是什么?我一直在Scala中寻找这样的例子。但是我找不到任何数据。如何使用Scala作为编程语言在Flink中读取来自Cassandra的数据呢?使用apache flink Java API将数据读写到cassandra中也有同样的问题。答案中提到它有多种方法。

  • 我有一个非常简单的问题:使用Python从txt文件中读取不同条目的最有效方法是什么? 假设我有一个文本文件,如下所示: 在C中,我会这样做: 用Python做这样的事情最好的方法是什么?以便将每个值存储到不同的变量中(因为我必须在整个代码中使用这些变量)。 提前感谢!