当前位置: 首页 > 知识库问答 >
问题:

禁用检查点时Flink StreamingFileSink不会摄取到S3

司空瑾瑜
2023-03-14

我想在flink中使用aws s3作为数据流的接收器。我正在使用StreamingFileSink类创建一个接收器。

我的工作不需要检查点,但是当我禁用检查点时,数据不再写入S3。

案例1:启用检查点
启用检查点后,数据将成功摄取到提到的s3路径。

案例2:检查点禁用
禁用检查点时,数据不会写入s3。
我多次尝试执行作业,但每次都得到相同的结果。我在本地机器和kubernetes集群上都面临这个问题。

object FlinkTestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // with checkpointing enabled
    env.enableCheckpointing(100)

    // Sinks
    val streamStrings: Seq[String] =
      Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")

    val testStream = env.fromCollection(streamStrings)

    val rollingPolicy = new RollingPolicy[String, String] {

      override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
        partFileState.getSize > 1

      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: String): Boolean = true

      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = true
    }

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(rollingPolicy)
      .build()

    testStream.addSink(sink)
    env.execute("test-job")
  }
}

当我使用“WriteEasText(“s3a://testbucket/sink”)而不是StreamingFileSink”写入s3时,无论是否启用检查点,它都可以正常工作。

Flink版本:1.8.0我想了解检查点和StreamingFileSink之间的关系
谢谢

共有1个答案

邵畅
2023-03-14

要使StreamingFileSink正常工作,需要启用检查点。

 类似资料:
  • 我已经设置了一个本地的Kafka0.10+Flink1.4环境。 我使用下面的代码来消费来自Kafka主题的数据: 在我执行这段代码后,总是会发现警告消息: 自动提交组TaxidataGroup的偏移量{taxidata-0=offsetandMetadata{offset=728461,Metadata=“}}失败:无法完成提交,因为该组已重新平衡并将分区分配给其他成员。这意味着对poll()的

  • 使用这个脚本检查是否有禁用函数。命令行运行curl -Ss http://www.workerman.net/check.php | php 如果有提示Function stream_socket_server may be disabled. Please check disable_functions in php.ini说明workerman依赖的函数被禁用,需要在php.ini中解除禁用才

  • 我正在使用至少一次检查点模式,这应该是异步化进程。有人能建议吗?我的检查点设置 我的工作有128个容器。 我想用一个30分钟的检查站看看

  • 当前,当点击第一阶段时,声明性管道将自动执行 我只想在子目录中签出scm。(因此它不会在根目录中签出相同的内容) 因为它需要jenkinsfile它已经结账一次。加目录

  • 本文向大家介绍sitecore 访问项目时禁用权限检查,包括了sitecore 访问项目时禁用权限检查的使用技巧和注意事项,需要的朋友参考一下 示例            

  • 我正在尝试一个android应用程序,其中我的服务器的链接被传递到来玩。一开始我一直在 “原因:javax.net.ssl.sslHandShakeException:java.security.cert.certPathValidatorException:找不到证书路径的信任锚点。” 错误之后,在做研究时,我找到了两种可能的解决方法。 第一种方法是按照https://knowledge.dig