我正在阅读Flink示例CountWithTimestamp,下面是该示例的代码片段:
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count++;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
我的问题是,如果我删除onTimer中的if语句timestamp==result.lastmodified+60000
(收集未触摸的stmt),而代之以processElement开头的另一个if语句if(ctx.timestamping
您认为删除计时器的实现具有相同的语义是正确的。事实上,我最近改变了我们培训教材中使用的例子来做这件事,因为我更喜欢这种方法。我觉得它更好的原因是,所有复杂的业务逻辑都在一个地方(在processElement
中),而且每当调用ontimer
时,您都知道要做什么,不需要问任何问题。此外,它的性能更好,因为检查点和最终触发的计时器更少。
这个例子是在可以删除计时器之前为文档编写的,还没有更新。
通过注册页面后,您可以找到我在这些幻灯片中提到的修改过的示例--https://training.ververica.com/decks/process-function/。
FWIW,我最近还按照相同的思路重新编写了相应训练练习的参考解决方案:https://github.com/apache/flink-training/tree/master/long-ride-alerts。
我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW
问题内容: 中欧夏令时开始于三月的最后一个星期日。我们将时钟设置为02:00到03:00。如果我在数据库请求中进行时间戳计算会发生什么?比方说,在01:59? 结果是03:00还是02:00? 如果我们将时钟设置为03:00到02:00,那结束了呢? 时间从03:00更改为02:00之后…在02:00会发生什么?是02:59还是01:59? 应该如何处理?最佳实践以及Oracle Database
我需要根据一个键连接两个事件源。事件之间的间隔最长可达1年(即具有id1的event1可能在今天到达,而来自第二个事件源的具有id1的相应event2可能在一年后到达)。假设我只想输出连接的事件输出。 我正在探索在RocksDB后端使用Flink的选项(我遇到了表API,它们似乎适合我的用例)。我找不到做这种长窗口连接的引用体系结构。我希望系统一天能处理大约2亿个事件。 关于处理这种长窗口连接的任
我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。
问题内容: 我在使用受信任的时间戳与Bouncy Castle创建有效的CMS签名时遇到问题。签名创建工作良好(我想将签名包括到PDF文件中),签名有效。但是,当我在签名的未签名属性表中添加可信时间戳后,签名仍然保持有效,但是Reader会报告 该签名包括嵌入式时间戳,但是无效 。这使我相信,哈希时间戳是不正确的,但是我似乎无法弄清楚问题出在哪里。 签名代码: 该代码: : 谢谢你的帮助! 示例文
这是因为在开发模式下,为了通过 Webpack 实现热加载,CSS代码是打包在 JavaScript 代码中,并动态打到页面中去,从而元素重绘引起了闪烁。 不用担心,在生产模式下,CSS代码会单独打包至独立的文件并置于head标签内,不会出现页面闪烁的现象。