我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。
以下是我的详细进度:
1:创建一个Kafka表
CREATE TABLE IF NOT EXISTS LOG_TABLE(
id String,
...
...
event_timestamp timestamp(3), watermark for event_timestamp as ....
)
2:启动我的 Flink 作业,如下所示配置
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
environment.getCheckpointConfig().setCheckpointInterval(30000L);
environment.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
environment.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/checkpoint/"));
environment.setStateBackend(new HashMapStateBackend());
environment.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, settings);
tableEnvironment.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
tableEnvironment.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay", "1s");
3:执行我的sql
select tumble_end(event_timestamp, interval '5' minute),
count(1)
from LOG_TABLE
group by tumble(event_timestamp, interval '5' minute)
如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗口触发6个检查点。
在这种情况下,窗口状态丢失:
所有消息都在2分钟内发送,因此从检查点重新启动后,作业输出应为200,但结果为100,作业失去了第一个作业的状态。我的进度有什么错误吗?请帮忙查一下,谢谢。
在保留精确一次保证的同时重新启动Flink作业需要以特殊方式启动后续作业,以便新作业从恢复上一个作业的状态开始。(修改检查点存储目录,如您在步骤2中所做的那样,没有帮助。)
如果使用 SQL 客户端启动作业,请参阅从保存点启动 SQL 作业,这涉及执行类似如下操作
SET 'execution.savepoint.path' = '/tmp/flink-savepoints/...';
在启动需要恢复状态的查询之前。
如果您使用的是表API,那么详细信息取决于您如何启动作业,但是您可以使用命令行执行以下操作
$ ./bin/flink run \
--detached \
--fromSavepoint /tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0dab \
./examples/streaming/StateMachineExample.jar
或者您可能正在使用REST API,在这种情况下,您将向< code>/jars/:jarid/run发送配置了< code > save path 的内容。
请注意,您可以使用保留的检查点而不是保存点来重新启动或重新缩放作业。另请注意,如果更改查询的方式使旧状态与新查询不兼容,则这些都不起作用。有关该主题的更多信息,请参阅 FLIP-190。
Flink Operations Playground 是一个教程,更详细地介绍了此主题和相关主题。
我有一份flink的工作,它使用Kafka的数据,制作一些无状态平面图,并向Kafka生成数据,这是一份工作量非常小的工作。 例如,在作业需要从检查点还原之前,它通常会无问题地获取检查点,而它只是无法使用下面的堆栈跟踪还原状态。 状态非常小,我相信它只是Kafka偏移量,它至少运行了一次语义。 所有操作员都有。uid()集,我完全没有主意了。 这是尝试从检查点重新启动时的错误: 任务管理器在正常操
1)以上假设是否正确。2)当发生故障时,滚动窗口有状态是否有意义,我们从最后一个kafka分区提交的偏移量开始。3)当滚动窗口有状态时,这个状态什么时候可以被flink使用。4)为什么检查点和保存点的状态大小不同。5)当发生故障时,flink总是从sorce运算符开始。对吗?
我正在使用KCL(v2)将Kafka消费者转换为AWS动觉消费者。在Kafka中,偏移量用于帮助消费者跟踪其最近使用的消息。如果我的Kafka应用程序死机,它将使用重新启动时停止的偏移量。 然而,这在Kinesis中是不一样的。我可以设置,但唯一的参数是、或。如果我的Kinesis应用程序死机,它将不知道重新启动时从哪里恢复消费。 我的KCL消费者非常简单。方法如下所示: 而RecordProce
目标:从Kinesis读取数据,并通过火花流将数据以拼花格式存储到S3 情况:应用程序最初运行良好,批量运行1小时,平均处理时间不到30分钟。出于某种原因,假设应用程序崩溃,我们尝试从检查点重新启动。现在,处理过程需要很长时间,不会向前推进。我们尝试以1分钟的分批间隔测试相同的东西,处理运行良好,分批完成需要1.2分钟。当我们从检查点恢复时,每批大约需要15分钟 注意:我们使用s3作为检查点,使用
我们目前正在kubernetes上运行flink,作为使用这个helm模板的作业集群:https://github.com/docker-flink/examples/tree/master/helm/flink(带有一些添加的配置)。 如果我想关闭集群,重新部署新映像(由于应用程序代码更新)并重新启动,我将如何从保存点进行恢复? jobManager命令严格设置在standalone-job.s
所以我有一个我似乎无法解决的问题。我有一个ViewPager在我的一个活动中,比如MainActivity。我正在实现所有必要的方法,以保存和检索实例状态时,活动被取消在后台。但是当活动试图恢复它的状态时,片断会被恢复,但它们并没有附加到viewpager,所以我得到的只是一个白色屏幕。 下面是相关代码: mainactivity.java ViewPagerAdapter.java 因此,如果我