当前位置: 首页 > 知识库问答 >
问题:

Flink动态表和流连接

周和歌
2023-03-14

我试图从动态表和基于某些字段的流中派生新表。

有没有人能为你提供最好的指导。我对flink和尝试新事物是陌生的。

//Dynamic Table

Table books = tEnv.sqlQuery("SELECT bookId, instrument, sum(tradedQuanity) as totalQuantity FROM tradeStreamTable group by bookId, instrument");

tEnv.registerTable("books", books);

书籍

============================
BookId, Instruments, Quantity
Book1, Goog,100
Book2, Vod,10
Book1, Appl,50
Book2, Goog,60
Book1, Vod,130
Book3, Appl,110

//My Stream

tEnv.registerDataStream("allInstrumentsTable", allInstruments, "timeStampMs, instrument, instrumentValue ");

allInstrumentsTable公司

====================================================================================================“时间戳、工具、工具值(价格)
流。。。。。。

=========================================

每当我在book table中得到新的更改或流中的instrumentValue得到新的instrumentValue时,我都试图派生新的表(动态)。连接仪表,仪表值*总数量。

预订-最新价格(新表格)

======================================
BookId, Instruments, Quantity, InstrentValue*总计数量
Book1, Goog,100,1203
Book1, Appl,50,...
Book1, Vod,130,...
Book2, Vod,10,...
Book2, Goog,60,...
Book3, Appl,110,...

共有1个答案

勾长卿
2023-03-14

您不能从外部“更新”表。流中的表类似于RDBMS中的物化视图。它们是从特定时刻的流状态导出的视图。

你能做的就是从这两个表中得到一个新表

SELECT instrument, instrumentValue * totalQuantity FROM allInstrumentsTable aJOIN books b ON a.instrument = b.instrument;

由于这是非窗口连接,您还应该考虑一些保留策略,以确保状态不会无限期增长。

 类似资料:
  • 我在flink中有两个nifi流源,我需要对这两个源执行连接。哪种方法更好?它是数据流提供的联接api还是表api(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/streaming.html#streaming-concepts)?

  • 在我的情况下,有可能,例如,一个新的设备被启动,因此必须处理另一个流。但是如何动态添加这个新流呢?

  • 下面的相同代码显示了两个源函数-一个产生0-20的偶数,另一个产生1-20的奇数,连接在一起以输出所有两个流的并集并将它们打印出来。 示例代码: 输出 Q1. Flink应该将连接流中最先到达的项目发送到协处理函数。然而,我们在这里看到的是,数字“2”是以源函数的方式在数字“11”之前生成的,但数字“11”是在“2”之前发送给协处理函数的。为什么会这样? 第二季度。 连接流中无背压发生。源函数一直

  • 我想使用Flink流媒体以低延迟处理市场数据( 我有一组计算,每个都订阅三个流:缓慢移动的参数数据、股票价格和汇率。 例如。 Params(缓慢滴答:每天一次或两次): 资源(每秒多次滴答声): fx(每秒多次滴答声): 每当任何股票、外汇汇率或参数数据发生变化时,我都想立即计算结果并将其输出为新流。这在逻辑上可以表示为连接: 例如选择价格=(params.strike-asset.spot)*f

  • 我们计划使用Flink CEP根据一些动态模板处理大量事件。系统必须识别事件链(有时是带有条件和分组的复杂链)。模板将由用户创建。换句话说,我们必须在不接触代码的情况下创建复杂的模板。是否可以使用Apache Flink解决此问题?Filnk是否支持动态模板?

  • 我想将一系列尝试加入到一个静态的阻止电子邮件列表中,并按IP对结果进行分组,以便稍后统计一组相关的统计数据。结果应在每10秒后以30分钟的滑动窗口交付。以下是我尝试实现这一目标的几种方法之一: 这使用下面的用户定义的表函数,该函数已在my tableEnv中注册为BlockedEmailList: 但是,它返回以下错误: 如果我按照建议执行并将创建的时间戳转换为时间戳,我会得到以下结果: 我在这里