有没有办法用 Flink Streaming 计算时间窗口流中唯一单词的数量?我看到这个问题,但我不知道如何实现时间窗口。
我有一个流是消费的Flink Kafka消费者将加入另一个流为定义的窗口大小,如Time.milliseconds(10000)。 如何在运行时将窗口大小更改为Time.milliseconds(20000)?
我们正在努力计算 1 分钟翻滚时间窗口内不同类型的事件的最大并发计数。 这些事件就像传感器数据,这些数据是从我们的桌面代理每分钟收集的,然而,一些代理得到了一个错误的时间戳,比如说,它甚至比现在晚了几个小时。 所以,我的问题是如何处理/删除这些事件,目前我只是应用过滤器(s = 我的第一个问题是,如果我不这样做,我怀疑这个坏的“未来”事件会触发窗口计算,即使是那些不完整的数据窗口 第二个问题是,我
我正试图编写一个小的Flink数据流,以更好地了解它的工作原理,我面临着一个奇怪的情况:每次运行它时,我都会得到不一致的输出。有时我期待的一些记录丢失了。请记住,这只是我为学习DataStream API的概念而构建的一个玩具示例。 我有一个大约7600行的CSV格式数据集,如下所示: 完整的数据集在这里:https://pastebin.com/rknnRnPc 没有特殊字符或引号,因此简单的字
我有一个数据流,其中事件的顺序很重要。时间特性设置为EventTime,因为传入记录中有时间戳。 为了保证排序,我将程序的并行度设置为1。当我的程序变得更复杂时,这会成为性能方面的问题吗? 如果我理解正确的话,我需要给我的事件分配水印,如果我想让流按时间戳排序的话。这很简单。但我读到即使这样也不能保证秩序?稍后,我想对那个流进行有状态计算。因此,为此我使用了一个FlatMap函数,它需要对流进行键
我有一个
我正在使用Flink Kafka在流上应用规则。以下是示例代码: 但问题是,当我们启动程序时,FlinkKafka只读取文件一次,我希望新规则在运行时动态添加并应用于流。 在《Flink·Kafka》中,我们有没有办法实现这一点?
这是我的Flink工作流程: 首先我遇到了 java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Can not forward element to next operator 在< code>flatMap运算符中,任务继续重新启动。我观察到许多重复的
有没有办法以可共享的格式(比如Json)指定Apache Flink CEP模式? 我的用例是:开发一个规则引擎,允许用户自定义CEP模式,而无需编写大量Java代码,并轻松地与其他人共享模式。
我正在尝试打印出一个字符串,如果Hello和世界是使用Flink CEP库找到的。我的源是 Kafka,并使用控制台生产者来输入数据。这部分正在起作用。我可以打印出我在主题中输入的内容。但是,它不会打印出我的最后一条消息“世界真好!它甚至不会打印出它进入了lambda。下面是类 任何帮助都将不胜感激。 谢谢!
主程序正在消费kafka事件,然后过滤- 但是我得到了以下例外: 以下是flink-conf.yaml中的一些配置 任何想法为什么会发生异常以及如何解决问题? 谢谢
我在Flink SQL中使用了CEP模式,它按照预期连接到Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP没有触发。以下是我的sql: 然后我以json格式发送消息,如 在 flink Web ui 中,水印工作精细 flink Web ui 我运行我的cep sql: 每个Kafka消息,connect_ 这是另一个仍然不起作用的cep sql。并且age
我想加入来自 kafka producer 的两个流,但该连接不起作用。我使用 AssignerWithPeriodicWatermark 来定义我的分配器,我尝试使用 3 分钟的窗口连接两个流。但我没有得到任何输出。我打印了这两个流,以确保它们的事件在时间上足够接近。
我有一个简单的测试,可以在事件发生时连接两个流: 几秒钟内就失败了: 问题出在哪里?错误消息不直观,我找不到导致问题的原因。 我看到的例外是由https://issues.apache.org/jira/browse/FLINK-18637等人报告的。但那些都是用数据流API。我把这个登记为https://issues.apache.org/jira/browse/FLINK-24926 已更新:
我正在运行一个具有多个投影的SQL,每个投影都很耗时,例如: UDF1 和 UDF2 可能是耗时的功能,但看起来 Flink SQL 按顺序运行 UDF1 和 UDF2,我的问题是 UDF1 和 UDF2 是否可以并行运行以减少延迟?