当前位置: 首页 > 知识库问答 >
问题:

带有时间戳的闪烁计数器

单于越
2023-03-14

我正在阅读Flink示例CountWithTimestamp,下面是该示例的代码片段:

  @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

我的问题是,如果我删除onTimer中的if语句timestamp==result.lastmodified+60000(收集未触摸的stmt),而代之以processElement开头的另一个if语句if(ctx.timestamping ,程序的语义是否相同?在语义相同的情况下,一个版本比另一个版本有什么偏好吗?


共有1个答案

欧阳狐若
2023-03-14

您认为删除计时器的实现具有相同的语义是正确的。事实上,我最近改变了我们培训教材中使用的例子来做这件事,因为我更喜欢这种方法。我觉得它更好的原因是,所有复杂的业务逻辑都在一个地方(在processElement中),而且每当调用ontimer时,您都知道要做什么,不需要问任何问题。此外,它的性能更好,因为检查点和最终触发的计时器更少。

这个例子是在可以删除计时器之前为文档编写的,还没有更新。

通过注册页面后,您可以找到我在这些幻灯片中提到的修改过的示例--https://training.ververica.com/decks/process-function/。

FWIW,我最近还按照相同的思路重新编写了相应训练练习的参考解决方案:https://github.com/apache/flink-training/tree/master/long-ride-alerts。

 类似资料:
  • 我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW

  • 问题内容: 中欧夏令时开始于三月的最后一个星期日。我们将时钟设置为02:00到03:00。如果我在数据库请求中进行时间戳计算会发生什么?比方说,在01:59? 结果是03:00还是02:00? 如果我们将时钟设置为03:00到02:00,那结束了呢? 时间从03:00更改为02:00之后…在02:00会发生什么?是02:59还是01:59? 应该如何处理?最佳实践以及Oracle Database

  • 我需要根据一个键连接两个事件源。事件之间的间隔最长可达1年(即具有id1的event1可能在今天到达,而来自第二个事件源的具有id1的相应event2可能在一年后到达)。假设我只想输出连接的事件输出。 我正在探索在RocksDB后端使用Flink的选项(我遇到了表API,它们似乎适合我的用例)。我找不到做这种长窗口连接的引用体系结构。我希望系统一天能处理大约2亿个事件。 关于处理这种长窗口连接的任

  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 问题内容: 我在使用受信任的时间戳与Bouncy Castle创建有效的CMS签名时遇到问题。签名创建工作良好(我想将签名包括到PDF文件中),签名有效。但是,当我在签名的未签名属性表中添加可信时间戳后,签名仍然保持有效,但是Reader会报告 该签名包括嵌入式时间戳,但是无效 。这使我相信,哈希时间戳是不正确的,但是我似乎无法弄清楚问题出在哪里。 签名代码: 该代码: : 谢谢你的帮助! 示例文

  • 这是因为在开发模式下,为了通过 Webpack 实现热加载,CSS代码是打包在 JavaScript 代码中,并动态打到页面中去,从而元素重绘引起了闪烁。 不用担心,在生产模式下,CSS代码会单独打包至独立的文件并置于head标签内,不会出现页面闪烁的现象。