我目前正在使用Flink 1.0编写一个聚合用例,作为该用例的一部分,我需要获得过去10分钟内登录的api数量。
这我可以很容易地使用keyBy("api"),然后应用10分钟的窗口和doe sum(count)操作。
但问题是我的数据可能会出现混乱,所以我需要一些方法来获取10分钟窗口内的api计数。。
例如:如果相同的api日志出现在两个不同的窗口中,我应该得到一个全局计数,即2,而不是两个单独的记录,每个窗口的显示计数为1。
我也不想要增量计数,即每个具有相同键的记录都显示多次,计数等于增量值。
我希望记录以全局计数显示一次,类似于Spark中的updateStateByKey()。
我们能做到吗?
您应该看看Flink的事件时间特性,它为无序流生成一致的结果。事件时间意味着Flink将根据作为事件一部分的时间戳处理数据,而不取决于机器的挂钟时间。
如果您选择事件时间(使用适当的水印)。Flink将使用自动处理无序到达的事件。
我一直在关注Flink 1.14中针对有界数据的不同全局数据排序选项。我发现Stackoverflow和其他网站上关于这个的很多问题都是好几年前的问题了,关于不推荐使用的API或者没有完全回答这个问题。由于Flink正在快速发展,我想问一下最新稳定的Flink (1.14)中的可用选项。 以下是我如何理解当前的情况(这可能是错误的)。我的问题也附上。Flink 有两个 API —— 和 , 它们可
我有一个表示为的自定义状态计算,当我的看到来自Kafka的新事件时,它将不断更新。现在,每次更新状态时,我都希望将更新后的状态打印到stdout。想知道怎么在Flink中做到这一点吗?与所有的窗口和触发器操作很少混淆,我一直得到以下错误。 我只想知道如何将我的聚合流打印到stdout或写回另一个kafka主题? 下面是引发错误的代码片段。
我使用Apache Flink来预测来自Twitter的流。 代码是在Scala中实现的 我的问题是,我从DataSet API训练的SVM模型需要一个DataSet作为predict()方法的输入。 我在这里已经看到一个问题,其中一个用户说,您需要编写一个自己的MapFunction,在作业开始时读取模型(参考:Flink中使用scala的实时流预测) 但是我不能写/理解这段代码。 即使我在St
我想使用大小为2的FIFO队列来存储数据流的元素。在任何情况下,我都需要流中的前一个元素,而不是当前元素。为此,我在流代码之外创建了一个队列,并将当前元素加入队列。当我的队列有两个元素时,我将其出列并使用第一个元素。 我面临的问题是,我不能加入队列,因为它是在我的流代码之外声明的。我想这是因为流使用多个JVM,我的队列将在一个JVM中声明。 下面是一个示例代码: 在这里,没有任何东西进入队列,队列
我有一个窗口化的每小时聚合的数据流。 Datastreamds=.....
不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?