我们正在使用Flink Kinesis消费者将Kinesis流中的数据消费到我们的Flink应用程序中。 KCL库使用DynamoDB表存储上次成功处理的Kinesis流序列号。这样,下次应用程序启动时,它将从停止的位置恢复。 但是,Flink Kinesis消费者似乎在任何持久性存储中都不维护任何这样的序列号。因此,我们需要依赖ShardIteratortype(trim\u horizen、l
问题:Flink应用程序未接收和处理Kinesis连接器在关闭时生成的事件(由于重新启动) 我们有以下Flink环境设置 动力系统有以下初始配置 有趣的是,当我更改运动配置以回复事件时,即。 Flink正在从Kinesis接收所有缓冲记录(这包括在事件Flink应用程序关闭之前、期间和之后生成的事件)并对其进行处理。因此,此行为违反了Flink应用程序的“恰好一次”属性。 有人能指出我遗漏的一些明
我是新的apache flink,并试图了解事件时间和窗口的概念是如何处理的flink。 下面是我的设想: > 我有一个程序,它以线程的形式运行,每秒创建一个包含3个字段的文件,其中第3个字段是时间戳。 虽然每隔5秒我会在创建的新文件中输入一个旧的时间戳(可以说是t-5),但还是有一些调整。 现在,我运行流处理作业,将上面的3个字段读入一个元组。 现在,我定义了以下用于水印和时间戳生成的代码: 然
我有一些能量计,将继续产生计数器值,这是一个累积指标。即不断增加,直到计数器复位。 有一个实时ETL作业,它在事件时间的两个连续值之间进行减法。 例如。 此外,有时事件可能没有按顺序接收。 如何使用Apache Flink流式API实现?最好使用Java中的示例。
这个问题涵盖了如何使用FlinkSQL对乱序流进行排序,但我更愿意使用DataStream API。一种解决方案是使用ProcessFunction来执行此操作,该ProcessFunction使用PriorityQueue来缓冲事件,直到水印指示它们不再乱序,但这在RocksDB状态后端中表现不佳(问题是每次访问PriorityQueue都需要整个PriorityQueue的ser/de)。无论
我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢
Flink(批处理/流式处理)中是否有方法同时计算字段的平均值和总和?使用聚合方法,我可以计算groupBy结果中字段的和,但如何同时计算平均值呢?下面的示例代码。
我认为水印可以通过两种方式进行调整: 通过数据源上的SourceContext.emitWatermark()发出它们 通过将水印策略连接到DataSourceStream 如果我在datasource操作符之后连接一个新的水印策略,第一个水印会被最新的水印策略替换吗? 本质上,我处于一种无法控制源事件/数据源的情况,但我需要稍后调整水印
我看到了一些奇怪的行为。我使用Flink 1.12编写了一些Flink处理器,并试图让它们在Amazon EMR上运行。然而,Amazon EMR目前只支持Flink 1.11.2。当我降级时,我莫名其妙地发现水印不再传播。 主题上只有一个分区,并行度设置为1。这里有我遗漏的东西吗?我觉得我有点疯了。 这是Flink 1.12的输出: 这是Flink 1.11的输出: 下面是公开它的集成测试:
我看到关于为每个密钥添加水印支持的讨论很多。但是flink支持每个分区的水印吗? 当前-然后考虑所有水印(非空闲分区)的最小值。因此,窗口中最后挂起的记录也被卡住了。(使用periodicemit增加水印时) 任何关于这方面的信息都非常感谢!
如果Flink应用程序在发生故障或更新后正在启动备份,那么不明确属于KeyedState或OperatorState的类变量是否会持久化? 例如,Flink的留档中描述的BoundedOutOfOrdernessGenerator有一个电流最大时间戳变量。如果更新了Flink应用程序,电流最大时间戳中的值是否会丢失,或者是否会写入在应用程序更新之前创建的保存点? 这样做的真正原因是我想实现一个自定
我计划将我们的一个Spark应用程序迁移到Apache Flink。我试图了解它的容错特性。 我执行了以下代码,我看不到Flink实际上尝试重试任何任务(或子任务)。这可能会导致我丢失数据。我该怎么做才能确保每一次失败都能被Flink所覆盖? 我希望在屏幕上多次看到抛出异常消息。但是,当我使用fixedDelayRestart时,它似乎只是忽略了此消息,并为其他消息继续。
我不知道当您从Apache Kafka中摄取数据时,水印应该如何工作。 我读到Flink通过从消息中获取时间戳来自动处理水印,但他们没有指定从何处开始。从消息负载、从标头还是从CreateTime 以下格式的事件: Topic
在Flink 1.11中,我正在尝试使用debezium格式,以下应该可以工作,对吗?我在试着遵循文档[1] 输出代码段/异常: 我已经验证了Debezium设置,dbserver1中有消息。库存产品主题。我可以使用其他方法阅读Flink中的Kafka主题,但如前所述,我希望使用debezium json格式。 此外,我知道Flink 1.12引入了新的Kafka Upett连接器,但我现在无法使
我是Scala和Apache Flink的初学者,但到目前为止,一切都很顺利。我正在尝试使用Flink应用程序中序列化到AVRO的Kafka事件。我阅读了文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-反序列化模式)和google搜索了很多小时,但我仍然在同一页上。我有一