我试图从动态表和基于某些字段的流中派生新表。
有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。
//Dynamic Table
Table books = tEnv.sqlQuery("SELECT bookId, instrument, sum(tradedQuanity) as totalQuantity FROM tradeStreamTable group by bookId, instrument");
tEnv.registerTable("books", books);
书籍
============================
BookId, Instruments, Quantity
Book1, Goog,100
Book2, Vod,10
Book1, Appl,50
Book2, Goog,60
Book1, Vod,130
Book3, Appl,110
//My Stream
tEnv.registerDataStream("allInstrumentsTable", allInstruments, "timeStampMs, instrument, instrumentValue ");
allInstrumentsTable公司
====================================================================================================“时间戳、工具、工具值(价格)
流。。。。。。
=========================================
每当我在book table中得到新的更改或流中的instrumentValue得到新的instrumentValue时,我都试图派生新的表(动态)。连接仪表,仪表值*总数量。
预订-最新价格(新表格)
======================================
BookId, Instruments, Quantity, InstrentValue*总计数量
Book1, Goog,100,1203
Book1, Appl,50,...
Book1, Vod,130,...
Book2, Vod,10,...
Book2, Goog,60,...
Book3, Appl,110,...
您不能从外部“更新”表。流中的表类似于RDBMS中的物化视图。它们是从特定时刻的流状态导出的视图。
你能做的就是从这两个表中得到一个新表
SELECT instrument, instrumentValue * totalQuantity FROM allInstrumentsTable aJOIN books b ON a.instrument = b.instrument;
由于这是非窗口连接,您还应该考虑一些保留策略,以确保状态不会无限期增长。
我在flink中有两个nifi流源,我需要对这两个源执行连接。哪种方法更好?它是数据流提供的联接api还是表api(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/streaming.html#streaming-concepts)?
在我的情况下,有可能,例如,一个新的设备被启动,因此必须处理另一个流。但是如何动态添加这个新流呢?
下面的相同代码显示了两个源函数-一个产生0-20的偶数,另一个产生1-20的奇数,连接在一起以输出所有两个流的并集并将它们打印出来。 示例代码: 输出 Q1. Flink应该将连接流中最先到达的项目发送到协处理函数。然而,我们在这里看到的是,数字“2”是以源函数的方式在数字“11”之前生成的,但数字“11”是在“2”之前发送给协处理函数的。为什么会这样? 第二季度。 连接流中无背压发生。源函数一直
我想使用Flink流媒体以低延迟处理市场数据( 我有一组计算,每个都订阅三个流:缓慢移动的参数数据、股票价格和汇率。 例如。 Params(缓慢滴答:每天一次或两次): 资源(每秒多次滴答声): fx(每秒多次滴答声): 每当任何股票、外汇汇率或参数数据发生变化时,我都想立即计算结果并将其输出为新流。这在逻辑上可以表示为连接: 例如选择价格=(params.strike-asset.spot)*f
我们计划使用Flink CEP根据一些动态模板处理大量事件。系统必须识别事件链(有时是带有条件和分组的复杂链)。模板将由用户创建。换句话说,我们必须在不接触代码的情况下创建复杂的模板。是否可以使用Apache Flink解决此问题?Filnk是否支持动态模板?
我想将一系列尝试加入到一个静态的阻止电子邮件列表中,并按IP对结果进行分组,以便稍后统计一组相关的统计数据。结果应在每10秒后以30分钟的滑动窗口交付。以下是我尝试实现这一目标的几种方法之一: 这使用下面的用户定义的表函数,该函数已在my tableEnv中注册为BlockedEmailList: 但是,它返回以下错误: 如果我按照建议执行并将创建的时间戳转换为时间戳,我会得到以下结果: 我在这里