当前位置: 首页 > 知识库问答 >
问题:

关于flink溪流水槽至hdfs

吉泰宁
2023-03-14

我正在编写一个flink代码,其中我正在从本地系统读取一个文件,并使用“writeUsingOutputFormat”将其写入数据库。

现在我的要求是写入hdfs而不是数据库。

你能帮我在Flink怎么办吗。

注意:hdfs已启动并在本地计算机上运行。

共有2个答案

范兴文
2023-03-14

在这一点上,较新的流文件接收器可能是比Bucketing接收器更好的选择。此描述来自Flink 1.6发行说明(请注意,Flink 1.7中添加了对S3的支持):

新的StreamingFileSink是一个用于写入文件系统的一次性接收器,它利用了从以前的BucketingSink中获得的知识。通过将接收器与Flink的检查点机制集成来支持精确一次。新的接收器建立在Flink自己的文件系统抽象的基础上,它支持本地文件系统和HDFS,计划在不久的将来支持S3[现在包含在Flink 1.7中]。它公开了可插拔文件滚动和屈曲策略。除了按行编码格式之外,新的StreamingFileSink还支持拼花。可以使用公开的API轻松添加ORC等其他批量编码格式。

汪翰墨
2023-03-14

Flink提供了HDFS连接器,可用于将数据写入Hadoop文件系统支持的任何文件系统。

提供的接收器是一个缓冲接收器,它将数据流分区到包含滚动文件的文件夹中。屈曲行为,以及写入,可以配置参数,如批次大小批次滚动时间间隔

Flink文件给出了以下例子-

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink);
 类似资料:
  • 我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化:

  • 我正在尝试编写一个Flink应用程序,它从Kafka读取事件,从MySQL丰富这些事件并将这些数据写入HBase。我正在中进行MySQL丰富,我现在正在尝试弄清楚如何最好地写入HBase。我想批量写入HBase,所以我目前正在考虑使用,后跟标识(仅返回),然后编写,它获取记录列表并批处理放入。 这是正确的做事方式吗?仅仅为了进行基于时间的缓冲而使用所有窗口和应用窗口感觉很奇怪。

  • 接收器的方法似乎没有办法使异步io?例如返回? 例如,redis连接器使用jedis lib同步执行redis命令: https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.j

  • 我遵循火花流水槽集成的指导。但我最终无法获得任何事件。(https://spark.apache.org/docs/latest/streaming-flume-integration.html)谁能帮我分析一下?在烟雾中,我创建了“avro_flume.conf”的文件,如下所示: 在文件中,123.57.54.113是本地主机的ip。 最后,根本没有任何事件。 怎么了?谢谢!

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 我的要求是将数据发送到不同的ES接收器(基于数据)。例如:如果数据包含特定信息,则将其发送到sink1,否则将其发送到sink2等(基本上是根据数据动态发送到任何一个接收器)。我还想分别为ES sink1、ES sink2、ES sink3等设置并行度。 有什么简单的方法可以在flink中实现上述目标吗? 我的解决方案:(但并不满意) 我可以想出一个解决方案,但有中间Kafka主题,我写(topi