Question:

Apache Flink: delaying stream events

郤浩慨
2023-03-14

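I'm trying to write the following code, but it doesn't work. I want to use Apache Flink to delay events whose time (specified in the "timestamp" field) differs from the current date, releasing each event at the time given in its "timestamp" field.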

Example:

  • Current date: 2022-05-06 10:30

    Event 1: [{"user1":"1","user2":"2","timestamp":"2022-05-06 10:30"}]

    Event 2: [{"user1":"1","user2":"2","timestamp":"2022-05-06 13:30"}]
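For the two sample events, the required delay is just the difference between the event's timestamp and the current date, clamped at zero: event 1 is due now, event 2 is due three hours later. A minimal stdlib-only sketch of that arithmetic; the class and method names are hypothetical, and the "yyyy-MM-dd HH:mm" pattern is assumed from the examples above:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class SampleDelays {
    // Assumed format of the "timestamp" values in the examples, e.g. "2022-05-06 13:30".
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    // How long an event must be held back: zero if its timestamp is not in the future.
    static Duration delayFor(String eventTimestamp, String currentDate) {
        LocalDateTime sendAt = LocalDateTime.parse(eventTimestamp, FMT);
        LocalDateTime now = LocalDateTime.parse(currentDate, FMT);
        Duration d = Duration.between(now, sendAt);
        return d.isNegative() ? Duration.ZERO : d;
    }

    public static void main(String[] args) {
        String now = "2022-05-06 10:30";
        System.out.println("Event 1: " + delayFor("2022-05-06 10:30", now)); // PT0S
        System.out.println("Event 2: " + delayFor("2022-05-06 13:30", now)); // PT3H
    }
}
```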

    FlinkLikePipeline

    public class FlinkLikePipeline {

        public static void createBackup() throws Exception {
            String inputTopic = "flink_input";
            String outputTopic = "flink_output";
            String consumerGroup = "baeldung";
            String kafkaAddress = "localhost:9092";

            StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
            environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            environment.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());

            FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
            consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

            FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
                    new BackupSerializationSchema());

            DataStream<Like> likes = environment.addSource(consumer);

            DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
            matches.addSink(producer);

            environment.execute("Keyed Process Function Example");
        }

        public static void main(String[] args) throws Exception {
            createBackup();
        }
    }

    InputMessageTimestampAssigner

    public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks<Like> {

        @Override
        public long extractTimestamp(Like element, long previousElementTimestamp) {
            Timestamp now = new Timestamp(System.currentTimeMillis());
            return now.getTime();
        }

        @Override
        public Watermark checkAndGetNextWatermark(Like lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }

    MatchFunction

    public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {

        ValueState<Like> liked;

        @Override
        public void open(Configuration parameters) throws Exception {
            liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
        }

        @Override
        public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {

            Timestamp now = new Timestamp(System.currentTimeMillis());
            Timestamp sendAt = newLike.getTimestamp();

            if (sendAt.before(now)) {
                liked.clear();
                out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
            } else {
                // schedule the next timer by sendAt
                liked.update(newLike);
                ctx.timerService().registerEventTimeTimer(sendAt.getTime());
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
            Like like = liked.value();
            Timestamp now = new Timestamp(System.currentTimeMillis());
            Timestamp sendAt = like.getTimestamp();
            if (now == sendAt) {
                liked.clear();
                out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
            }
        }
    }
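One reason the onTimer method above can never emit anything is the check now == sendAt: for objects such as java.sql.Timestamp (assumed here to be the Timestamp type used in the question), == compares object references, not the stored times. A small stdlib-only illustration; the class name, the reached helper and the sample value are hypothetical:

```java
import java.sql.Timestamp;

class TimestampComparison {
    // "Has sendAt been reached?": true when now is equal to or later than sendAt.
    static boolean reached(Timestamp now, Timestamp sendAt) {
        return !now.before(sendAt);
    }

    public static void main(String[] args) {
        long millis = 1651836720000L; // 2022-05-06T11:32:00Z
        Timestamp now = new Timestamp(millis);
        Timestamp sendAt = new Timestamp(millis);

        // '==' compares references: two distinct objects are never '==',
        // even when they hold the same instant.
        System.out.println(now == sendAt);           // false
        // equals() compares the stored time values.
        System.out.println(now.equals(sendAt));      // true
        // An ordering check also covers timers that fire a few milliseconds late.
        System.out.println(reached(now, sendAt));    // true
    }
}
```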

    Kafka producer topic (JSON sample): {"user1":"1","user2":"2","timestamp":"2022-05-06T11:32:00.000Z"}
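The Kafka sample carries an ISO-8601 timestamp in UTC (the "Z" suffix). The question does not show how Like is deserialized; assuming the deserializer has to turn that string into the java.sql.Timestamp returned by getTimestamp(), java.time.Instant can parse it directly. Epoch milliseconds are time-zone independent, so the result can be compared with System.currentTimeMillis() or used as a timer time without zone conversion. The class and method names below are hypothetical:

```java
import java.sql.Timestamp;
import java.time.Instant;

class ParseKafkaTimestamp {
    // ISO-8601 instant (UTC, "Z" suffix) -> epoch milliseconds.
    static long toEpochMillis(String isoTimestamp) {
        return Instant.parse(isoTimestamp).toEpochMilli();
    }

    // The same instant as a java.sql.Timestamp (assumed to be the type Like.getTimestamp() returns).
    static Timestamp toTimestamp(String isoTimestamp) {
        return Timestamp.from(Instant.parse(isoTimestamp));
    }

    public static void main(String[] args) {
        String sample = "2022-05-06T11:32:00.000Z";
        System.out.println(toEpochMillis(sample));            // 1651836720000
        System.out.println(toTimestamp(sample).getTime());    // 1651836720000
    }
}
```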

    Thanks for your support, Giuseppe.

  • 1 answer

    太叔昊穹
    2023-03-14

    Try the following:

    import java.sql.Timestamp;

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class MatchFunction extends KeyedProcessFunction<String, Like, Matches> {

        // The event waiting for its timer (one per key).
        ValueState<Like> liked;

        @Override
        public void open(Configuration parameters) throws Exception {
            liked = getRuntimeContext().getState(new ValueStateDescriptor<>("like", Like.class));
        }

        @Override
        public void processElement(Like newLike, Context ctx, Collector<Matches> out) throws Exception {

            Timestamp now = new Timestamp(ctx.timestamp());  // CHANGE: Use context timestamp
            Timestamp sendAt = newLike.getTimestamp();

            if (sendAt.before(now)) {
                // The event's time has already passed: emit it immediately
                liked.clear();
                out.collect(new Matches(newLike.getUser1(), newLike.getUser2(), newLike.getTimestamp()));
            } else {
                // schedule the next timer by sendAt
                liked.update(newLike);
                ctx.timerService().registerProcessingTimeTimer(sendAt.getTime()); // CHANGED: change to processing time
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Matches> out) throws Exception {
            Like like = liked.value();
            if (like == null) {
                return; // nothing is pending for this key any more
            }
            Timestamp now = new Timestamp(timestamp);  // CHANGE: Use the timer's firing time
            Timestamp sendAt = like.getTimestamp();
            if (!now.before(sendAt)) { // CHANGE: fire when now >= sendAt (Timestamp objects cannot be compared with >=)
                liked.clear();
                out.collect(new Matches(like.getUser1(), like.getUser2(), like.getTimestamp()));
            }
        }
    }

    // In FlinkLikePipeline:
    public static void createBackup() throws Exception {
        String inputTopic = "flink_input";
        String outputTopic = "flink_output";
        String consumerGroup = "baeldung";
        String kafkaAddress = "localhost:9092";

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);  // CHANGED: to processing time
        environment.getConfig().setAutoWatermarkInterval(Duration.ofMillis(1).toMillis());

        FlinkKafkaConsumer011<Like> consumer = Consumers.createMsgLike(inputTopic, kafkaAddress, consumerGroup);
        consumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());

        FlinkKafkaProducer011<Matches> producer = new FlinkKafkaProducer011<Matches>(kafkaAddress, outputTopic,
                new BackupSerializationSchema());

        DataStream<Like> likes = environment.addSource(consumer);

        DataStream<Matches> matches = likes.keyBy(like -> like.getId()).process(new MatchFunction());
        matches.addSink(producer);

        environment.execute("Keyed Process Function Example");
    }
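The answer's idea (emit an event at once when its timestamp has already passed, otherwise park it until a processing-time timer fires at that timestamp) can be modeled without Flink. In this stdlib-only sketch, with all names hypothetical, a priority queue ordered by release time stands in for Flink's timer service. Two deliberate differences from the answer: it emits events whose timestamp equals the current time right away (the answer does so only for timestamps strictly before now), and it can hold several pending events, whereas a single ValueState<Like> per key keeps only the most recent one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class DelayBuffer {
    // An event waiting for its release time (milliseconds).
    static final class Pending {
        final long sendAt;
        final String event;
        Pending(long sendAt, String event) { this.sendAt = sendAt; this.event = event; }
    }

    // Earliest release time first, like a timer service.
    private final PriorityQueue<Pending> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.sendAt, b.sendAt));

    // Analogue of processElement: emit now if the timestamp is not in the future,
    // otherwise "register a timer" for sendAt.
    List<String> process(String event, long sendAt, long now) {
        List<String> out = new ArrayList<>();
        if (sendAt <= now) {
            out.add(event);
        } else {
            timers.add(new Pending(sendAt, event));
        }
        return out;
    }

    // Analogue of processing time advancing: fire every timer whose time has been reached.
    List<String> advanceTo(long now) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().sendAt <= now) {
            out.add(timers.poll().event);
        }
        return out;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long now = 10 * hour + 30 * 60_000L; // 10:30, as millis since midnight
        DelayBuffer buffer = new DelayBuffer();
        System.out.println(buffer.process("event1", now, now));            // [event1]
        System.out.println(buffer.process("event2", now + 3 * hour, now)); // []
        System.out.println(buffer.advanceTo(now + 3 * hour - 1));          // []
        System.out.println(buffer.advanceTo(now + 3 * hour));              // [event2]
    }
}
```

In Flink itself the queue and the clock are provided by the timer service, so the real function only decides between collecting the event and registering a timer.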
    