当前位置: 首页 > 知识库问答 >
问题:

Apache Flink不返回空闲分区的数据

蓝华皓
2023-03-14

我试图根据事件时间计算Kafka主题每分钟传入事件的速率。为此,我使用了1分钟的TumblingEventTimeWindows。下面给出了代码片段。

我观察到,如果我没有收到特定窗口的任何事件,例如从2.34到2.35,则前一个窗口2.33到2.34不会关闭。我理解在2.33到2.34的窗口中丢失数据的风险(可能是由于系统故障、Kafka滞后更大等原因),但我不能无限期地等待。等待一段时间后,我需要关闭此窗口,系统恢复后,后续窗口可以继续。我怎样才能做到这一点?

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = environment.addSource(kafkaConsumer);
    kafkaDataStream
            .flatMap(new EventFlatter())
            .filter(Objects::nonNull)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Entity>forMonotonousTimestamps()
                    .withIdleness(Duration.ofSeconds(60))
                    .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
            .assignTimestampsAndWatermarks(new EntityWatermarkStrategy())
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .aggregate(new EventCountAggregator())
            .addSink(eventRateProducer);

private static class EntityWatermarkStrategy implements WatermarkStrategy<Entity> {
    @Override
    public WatermarkGenerator<Entity> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {

        return new EntityWatermarkGenerator();
    }

}

private static class EntityWatermarkGenerator implements WatermarkGenerator<Entity> {

    private long maxTimestamp;

    public EntityWatermarkGenerator() {
        this.maxTimestamp = Long.MIN_VALUE + 1;
    }

    @Override
    public void onEvent(Entity event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp + 2));
    }
}

此外,我尝试添加一些自定义触发器,但没有帮助。我正在使用Apache Flink 1.11

有人能告诉我,我做错了什么?

当我尝试推送更多具有更新时间戳(例如t1)的主题的数据时,来自更早时间帧(t)的数据会被推送。但是对于t1数据,同样的问题发生在t。

共有3个答案

锺离声
2023-03-14

使用flink 1.11 watermarkstrategy api可以帮助您避免抽取虚假数据。您需要的是在每分钟结束时定期生成水印。这是参考:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html

使用CustomKafkaSerializer创建flinkKafkaConsumer:

Flink Kafka消费者其他消费者=新的Flink Kafka消费者(主题,新的CustomKafkaSerializer(apacheFlink环境加载器),道具);

如何创建CustomKafkaSerializer?Ans-关于Flink反序列化的两个问题

现在为flinkKafkaConsumer使用水印策略:

FlinkKafkaConsumer<Tuple3<String,String,String>> flinkKafkaConsumer = apacheKafkaConfig.getOtherConsumer();
        flinkKafkaConsumer.assignTimestampsAndWatermarks(new ApacheFlinkWaterMarkStrategy(envConfig.getOutOfOrderDurationSeconds()).
                withIdleness(Duration.ofSeconds(envConfig.getIdlePartitionTimeout())));

这就是水印策略的样子?

Ans-

public class ApacheFlinkWaterMarkStrategy implements WatermarkStrategy<Tuple3<String, String, String>> {

    private long outOfOrderDuration;

    public ApacheFlinkWaterMarkStrategy(long outOfOrderDuration)
    {
        super();
        this.outOfOrderDuration = outOfOrderDuration;
    }

    @Override
    public TimestampAssigner<Tuple3<String, String, String>> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new ApacheFlinkTimeForEvent();
    }

    @Override
    public WatermarkGenerator<Tuple3<String, String, String>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new ApacheFlinkWaterMarkGenerator(this.outOfOrderDuration);
    } }

这是我们从有效载荷中获取事件时间的方式:

Ans公司-

public class ApacheFlinkTimeForEvent implements SerializableTimestampAssigner<Tuple3<String,String,String>> {

    public static final Logger logger = LoggerFactory.getLogger(ApacheFlinkTimeForEvent.class);

    private static final FhirContext fhirContext = FhirContext.forR4();


    @Override
    public long extractTimestamp(Tuple3<String,String,String> o, long l) {
        //get timestamp from payload
    }
}

这就是我们定期生成水印的方式,这样无论数据是否到达,水印都会在每个分区中每分钟更新一次。

public class ApacheFlinkWaterMarkGenerator implements WatermarkGenerator<Tuple3<String,String,String>> {

    public static final Logger logger = LoggerFactory.getLogger(ApacheFlinkWaterMarkGenerator.class);

    private long outOfOrderGenerator;

    private long maxEventTimeStamp;

    public ApacheFlinkWaterMarkGenerator(long outOfOrderGenerator)
    {
        super();
        this.outOfOrderGenerator = outOfOrderGenerator;
    }

    @Override
    public void onEvent(Tuple3<String, String, String> stringStringStringTuple3, long l, WatermarkOutput watermarkOutput) {
        maxEventTimeStamp = Math.max(maxEventTimeStamp,l);
        Watermark eventWatermark = new Watermark(maxEventTimeStamp);
        watermarkOutput.emitWatermark(eventWatermark);
        logger.info("Current Watermark emitted from event is {}",eventWatermark.getFormattedTimestamp());
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {

        long currentUtcTime = Instant.now().toEpochMilli();
        Watermark periodicWaterMark = new Watermark(currentUtcTime-outOfOrderGenerator);
        watermarkOutput.emitWatermark(periodicWaterMark);
        logger.info("Current Watermark emitted periodically is {}",periodicWaterMark.getFormattedTimestamp());

    }
}

此外,水印的周期性发射必须在应用程序开始时设置。

streamExecutionEnvironment.getConfig().setAutoWatermarkInterval(This is in milliseconds long);

这就是我们如何向flinkKafkaConsumer添加自定义水印和时间戳的方法

flinkKafkaConsumer.assignTimestampsAndWatermarks(new ApacheFlinkWaterMarkStrategy(Out of Order seconds).
            withIdleness(IdlePartiton Seconds);
钮巴英
2023-03-14

有关解决问题的方法,请参阅空闲分区文档。

韩宏朗
2023-03-14

在您的案例中,withIdleness()没有帮助的一个原因是,在Kafka源发出数据流之后,您在数据流上调用了assignTimestampsAndWatermarks,而不是在FlinkKafkaConsumer本身上调用它。如果要执行后者,FlinkKafkaConsumer将能够在每个分区的基础上分配时间戳和水印,并将在每个kafka分区的粒度上考虑空闲。有关更多信息,请参阅水印策略和Kafka连接器。

但是,要使此工作正常,您需要使用能够创建带有时间戳的单个流记录的反序列化器,而不是SimpleStringSchema(例如KafkaDeserializationSchema)。有关如何实现KafkaDeserializationSchema的示例,请参阅https://stackoverflow.com/a/62072265/2000823。

但是,请记住,如果所有分区都处于空闲状态,with Idlness()不会推进水印。它将做的是防止空闲分区阻止水印,如果有来自其他分区的事件,水印可能会推进。

 类似资料:
  • 问题内容: 由于某些原因,我无法从以下代码中找出原因: 我得到: 我认为,什么时候可以得到: …因为我不认为我的时区距离UTC 6小时9分钟。 我已经查看了源代码,但是我承认我还不能完全弄清楚出了什么问题。 我已经将其他值传递给该函数,并且它返回的值似乎是正确的。但是由于某种原因,与我的时区有关的信息不正确。 最后,我旁边的多维数据集中的同事已确认该函数在其计算机上返回了正确的时区信息。 有谁知道

  • 我有两个模型;一个用于用户,另一个用于学习组。每个StudyGroup都有一个唯一的字段。用户模型有一个studyGroups字段,它是字符串的数组。一个用户可以加入多个学习组。 用户模型

  • 因此,我得到了一个字符串,它是我在一个类上实现的toJson方法的结果,并且在我的测试代码中确认它是我的类的正确Json表示。我的目标是使用Gson将此字符串转换为JsonObject并将其传递给构造函数。然而,我遇到了一个奇怪的问题。 这是我要调用的代码: 我以前在我的项目中的许多地方,在其他类中,都使用过完全相同的代码片段,而且效果很好。我甚至将其中一段功能代码复制到这个测试类中,并进行了尝试

  • 我正在玩新的Android设计库。折叠工具栏布局完美运行。但是,我无法将工具栏的默认状态设置为折叠。 我正在尝试实现这里和这里显示的解决方案 我在我的活动摘要中调用以下代码: 但是,params返回的行为为null。我的xml代码和这里一样,除了我没有使用drawer和CordinatorLayout是我的根布局。 编辑:我之前尝试过切换AppBarLayout。AppBarLayout的行为。S

  • 问题内容: 我有两个模型,用户模型和时间表,我想用$ lookup 和猫鼬把这两个模型结合起来。 用户(型号) 时间表(型号) 现在我的查询使用猫鼬: 用户汇总 我的查询结果是一个空的array(),如下所示: 查询结果: 我不知道为什么,但是查询结果是一个 空数组 ,我试图使用$ unwind和$ match,但也无法正常工作。 编辑: 用户集合 时间表的收集 问题答案: 猫鼬在创建时将集合名称

  • 我有一个注释定义如下: 我是这样使用它的: 现在,我有了第二个接口,它扩展了第一个: 我想获得MySecondInterface的所有注释,这意味着我也想获得超级接口上定义的注释。 我所尝试的: ######################################################################################### 结果是: 在所有情况下,