问题:Flink应用程序未接收和处理Kinesis连接器在关闭时生成的事件(由于重新启动)
我们有以下Flink环境设置
env.enableCheckpointing(1000ms);
env.setStateBackend(new RocksDBStateBackend("file:///<filelocation>", true));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(pause);
env.getCheckpointConfig().setCheckpointTimeout(timeOut);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(concurrency);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
动力系统有以下初始配置
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
有趣的是,当我更改运动配置以回复事件时,即。
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
Flink正在从Kinesis接收所有缓冲记录(这包括在事件Flink应用程序关闭之前、期间和之后生成的事件)并对其进行处理。因此,此行为违反了Flink应用程序的“恰好一次”属性。
有人能指出我遗漏的一些明显的东西吗?
Flink Kinesis连接器确实将碎片序列号存储在状态中,以便只进行一次处理。
根据您的描述,似乎在您的作业“重启”时,检查点状态不受尊重。
首先要消除显而易见的问题:您的工作是如何从重启后恢复的?您是从保存点恢复,还是从以前的检查点自动重新启动?
给出错误的方法如下: 如何创建一个没有泄漏的连接?为什么Hikari认为我的conn.close()方法没有关闭连接?任何想法都很欣赏。
大家听好了。首先,我的接收器不工作时,应用程序关闭在我的设备,因为自动启动管理器。我觉得自己很傻...当我试图解决它的时候,我学到了非常重要的东西。 首先,Android6.0许可请求广播接收器在Android6.0Marshmallow中不工作 更新:我还注意到,当应用程序运行和一切正常时,服务启动两次,停止两次。 服务启动时,我看到以下消息: > Toast.MakeText(上下文,“”+P
我的连接器类: 连接器。JAVA 这是我的DAO类(简化):UserDAO. java 在这里,我发现了关于Hikari的一些事实的问题: 您必须在HikariCP为您提供的连接实例上调用关闭() 可能是我的不起作用,因为它只是Hikari在方法中提供给我的连接的副本。
我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。
我知道这个问题在网站上被问了很多,但是,我似乎找不到一个解决办法。当应用程序未运行时,不调用我的BOOT_COMPLETED接收器。 清单: 如果应用程序正在运行,并且我使用 事件被正确接收,但是,如果应用程序被关闭,事件不会被接收,也不会在启动时被接收。 我已经安装了应用程序,然后启动了它几次,以确保它已经注册。我对这件事很迷茫,所以任何建议都将非常感谢。
Flink在这里提供了一个示例:https://www.ververica.com/blog/stream-processing-introduction-event-time-apache-flink这描述了这样一个场景:有人在玩游戏,由于subway而失去连接,然后当他重新联机时,所有数据都恢复了,可以进行排序和处理。 我的理解是,如果有更多的球员,有两种选择: > 所有其他的将被延迟,等待该