问题内容: 我为Apache Flink写了一个非常简单的Java程序,现在我对测量统计信息感兴趣,例如吞吐量(每秒处理的元组数)和等待时间(程序需要处理每个输入元组的时间)。 我知道Flink公开了一些指标: https://ci.apache.org/projects/flink/flink-docs- release-1.2/monitoring/metrics.html 但是我不确定如何使
问题内容: 我有两个原始流,我正在加入这些流,然后我要计算已加入的事件总数是多少,尚未加入的事件有多少。我通过使用如下所示的地图来做到这一点 问题1: 这是计算流中事件数量的适当方法吗? 问题2: 我注意到一种有线行为,有些人可能不相信。问题是,当我在IntelliJ IDE中运行Flink程序时,它显示了的正确值,但是当我将该程序提交为时。因此,我获得了将程序作为文件运行而不是实际计数时的初始值
问题内容: 我试图使用读取文件: 我收到以下错误: 我正在使用flink版本1.3.2,java版本“ 1.8.0_91” 问题答案: 错误“ java.lang.NoSuchMethodError”的可能原因之一是当我们使用的flink版本与系统上安装的版本不同时。对我来说,我有Flink 1.3.2,我使用的版本是1.1.1。因此,我将pom文件更新为相同的版本。
问题内容: 我试图在Flink中的KeyedStream上执行映射操作: JsonToObjectMapper运算符的输出是 MessageObject 类的POJO,它具有String字段“ keyfield ”。然后,将流键入此字段。 MessageProcessorStateful是一个RichMapFunction,如下所示: 该代码引发NullPointer异常: 尽管我已经验证了’ke
问题内容: 我正在使用Apache Flink v1.6.0,并且尝试写到托管在Elastic Cloud中的 Elasticsearch v6.4.0 。向弹性云集群进行身份验证时出现问题。 我已经能够让Flink写入本地Elasticsearch v6.4.0节点,该节点没有使用以下代码进行加密: 然而,当我尝试添加验证到代码库中,作为记录在这里的弗林克文档和这里对应Elasticsearch
问题内容: 通过阅读文档,我了解到使用Apache Flink 1.3,我应该能够使用Elasticsearch5.x。 但是,在我的pom.xml中: 我懂了 : 找不到依赖项“ org.apache.flink:flink-connector-elasticsearch5_2.10:1.3.0” 知道为什么没有这种依赖性吗? 问题答案: 这是1.3.0版本中的错误,并已在1.3.1版中修复(即
有1个高通量Kafka流定义如下 上述窗口操作符的水印正确转发。 上述窗口操作符中的需要使用一些保存在某些S3文件中的信息来丰富。S3文件很少更新。 S3文件作为流读取,然后广播以丰富中的元素。 然后连接这两个流,用类型的元素来丰富类型的所有元素。 有2个输入。其中之一是不断转发水印,但广播流没有任何时间信息或水印。这导致EnrichedAProcess的水印根本无法转发,因为它的一个输入没有传入
我有一个Flink应用程序从AWS驱动流读取数据。生产者应用程序以每秒600条记录的速率写入Kinesis流。 我想知道在实时生产环境中停止flink应用程序的最佳实践是什么,而生产商仍在向流写入数据,而不会丢失输入流数据。 我认为需要停止应用程序以进行计划维护、应用程序部署更改等。
继续:Flink:处理数据早于应用程序水印的密钥流 基于这个建议,我一直在尝试在使用Datastream API的同一个Flink应用程序中添加对批处理的支持。 逻辑是这样的: 基于公共文档,我的理解是,我只需要将源更改为有界的。然而,上述处理在窗口化步骤后的事件触发器处继续失败,但有以下例外: 输入文件包含多个键的历史事件。给定键的数据已排序,但整体数据未排序。我还在每个键的末尾添加了一个事件,
是否可以清除数据流中的当前水印? 一个月长的水印不允许延迟的示例输入: 通常,“2018年9月”的记录会因为时间太晚而被扔掉。当看到消息时,是否有方法以编程方式重置水印状态?
在广播模式的文档中,提到没有RocksDB状态后端: 如果应用程序使用rocksdb作为状态后端,这将如何影响保存点行为?这是否意味着状态在保存点期间不存储,因此不会恢复?
我在工作中使用ProcessWindowFunction并保持StateValue。我的目标是将值保持在超过1个窗口的状态,这意味着状态不会在每个窗口的末尾被清除。我有两个问题: 我怎样才能清除状态?有没有设置触发器并用它来清除状态的选项?(当在ProcessFunction中使用状态时,我能够设置触发器以执行此清除,即使没有新事件) 有没有一种方法来构建一个单元测试来检查我的ProcessWin
我试图在Flink(版本1.4.2)上使用可查询状态,但不幸的是,我一直收到以下错误: 在客户端,我使用flink-queryable-state-client-java_2_11.jar可查询客户端的相关代码部分是 最后,在Flink上运行的作业配置了一个ListState,如下所示。请注意,数据在ListState上由String键控 在我看来,这似乎是一个序列化错误,但我不知道我需要做什么来
我试图为HA设置配置Flink 1.2.0,在那里我必须设置一个名为状态后端的参数。我之前已经将此参数设置为rocksdb,但随后阅读了留档,其中说HA只有文件系统可用。这是真的吗?(HA设置只支持文件系统状态后端,没有rocksdb可用?)或者这是指一个不同的(特定于动物园管理员的)状态后端? 谢谢!
我想使用大小为2的FIFO队列来存储数据流的元素。在任何情况下,我都需要流中的前一个元素,而不是当前元素。为此,我在流代码之外创建了一个队列,并将当前元素加入队列。当我的队列有两个元素时,我将其出列并使用第一个元素。 我面临的问题是,我不能加入队列,因为它是在我的流代码之外声明的。我想这是因为流使用多个JVM,我的队列将在一个JVM中声明。 下面是一个示例代码: 在这里,没有任何东西进入队列,队列