我需要延迟处理一些事件。 我有三件事(发表在Kafka上): A(id: 1, retry At: now) B(id: 2, retry At: 10分钟后) C(id: 3, retry At: now) 我需要立即处理记录A和C,而记录B需要在十分钟后处理。这在Apache Flink中实现可行吗? 到目前为止,无论我研究了什么,“触发器”似乎都有助于在Flink中实现它,但还没有能够正确实
我是flink的新手,我部署了我的flink应用程序,它基本上执行简单的模式匹配。它部署在库伯内特斯集群中,具有1个JM和6个TM。我每10分钟向eventhub主题发送大小4.4k和200k消息并执行负载测试。我添加了重启策略和检查点,如下所示,我没有在代码中显式使用任何状态,因为没有要求 最初,我遇到了网络缓冲区的Netty服务器问题,我遵循了以下链接https://ci.apache.org
我有一个Flink v1.2设置,在它自己的VM中有1个JobManager、2个TaskManager。我将状态后端配置为文件系统,并在上述每个主机的情况下将其指向本地位置(state.backend.fs.checkpointdir:file:///home/ubuntu/Prototype/flink/flink-checkpoints)。我已将parallelism设置为1,每个TaskM
我想使用Flink读取输入文件,进行聚合,然后将结果写入输出文件。作业处于批处理模式。请参见字数。py如下: 在运行python wordcount之前。py,我运行echo-e“flink\npyflink\nflink” 虽然我希望有一个包含内容的单个文件/tmp/输出: 实际上,我通过调整下面生成单个文件/tmp/输出的参数,得到了上面的python程序。 运行此版本将生成一个 /tmp/o
我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。 我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。 然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示: 我有一
我正在使用Flink来处理我的流数据。 流媒体来自其他一些中间件,如Kafka、Pravega等。 说Pravega正在发送一些文字流,<代码>你好,世界,我的名字是 。 我需要的是三个过程步骤: 将每个单词映射到我的自定义类对象。 将对象映射到String。 将字符串写入文件:一个字符串写入一个文件。 例如,对于流,我应该得到五个文件。 这是我的代码: 此代码将所有结果输出到Flink日志文件。
我正在使用Flink处理来自某些数据源(如Kafka、Pravega等)的数据。 在我的例子中,数据源是Pravega,它为我提供了一个flink连接器。 我的数据源正在向我发送一些JSON数据,如下所示: 以下是我的代码: 如您所见,我使用FlinkPravegaReader和适当的反序列化程序来获取来自Pravega的JSON流。 然后我尝试将JSON数据转换为String,它们并对它们进行计
我将Rocksdb设置为状态后端的位置存在太多空chk-*文件 我正在使用FlinkKafkaConsumer从Kafka主题获取数据。我使用RocksDb作为状态后端。我正在打印从Kafka那里收到的信息。以下是我必须设置状态后端的属性: 在flink-conf.yaml我还设置了这个属性: 我使用的是简单的单节点flink集群(使用./start cluster.sh)。我启动了该作业并使其运
处理事件时,如果jar应用程序向任务管理器抛出异常,会发生什么情况? a) Flink作业管理器将杀死现有的任务管理器并创建新的任务管理器? b) 任务管理器本身使用RocksDB中保存的本地状态从失败的执行和重启过程中恢复? 我有一个疑问,如果相同类型的错误事件被每个可用的任务管理器处理,因此它们都被杀死,整个flink工作被关闭。 我注意到,如果出现一些应用程序错误,那么最终整个工作都会失败。
我的Flink作业从kafka主题读取并将数据存储在RocksDB状态后端,以利用可查询状态。我能够在本地机器中运行作业并查询状态。但是在集群上部署时,我收到以下错误: 我已经尝试在集群级别和作业级别设置rocksDB状态后端。当它设置为作业级别时,我已将其作为阴影依赖项提供。我也尝试在主机集群机器上编译代码。我在所有情况下都会得到相同的错误。 如何解决此错误?
我们使用flink从一些物联网传感器生成事件。每个传感器都可用于生成不同类型的事件(如温度、湿度等)。一比多比率(传感器- 为了丰富传感器数据,我们将连接传感器数据流和表API。只需添加带有已启用事件列表的元数据。 那么,如果某些特定的仅启用了和事件,如何将传感器数据仅发送到这两个定义的过程函数? 我想到了如下情况: > 在我的案例中,执行数据丰富过程的最佳方式是什么?将传感器数据流与表流连接(通
假设我有两种不同类型的数据流,一种提供天气数据,另一种提供车辆数据,我想使用Flink对数据进行复杂的事件处理。 Flink 1.3中的哪种方法。x是正确的使用方法吗?我看到了不同的方法,如联合、连接、窗口连接。基本上,我只想尝试这样一个简单的CEP: 谢谢
我有一个管道,我在其中对事件流应用转换规则(从广播状态);当我运行广播时 我已附上两种情况的快照: 顶部行显示来自Kafka的流消耗事件,底部行显示消耗的规则
我知道Apache Flink中有三种状态后端:MemoryStateBend、FSStateBend和RockSDBStateBend。 MemoryStateBindend将检查点存储到本地RAM中,FSStateBindend将检查点存储到本地文件系统中,RockSDBStateBindend将检查点存储到RocksDB中。我对RocksDBStateBend有一些问题。 据我所知,Rock
我们试图构建一个用例,其中来自流的数据通过计算公式运行,但公式本身也应该(很少)是可更新的。从阅读文档来看,在我看来,Flink broadcast state自然适合这种情况。 作为一个实验,我构建了一个简化的版本:假设我有一个整数流,第二个流包含这些整数的乘法因子(我可以随意发送值)。第二个流的频率很低,很容易在事件之间的几天或几周内出现。目前,这两个都实现为简单的套接字服务器,最终产品将使用