I am running Flink 1.11.1 in standalone mode on Kubernetes, with HDFS used for storage and HA. Recently I tried to enable Flink's checkpointing feature, but I noticed that both the JobManager and the TaskManager emit far too many checkpoint-related log messages, which is annoying. Examples below:
JobManager:
2020-10-08 19:54:23,237 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CHECKPOINT) @ 1602186863226 for job fbf26a33a6d5d235085d10e7a10c1cab.
2020-10-08 19:54:42,818 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job fbf26a33a6d5d235085d10e7a10c1cab (702534 bytes in 19488 ms).
2020-10-08 19:54:42,825 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1602186882820 for job fbf26a33a6d5d235085d10e7a10c1cab.
2020-10-08 19:54:43,384 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job fbf26a33a6d5d235085d10e7a10c1cab (729357 bytes in 494 ms).
2020-10-08 19:54:43,392 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 3 (type=CHECKPOINT) @ 1602186883388 for job fbf26a33a6d5d235085d10e7a10c1cab.
2020-10-08 19:54:44,295 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 3 for job fbf26a33a6d5d235085d10e7a10c1cab (736969 bytes in 836 ms).
2020-10-08 19:54:44,302 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 4 (type=CHECKPOINT) @ 1602186884298 for job fbf26a33a6d5d235085d10e7a10c1cab.
2020-10-08 19:54:44,794 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 4 for job fbf26a33a6d5d235085d10e7a10c1cab (748787 bytes in 431 ms).
2020-10-08 19:54:44,800 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 5 (type=CHECKPOINT) @ 1602186884796 for job fbf26a33a6d5d235085d10e7a10c1cab.
2020-10-08 19:54:45,198 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 5 for job fbf26a33a6d5d235085d10e7a10c1cab (755308 bytes in 327 ms).
2020-10-08 19:54:45,703 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 (type=CHECKPOINT) @ 1602186885698 for job fbf26a33a6d5d235085d10e7a10c1cab.
2020-10-08 19:54:45,897 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 for job fbf26a33a6d5d235085d10e7a10c1cab (757353 bytes in 163 ms).
2020-10-08 19:54:45,903 [INFO] org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 7 (type=CHECKPOINT) @ 1602186885899 for job fbf26a33a6d5d235085d10e7a10c1cab.
TaskManager:
2020-10-08 20:04:02,090 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 571 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187440992} from checkpoint 571
2020-10-08 20:04:03,086 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 572 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187441992} from checkpoint 572
2020-10-08 20:04:03,086 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 572 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187441992} from checkpoint 572
2020-10-08 20:04:04,099 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 573 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187442992} from checkpoint 573
2020-10-08 20:04:04,099 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 573 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187442992} from checkpoint 573
2020-10-08 20:04:05,130 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 574 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187443999} from checkpoint 574
2020-10-08 20:04:05,130 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 574 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187443999} from checkpoint 574
2020-10-08 20:04:06,096 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 575 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187444995} from checkpoint 575
2020-10-08 20:04:06,096 [INFO] org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 1/2 - checkpoint 575 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1602187444992} from checkpoint 575
Is there any way to disable or reduce these checkpoint logs? Any help would be greatly appreciated!
The simplest way to silence annoying logs is to set the desired log level for the component in question. In your case, if you want to suppress the logs coming from org.apache.flink.runtime.checkpoint, or more broadly from all Flink components under org.apache.flink, you can raise that logger's level to WARN. To do so, edit flink/conf/log4j.properties and add the following (or uncomment the existing lines):
logger.flink.name = org.apache.flink
logger.flink.level = WARN
The change is picked up once the application is stopped and started again.
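If you would rather keep the rest of Flink's logging at INFO and only silence the two classes visible in the excerpts above, a narrower variant works the same way (the logger key names checkpoint and kafkasink below are arbitrary labels, not fixed identifiers):

logger.checkpoint.name = org.apache.flink.runtime.checkpoint
logger.checkpoint.level = WARN
logger.kafkasink.name = org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction
logger.kafkasink.level = WARN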
I want to use AWS S3 as the sink for a data stream in Flink, and I am creating the sink with the StreamingFileSink class. My job does not need checkpointing, but when I disable checkpointing, data is no longer written to S3.
Case 1, checkpointing enabled: data is successfully ingested into the given S3 path.
Case 2, checkpointing disabled: no data is written to S3.
I have run the job several times and get the same result every time. I hit this problem both on my local machine and on a Kubernetes cluster.
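For context, a minimal sketch of the setup being described (Flink 1.11, Java; the bucket path and input elements are hypothetical placeholders). StreamingFileSink only moves part files from the in-progress/pending state to finished when a checkpoint completes, which is why nothing becomes visible in S3 with checkpointing disabled:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files are committed on checkpoint completion; without this line
        // they stay pending forever and never show up in the bucket.
        env.enableCheckpointing(60_000L);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"), // hypothetical bucket; requires the flink-s3-fs plugin
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("s3-sink-demo");
    }
}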
Below are my questions about Flink.
1. Can checkpoints and savepoints be stored in an external data store such as RocksDB, or is it only state that can be stored in RocksDB and the like?
2. Does the state backend affect checkpointing? If so, in what way?
3. What is the State Processor API? Is it directly related to the savepoints and checkpoints we store? What extra benefits does the State Processor API provide that a plain savepoint cannot?
Please answer the three questions as descriptively as possible. I am interested in learning the State Processor API, but I would like a deep understanding of how it is used.
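As background for the first two questions, a minimal sketch (Flink 1.11, Java; the HDFS URI is a placeholder) of using RocksDB as the state backend. RocksDB holds the live working state on each TaskManager, while checkpoints of that state are written to the configured durable filesystem rather than into RocksDB itself:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Working state lives in embedded RocksDB; the second argument enables
        // incremental checkpoints. Checkpoint data itself goes to the URI below.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(30_000L);
        // ... build and execute the job as usual ...
    }
}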
I can see in the documentation: "Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program, the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, force = true)." Would anything change if it were a [...] rather than a [...] (meaning it can also save state)?
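For reference, a sketch of that force flag in actual code (Flink 1.11, Java; the toy iteration body is hypothetical). The three-argument overload is deprecated but carries the force flag the docs mention:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForcedIterationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keep the feedback edge's parallelism aligned with the head
        // The force flag allows checkpointing on a job with iterations,
        // at the cost of the usual processing guarantees on the loop.
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE, true);

        DataStream<Long> input = env.fromElements(5L, 8L, 13L);
        IterativeStream<Long> iteration = input.iterate();
        DataStream<Long> minusOne = iteration.map(v -> v - 1);
        iteration.closeWith(minusOne.filter(v -> v > 0)); // feed positives back
        minusOne.filter(v -> v <= 0).print();             // emit finished values
        env.execute("forced-checkpoint-iteration");
    }
}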
1) Is the above assumption correct?
2) When a failure occurs, does it make sense for the tumbling window to have state, given that we start again from the last offset committed for the Kafka partition?
3) When the tumbling window does have state, when can Flink make use of that state?
4) Why do the state sizes of a checkpoint and a savepoint differ?
5) When a failure occurs, Flink always starts again from the source operator. Is that correct?
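For questions 2 and 3, a sketch (Flink 1.11, Java; the topic, servers, and reduce logic are placeholders) of the kind of job being discussed. The key point is that on recovery Flink restores the Kafka offsets and the in-flight window contents from the same checkpoint, so the window state is usable immediately rather than being rebuilt from the committed offsets:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TumblingWindowRecoveryDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000L);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "demo");                // placeholder

        // Consumer offsets and window contents are snapshotted together, so a
        // restore does not lose the partially filled 5-minute window.
        env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
           .keyBy(line -> line)
           .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
           .reduce((a, b) -> a + "," + b)
           .print();

        env.execute("tumbling-window-recovery-demo");
    }
}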
I am checking whether a Flink SQL table with the Kafka connector can run in EXACTLY_ONCE mode. My approach is to create a table, set a reasonable checkpoint interval, apply a simple tumbling function on the event_time field, and finally restart my program. Here is my detailed procedure:
1: create a Kafka table
2: start my Flink job with the configuration below
3: execute my SQL
As you can see, the tumbling window interval is 5 minutes and the checkpoint interval is 30 seconds; each tumbling window…
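Since the actual DDL and configuration were not included above, here is a hedged sketch of such a setup (Flink 1.11, Java; the schema, topic, and connection options are all placeholders) with a 30-second checkpoint interval and a 5-minute tumble on event_time:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaTumbleDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(30_000L); // 30 s, as in the question

        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        tEnv.executeSql(
                "CREATE TABLE src (" +
                "  id STRING," +
                "  event_time TIMESTAMP(3)," +
                "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'input-topic'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'properties.group.id' = 'demo'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'format' = 'json'" +
                ")");

        tEnv.executeSql(
                "SELECT TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS w_start, COUNT(*) AS cnt " +
                "FROM src GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)").print();
    }
}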
I went through the entire Flink dashboard, but I could not find any clue as to whether incremental or full checkpoints are being taken. Please help me with how to set up RocksDB logging so I can tell whether incremental checkpointing is happening. I saw in the documentation that RocksDB logging incurs a significant cost in performance and storage; this is only for testing purposes, and I will disable it afterwards.
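A sketch of one way to do this (Flink 1.11, Java; the checkpoint URI is a placeholder): raise RocksDB's native log verbosity through a custom options factory. The native LOG file is written inside each TaskManager's local RocksDB working directory, and given the cost noted above this should only stay enabled temporarily:

import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;

public class RocksDBLoggingDemo {
    public static void main(String[] args) throws Exception {
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true); // true = incremental
        backend.setRocksDBOptions(new RocksDBOptionsFactory() {
            @Override
            public DBOptions createDBOptions(DBOptions current, Collection<AutoCloseable> handlesToClose) {
                // Verbose native logging plus periodic statistics dumps; the
                // flush/compaction activity visible here is what an incremental
                // checkpoint ends up uploading.
                return current.setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
                              .setStatsDumpPeriodSec(60);
            }
            @Override
            public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions current, Collection<AutoCloseable> handlesToClose) {
                return current;
            }
        });

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(backend);
        env.enableCheckpointing(30_000L);
        // ... build and execute the job as usual ...
    }
}

A cheaper first check that avoids RocksDB logging entirely: compare the checkpointed data size of successive checkpoints in the web UI. With incremental checkpoints enabled it reports the delta uploaded for each checkpoint rather than the full state size, so it should stay small and roughly constant once the state stops growing.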