当前位置: 首页 > 知识库问答 >
问题:

Flink流文件接收器无法从失败中恢复

韩季
2023-03-14

我们有一个flink流媒体作业,它从kafka读取数据,并将其放入S3。我们使用flink的内部流文件接收器API来实现这一点。然而,几天后,作业失败,无法从失败中恢复。消息称它无法从s3中找到tmp文件。我们想知道可能的根本原因是什么,因为我们真的不想丢失任何数据。

谢谢。

整个输出如下所示

java.io.FileNotFoundException: No such file or directory: s3://bucket_name/_part-0-282_tmp_b9777494-d73b-4141-a4cf-b8912019160e
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:699)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.getObject(HadoopS3AccessHelper.java:99)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverInProgressPart(S3RecoverableMultipartUploadFactory.java:97)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.recoverRecoverableUpload(S3RecoverableMultipartUploadFactory.java:75)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:95)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.recover(S3RecoverableWriter.java:50)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restoreInProgressFile(Bucket.java:140)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:396)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:64)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:177)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:165)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:149)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:334)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

共有1个答案

呼延运恒
2023-03-14

谢谢你的报道!

你能具体说明你用的是哪个Flink版本吗?我问的原因是因为你的问题可能与此有关https://issues.apache.org/jira/browse/FLINK-13940票

此外,StreamingFileSink使用了S3的多部分上传功能。这意味着文件将以小部分的形式逐渐上传到S3,当需要“提交”它们时,所有部分在概念上都连接到一个对象中。S3允许您为bucker的挂起(即未提交)多部分上传(MPU)指定超时,当超时时,挂起的MPU被中止,数据被删除。因此,如果您积极地设置了这个参数,那么您可能会遇到这个问题。

最后,从你之前的帖子中,我猜你是在尝试从失败中重新启动,而不是从保存点重新启动。对吗?如果您试图从旧的存储点重新启动,那么您可能会遇到这样的问题:接收器已经提交了该MPU,而现在接收器找不到它。

我希望这有帮助。

 类似资料:
  • 我是flink的新手,我正在实现一个模式识别模块(不使用CEP实现模式匹配),该模块将从EventHub主题读取json流,并在模式匹配的情况下推送到另一个EventHub主题。我的模块功能如下 > 从Eventhub主题接收JSON有效负载 我正在使用RichSourceFunction,它将从API读取模式并发送到广播流 我正在使用Flink BroadcastProcessFunction根

  • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

  • 版本flink 1.7 我正在尝试从保存点(或检查点)还原flink作业,该作业所做的是读取kafka的内容- 我使用rocksdb和启用的检查点。 现在我尝试手动触发一个保存点。每个聚合的预期值为30(1个数据/每分钟)。但是当我从保存点(flink run-d-s{url})恢复时,聚合值不是30(小于30,取决于我取消flink作业和恢复的时间)。当作业正常运行时,它得到30。 我不知道为什

  • 主程序正在消费kafka事件,然后过滤- 但是我得到了以下例外: 以下是flink-conf.yaml中的一些配置 任何想法为什么会发生异常以及如何解决问题? 谢谢

  • 我在生成签名的Apk时得到了这个异常,我对密钥存储文件没有做任何操作,密码也是正确的。如果我创建了一个新的密钥存储文件。我想将无法在相同的应用程序包上传它到play store上。我不明白我现在要做什么? 错误:任务“:app:PackageFreeRelease”执行失败。从存储区读取密钥失败无法恢复密钥

  • 我有一个我真的无法解决的问题。所以我有一个kafka流,其中包含一些这样的数据: 我想用另一个值“bookingId”替换“adId”。此值位于csv文件中,但我无法真正弄清楚如何使其工作。 这是我的映射csv文件: 所以我的输出最好是这样的 该文件可以每小时至少刷新一次,因此它应该会接收对它的更改。 我目前有一个不适合我的代码: 代码只运行一次,然后停止,因此它不会使用csv文件转换kafka中