我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些
的结果是一个元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。 然后我通过一个常量(-因为这是一个全局操作)进行键控,并使用-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。 我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。 目前在Flink有可能吗?
例如,我有一个很大的字流,想要数每一个字。问题是这些话是歪斜的。这意味着某些词的使用频率会很高,而其他大多数词的使用频率却很低。在storm中,我们可以使用以下方法来解决这个问题。首先对流进行洗牌分组,在每个节点的一个窗口时间内计数本地字,最后更新计数到累积结果。从我的另一个问题中,我知道Flink只支持键控流上的window,否则window操作不会并行。 我的问题是在Flink中有没有一个好的
我的用例 null 问题 然而,如果我没有理解错的话,这将意味着由于滑动窗口的性质,单个事件将产生7*24*6=1008个记录。所以我的问题是,我如何才能减少纯粹的数额?
我们有一个多租户应用程序,我们在其中为每个租户维护消息队列。我们实现了一个Flink作业来处理消息队列中的流数据。基本上,每个消息队列都是Flink作业中的一个源。这是推荐的方法吗?还是可以根据租户的数量多次运行相同的作业(使用一个源)?我们预计每个租户将产生不同数量的数据。在多作业方法中会有任何可伸缩性优势吗? 谢谢你
它还提供了另一种称为边输出的方法(https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/side_output.html),它允许您做同样的事情! 这两种方式有什么区别?他们使用相同的低层结构吗?它们的价格一样吗?我们应该在什么时候以及如何选择其中的一个?
水印和后期事件的处理很容易理解,但是早期事件如何呢?例如,如果原始流包含发生在3:00到4:00的事件,但如果我将发生在6:00到7:00的事件插入到流中,那么flink如何处理这些事件?它会为它们创建单独的窗口,当窗口过期时,它们也会被处理?
我们计划将Apache Flink与一个巨大的IOT设置一起使用。客户将向我们发送某种结构化的传感器数据(如sensor_id、sensor_type、sensor_value、timestamp)。我们没有控制每个客户何时发送这些数据,最有可能是实时的,但我们没有保证。我们将所有事件存储在RabbitMQ/Kafka中。更新:我们可以假设每个传感器的事件是按顺序来的。 在开始实施可能的流式管道之
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl 我想确定TTL特性启用的位置,是在key字段上还是在value字段上。特别是,假设我有一个像这样的mapState结构: 如果我要通过StateTtlConfig在mapStat
源在内存中创建任意数量的事件,每秒吞吐量为1个事件。每个事件都有用于分区流的唯一id(使用keyBy运算符),并通过映射函数向托管状态(使用ValueState)添加约100KB。然后将事件简单地传递给不执行任何操作的接收器。 使用上面描述的设置,我们发送了1200个事件,检查点间隔和最小暂停设置为5秒。当事件以恒定的速度和相等的状态量出现时,我们期望检查点的大小或多或少是恒定的。然而,我们观察到
我们正在尝试使用RocksDB后端设置Flink有状态作业。我们使用会话窗口,有30分钟的间隔。我们使用aggregateFunction,所以不使用任何Flink状态变量。通过采样,我们的事件数不到20k次/秒,新会话数不到20-30次/秒。我们的会议基本上收集了所有的事件。会话累加器的大小会随着时间而增大。我们总共使用了10G内存和Flink1.9,128个容器。以下是设置: 从我们对给定时间
首先也是最重要的: null 现在,这项工作一直很好,直到上周,我们有一个激增(10倍以上)的流量。从那时候起,Flink变成了香蕉。检查点大小开始从500MB缓慢增长到20GB,检查点时间大约需要1分钟,并且随着时间的推移而增长。应用程序开始失败,并且永远无法完全恢复,事件迭代器的年龄增长也永远不会下降,因此没有新的事件被消耗。 因为我是一个新的闪现,我不确定我做滑动计数的方式是不是完全没有优化
任何解决这一问题的建议都将不胜感激。或者我们可以用另一种方式来计数?我想补充一些细节。滑动大小是一个事件和窗口大小超过10小时(每秒大约有300个事件),我们需要对每个事件做出反应。所以在这种情况下,我们没有使用Flink提供的窗口。我们使用来存储前面的信息。在中用于触发旧数据的清理作业。最后dinstinct键的数量非常多。
我有一个FlinkV1.2的设置,3个JobManager,2个TaskManager。我想将hdfs用于后端状态和检查点以及zookeeper storageDir 在JobManager日志中 Hadoop作为单个节点集群安装在我在Settings中设置的VM上。为什么Flink要求配置额外的参数?(顺便说一句,官方文件中没有这些内容)
我有一个FlinkV1.2的设置,3个JobManager,2个TaskManager。对于后端状态和检查点以及zookeeper storageDir,我想使用S3桶而不是hdfs 我没有安装hadoop。不确定是否需要这样做,以及是否需要这样做,应该如何/在哪里安装/配置它? 编辑:在使用以下hadoop xml(core-site.xml)配置Flink之后,我并不真正理解IAM部分,而且我