我用Flink的table API创建了一个表。 当运行SQL以查看记录时,我得到: 我知道有一些坏的avro记录被推送到Kafka主题中。在JSON格式中,有一个选项可以通过设置来跳过/过滤这些记录。当从合流avro格式读取时,我们可以跳过这些记录吗? 这并不理想,但不幸的是,尽管有一个模式注册表,但我无法控制要推送到Kafka的内容。
在 Fink 源代码中,有 和 模块。为什么我们需要两个模块来进行 flink 流? https://github.com/apache/flink/tree/master/flink-streaming-java https://github.com/apache/flink/tree/master/flink-streaming-scala
我尝试根据flink doc添加一个具有事件时间属性的表源。我的代码像: 我在getDataStream方法中得到的是一个Kafka字符串源。我从每条kafka记录中提取了一个TsCol。我想使用TsCol作为事件时间。但是TsCol是字符串数据类型的10位时间戳,所以我需要将其转换为13位Long数据类型。当我尝试使用13位Long数据作为行时时,我得到了异常,说行时只能从SQL_TIMESTA
需要一些flink-sql流过程的示例。kafka源和数据库源。
我正在使用flink 1.12.0。尝试将数据流转换为表A并在tableA上运行sql查询以在如下窗口上聚合。我使用f2列作为其时间戳数据类型字段。 当我执行上述代码时,我得到 线程“main”org.apache.flink.table.api中出现异常。TableException:只能在时间属性列上定义窗口聚合,但遇到时间戳(6)。在org.apache.flink.table.planne
当前配置 应用程序在Flink 1.14.4上运行 在应用程序中,数据流(一长串运算符的结果)与Kafka源连接 每个分区的事件时间戳严格递增 Kafka上的每分区水印策略将水印设置为迄今为止看到的最大时间戳(无序度为1秒) 默认情况下,Kafka源(间隔连接的右侧)比其他数据流(Kafka)提前一分钟左右 问题 直到最近,我用新的KafkaSource类替换了不推荐使用的KafkCasumer类
在e2e FlinkSQL教程中,源表被定义为带有启用水印的时间戳列的Kafka源表 只要GROUP BY是由一个翻滚在ts上的字段生成的,这看起来很自然(因为Flink知道何时触发/弹出窗口),但在教程的中间我们看到了以下表达式 在这里,我们看到分组是在导数<code>date_str</code>字段上进行的,但是水印在这里是如何工作的呢?Flink如何决定何时“关闭”date_ str桶?由
我有下面的SQL查询,我在flink工作中使用。< code>mysql_table是使用JDBC连接器创建的,而< code>kafa_source表是从传入的kafka流创建的。 我在两者之间执行时态连接,当我在Flink的sql-client CLI中检查时,运行良好(用< code>flink-faker测试)。内部查询工作得非常好,并且正在打印结果。有人能帮助我找出这个问题吗? 编辑:我
我刚刚遇到了一个非常奇怪的问题,当使用带有时间戳和水印赋值器的EventTime时,我无法从流窗口联接中获得任何结果。 我使用Kafka作为我的数据流源,并尝试了AscendingTimestampExtractor和自定义赋值器,它们实现了Flink留档中提到的Assignerwith周期水印,正如我测试的那样,没有发出水印,也没有生成连接结果。如果我更改为使用ProcessingTime和Tu
我想使用窗口聚合,然后在中生成的结果之上运行窗口聚合。 是否可以在第一次聚合后修改属性以使其等于会话中最后观察到的事件的? 我正在尝试这样做: 关键部分是: 因此,我想将会话中最新事件的 重新分配给记录,而没有会话间隔(在本例中为 )。 这在BatchTable中运行良好,但在StreamTable中不起作用: 是的,我知道,感觉上我不想发明时间机器来改变时间顺序。但实际上有可能以某种方式实现所描
现在我们在 Flink 中拥有了带有花哨窗口的 SQL,我正在尝试从他们的 SQL 路线图/预览版 2017-03 帖子中引用衰减的移动平均线“,以”未来 Flink 版本对 Table API 和 SQL 的发布将会发生什么“: 这是我的尝试(也受到方解石腐烂的例子的启发): 时间是处理时间,通过从AppendStream表创建write_position,我们得到处理时间,如下所示: 我收到此
我正在构建一个流分析,对于单个表,它需要大约50 GB的初始状态内存。~50 GB是我将状态加载到Scala HashMap[String,String]时使用的RAM量。 对于一个随时间增长的单个表,Flink能处理大约50 GB的状态吗? 我是否能够以流式方式对此表执行查找和更新? 注意事项: 我无法将类型更改为更小的类型 该状态用作将一个字符串映射到另一个字符串的查找 该州需要三年的时间才能
我正在探索一种方法来实现这一点,就像下面的SQL一样。 是一个将聚合到
我使用在事件序列超出定义的时间窗口时丢弃它。 我正在设置水印如下 当模式超时发生时,< code>timeoutTimestamp应等于timewindow的第一个事件时间戳值 但是超时是在接收到下一个水印后触发的。是否在收到下一个水印后触发超时,或基于< code>timeWindow到期?
下面是代码片段,我在其中使用了基于翻滚事件时间的窗口 不幸的是,它似乎从未执行过reduce函数。如果使用上面的代码进行窗口处理,reduce函数可以正常工作。下面是时间戳提取器的代码。30秒水印延迟仅用作测试值,但一分钟翻转窗口为m bd.longValue(),它返回秒时间戳1498658629,因为我的窗口也是以秒为单位定义的。< br >当我使用返回分钟时间戳的bd.longValue()