我想在flink中使用aws s3作为数据流的接收器。我正在使用StreamingFileSink类创建一个接收器。
我的工作不需要检查点,但是当我禁用检查点时,数据不再写入S3。
案例1:启用检查点
启用检查点后,数据将成功摄取到提到的s3路径。
案例2:检查点禁用
禁用检查点时,数据不会写入s3。
我多次尝试执行作业,但每次都得到相同的结果。我在本地机器和kubernetes集群上都面临这个问题。
object FlinkTestJob {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// with checkpointing enabled
env.enableCheckpointing(100)
// Sinks
val streamStrings: Seq[String] =
Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")
val testStream = env.fromCollection(streamStrings)
val rollingPolicy = new RollingPolicy[String, String] {
override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
partFileState.getSize > 1
override def shouldRollOnEvent(
partFileState: PartFileInfo[String],
element: String): Boolean = true
override def shouldRollOnProcessingTime(
partFileState: PartFileInfo[String],
currentTime: Long): Boolean = true
}
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build()
testStream.addSink(sink)
env.execute("test-job")
}
}
当我使用“WriteEasText(“s3a://testbucket/sink”)而不是StreamingFileSink”写入s3时,无论是否启用检查点,它都可以正常工作。
Flink版本:1.8.0我想了解检查点和StreamingFileSink之间的关系
谢谢
要使StreamingFileSink正常工作,需要启用检查点。
我已经设置了一个本地的Kafka0.10+Flink1.4环境。 我使用下面的代码来消费来自Kafka主题的数据: 在我执行这段代码后,总是会发现警告消息: 自动提交组TaxidataGroup的偏移量{taxidata-0=offsetandMetadata{offset=728461,Metadata=“}}失败:无法完成提交,因为该组已重新平衡并将分区分配给其他成员。这意味着对poll()的
使用这个脚本检查是否有禁用函数。命令行运行curl -Ss http://www.workerman.net/check.php | php 如果有提示Function stream_socket_server may be disabled. Please check disable_functions in php.ini说明workerman依赖的函数被禁用,需要在php.ini中解除禁用才
我正在使用至少一次检查点模式,这应该是异步化进程。有人能建议吗?我的检查点设置 我的工作有128个容器。 我想用一个30分钟的检查站看看
当前,当点击第一阶段时,声明性管道将自动执行 我只想在子目录中签出scm。(因此它不会在根目录中签出相同的内容) 因为它需要jenkinsfile它已经结账一次。加目录
本文向大家介绍sitecore 访问项目时禁用权限检查,包括了sitecore 访问项目时禁用权限检查的使用技巧和注意事项,需要的朋友参考一下 示例
我正在尝试一个android应用程序,其中我的服务器的链接被传递到来玩。一开始我一直在 “原因:javax.net.ssl.sslHandShakeException:java.security.cert.certPathValidatorException:找不到证书路径的信任锚点。” 错误之后,在做研究时,我找到了两种可能的解决方法。 第一种方法是按照https://knowledge.dig