当前位置: 首页 > 知识库问答 >
问题:

为什么flink无法从保存点恢复

黄丰
2023-03-14

版本flink 1.7

我正在尝试从保存点(或检查点)还原flink作业,该作业所做的是读取kafka的内容-

我使用rocksdb和启用的检查点。

现在我尝试手动触发一个保存点。每个聚合的预期值为30(1个数据/每分钟)。但是当我从保存点(flink run-d-s{url})恢复时,聚合值不是30(小于30,取决于我取消flink作业和恢复的时间)。当作业正常运行时,它得到30。

我不知道为什么有些数据会丢失?

日志显示“FlinkKafkaConsumer没有恢复状态”

主代码:

        source.flatMap(new FlatMapFunction<String, Model>() {
        private static final long serialVersionUID = 5814342517597371470L;

        @Override
        public void flatMap(String value, Collector<Model> out) throws Exception {
            LOGGER.info("----> catch value: " + value);
            Model model =  JSONObject.parseObject(value, Model.class);
            out.collect(model);
        }
    }).uid("flatmap-1").name("flatmap-1").assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Model>() {

        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public long extractTimestamp(Model element, long previousElementTimestamp) {
            return element.getTime();
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Model lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }).setParallelism(1).keyBy(Model::getDim).window(new DynamicWindowAssigner()).aggregate(new AggregateFunction<Model, Model, Model>() {
        @Override
        public Model createAccumulator() {
            return new Model();
        }

        @Override
        public Model add(Model value, Model accumulator) {
            init(value, accumulator);
            accumulator.setValue(accumulator.getValue() + 1);
            return accumulator;
        }

        @Override
        public Model getResult(Model accumulator) {
            return accumulator;
        }

        @Override
        public Model merge(Model a, Model b) {
            return null;
        }

        private void init(Model value, Model accumulator){
            if(accumulator.getTime() == 0L){
                accumulator.setValue(0);
                accumulator.setDim(value.getDim());
                accumulator.setTime(value.getTime());
            }
        }
    }).uid("agg-1").name("agg-1").map(new MapFunction<Model, String>() {
        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public String map(Model value) throws Exception {
            value.setTime(TimeWindow.getWindowStartWithOffset(value.getTime(), 0, TimeUnit.MINUTES.toMillis(30)));
            return JSONObject.toJSONString(value);
        }
    }).uid("flatmap-2").name("flatmap-2").setParallelism(4).addSink(metricProducer).uid("sink").name("sink").setParallelism(2);

检查点设置:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(120000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
    StateBackend stateBackend = new RocksDBStateBackend(${path}, true);
    env.setStateBackend(stateBackend);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();

共有1个答案

司寇善
2023-03-14

最后我发现我应该使用flink run-s{savepoint}-dxxx.jarinstesad的flink run-dxxx.jar-s{savepoint},如果-d标志在-s标志前面,那么flink忽略-s

 类似资料:
  • 我们目前正在kubernetes上运行flink,作为使用这个helm模板的作业集群:https://github.com/docker-flink/examples/tree/master/helm/flink(带有一些添加的配置)。 如果我想关闭集群,重新部署新映像(由于应用程序代码更新)并重新启动,我将如何从保存点进行恢复? jobManager命令严格设置在standalone-job.s

  • 考虑使用以下管道的Apache Flink流媒体应用程序: 其中每个函数都是非状态运算符(例如

  • 我试图检查/保存我在EMR上运行的flink状态到AWS上的s3存储桶。请注意: 实例(主节点和核心节点)正确设置了IAM角色,以访问s3 bucket及其内部的所有目录/文件(AmazonS3FullAccess策略附加到该角色,没有任何内容覆盖它) jobmanager日志:

  • 下面是我对Flink的疑问。 我们可以将检查点和保存点存储到RockDB等外部数据结构中吗?还是只是我们可以存储在RockDB等中的状态。 状态后端是否会影响检查点?如果是,以何种方式? 什么是状态处理器API?它与我们存储的保存点和检查点直接相关吗?状态处理器API提供了普通保存点无法提供的哪些额外好处? 对于3个问题,请尽可能描述性地回答。我对学习状态处理器API很感兴趣,但我想深入了解它的应

  • 我正在检查Flink Sql Table与kafka连接器是否可以在EXACTLY_ONCE模式下执行,我的方法是创建一个表,设置合理的检查点间隔,并在event_time字段上使用简单的翻滚函数,最后重新启动我的程序。 以下是我的详细进度: 1:创建一个Kafka表 2:启动我的 Flink 作业,如下所示配置 3:执行我的sql 如我们所见,翻转窗口间隔为5分钟,检查点间隔为30秒,每个翻转窗

  • 问题内容: 我很惊讶即使在Java中发生故障后仍可以继续执行。 我知道那是Error类的一个子集。错误类Error被贬义为“ Throwable的子类,它指示合理的应用程序不应尝试捕获的严重问题”。 这听起来像是一条建议,而不是一条规则,这表明实际上允许捕获像StackOverflowError这样的错误,这取决于程序员的合理性。看到了,我测试了这段代码,它正常终止。 怎么会这样?我认为,当引发S