当前位置: 首页 > 知识库问答 >
问题:

是否有一种方法可以定义一个动态表,其中包含最近未被事件触及的条目?

谷梁博易
2023-03-14

我是Flink的新手,我正在尝试使用它来获得我的应用程序的一堆实时视图。我想构建的动态视图中至少有一个是显示未满足SLA的条目——或基本上已过期——并且这样做的条件是一个简单的时间戳比较。因此,如果最近没有被事件触及,我基本上希望一个条目显示在我的动态表中。在开发环境中使用Flink 1.6(由于AWS Kinesis而受到限制)时,我没有看到Flink正在重新评估条件,除非事件触及该条目。

我已经将我的开发环境插入了Kinesis流,该流正在从Web服务器发送实时访问日志事件。这不是我真正的用例,但它很容易开始测试。我编写了一个简单的表查询,它拉取请求路径,它的最后一次访问时间,并计算一个布尔标志以指示它是否在最后一分钟没有被访问。我正在通过连接到PrintSinkFunction的收回流进行调试,以便将所有更新/删除打印到我的控制台。

tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");

Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());

我期望当我访问一个页面时,一个添加事件被发送到这个流。然后,如果我等待1分钟(什么都不做),我的表中的CASE语句将计算为1,因此我应该看到一个删除事件,然后添加一个设置了该标志的事件。

我实际上看到的是,在我再次加载该页面之前什么都不会发生。Delete事件实际上设置了标志,而紧接其后的Add事件再次清除了它(应该如此,因为它不再“过期”)。

// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event

编辑:我在搜索中遇到的最有用的技巧是创建一个ProcessFunction。我认为这是我可以在动态表中使用的东西(在某些情况下,我最终会使用中间流来查看计算的日期),但希望不必如此。

我已经使用了ProcessFunction方法,但它需要比我最初想象的更多的修补:

  1. 我必须在我的POJO中添加一个字段,该字段在onTimer()方法中发生变化(可能是一个日期或您每次都只是碰到的版本)
  2. 我必须将此字段注册为动态表的一部分
  3. 我必须在查询中使用此字段,以便重新计算查询并更改布尔标志(即使我实际上并未使用新字段)。我刚刚将其添加为我的SELECT子句的一部分。

共有1个答案

商正浩
2023-03-14

您的方法看起来很有前途,但是Flink的表API / SQL不支持与移动的“现在”时间戳进行比较。

我会分两步解决这个问题。

    < li >在upsert模式下注册动态表,即基于版本时间戳(在您的情况下为< code>request)按键(在您的情况下为< code > request )进行up sert的表。产生的动态表将保存每个请求的最新行。 < li >使用像您这样的简单过滤谓词进行查询,该查询比较动态(更新)表中各行的版本时间戳,并过滤掉时间戳太接近现在的所有行。

不幸的是,这两种功能(upsert转换和与移动的“现在”时间戳的比较)在Flink中都不可用。不过,对于upsert表转换还有一些正在进行的工作。

 类似资料:
  • 我希望避免文件中杂乱的东西,而在我看来,放在单独的文件中会更好。 应该类似于 这有可能吗?如果不是,什么是避免杂乱的明智方法呢?

  • 我们公司的一些团队目前正在“升级”一些遗留项目以使用Flyway。一个问题是存在多个具有已可用数据库对象(基线)的安装。 我们目前有一个用例(将来肯定会有更多),我们必须创建一些新的迁移脚本,但它们只能在满足数据库当前状态的特定条件时运行。 例如:运行此脚本,但仅当数据库中的表 X 包含数据 Y 时 我已经看到了Flyway条件数据库迁移,但对我来说,这似乎不是正确的解决方案,因为条件不仅仅是一些

  • 问题内容: 是否可以在另一个CSS文件中包含一个CSS文件? 问题答案: 是: 注意: 该规则必须先于所有其他规则(除外)。 其他语句需要其他服务器请求。或者,将所有CSS连接到一个文件中,以避免多个HTTP请求。例如,复制的内容和入和仅供参考。

  • 我试图帮助我的队友使用TestNG编写更稳定、更独立的Selenium UI自动化测试。因此,我想知道是否有人有实施以下内容的经验: 从功能和/或烟雾测试包中随机选择6项测试 将新开发的测试置于套件的中间 在每次运行前更改包中的随机测试选择,在配置上运行3次

  • 问题内容: 在i386 linux上。如果可能,最好在c /(c / posix std libs)/ proc中。如果没有,那么任何程序集或第三方库都可以做到这一点? 编辑:我正在尝试开发测试内核模块是否清除缓存行或整个处理器(与wbinvd())。程序以root身份运行,但我希望尽可能保留在用户空间中。 问题答案: 高速缓存一致性系统会尽最大努力向您隐藏此类信息。我认为您将不得不通过使用性能计

  • 我意识到我需要确保在上面的示例中数组中的对象确实是一个字符串,但是考虑到JSONArrays只能处理几种类型,这应该不会有问题。