当前位置: 首页 > 知识库问答 >
问题:

Flink Kafka:在没有收到消息的时间间隔后,优雅地关闭Flink消费来自Kafka源的消息

蒋茂材
2023-03-14

我已将flinkkafkaconsumer作为源添加到我的streamexecutionenvironment中。我想在特定时间内没有收到新消息时关闭/阻止flink使用数据(类似于kafka polltime)。目前它正在无限期运行,并阻止执行移动到下一步(验证消息)。请建议是否有任何解决方法

注意:我从反序列化中尝试了endofstream,但它无法工作,因为流实际上是不确定的。

提前谢谢。

共有1个答案

陈修诚
2023-03-14

如果这是为了测试,那么一种方法是创建自己的定制源代码,将FlinkKafkaConsumer包装起来。源代码的run()方法将从线程调用Kafka源代码的run()方法,传入一个封装真实收集器的收集器,并在收集任何内容时更新“上次收集的时间”。在源代码的run()方法中,您将对此进行轮询,并在时间过长时调用Kakfa源代码的cancel()方法,然后退出。

综上所述,通常在单元测试中,您希望使用模拟源代码来精确控制生成的内容、时间,而不是旋转Kafka系统。

 类似资料:
  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 我必须记录消费者在SpringKafka中花费的时间。由于kafkaListener方法对每条消息都执行,因此在那里放置一个记录器是行不通的。此外,有时一些信息会丢失,而不是被消费者消费掉。我应该把记录器放在哪里,以找出消费者启动后的弹性时间。使用者不会退出或关闭,其轮询将无限期进行

  • 我正在尝试把阿帕奇Storm和Kafka整合在一起。连接似乎建立良好,但没有收到任何消息。但是这些消息似乎也被发送到了Kafka服务器,而Kafka服务器中相应主题的索引文件显示存在一些数据。有没有一种方法可以在Storm End上调试这个更多的..?我正在使用Storm的客户解码器来处理信息。Storm的实现是:

  • 我有一个Kafka主题,目前有3个分区。我希望我的消费者从同一个分区读取,但每条消息都应该以循环方式发送给不同的消费者。有可能实现吗?

  • 在我的Spring启动应用程序中,我有kafka消费者类,每当主题中有可用消息时,它会频繁读取消息。我想限制消费者每隔2小时消费一次消息。就像阅读完一条消息后,消费者将暂停2小时,然后再消费另一条消息。这是我的消费者配置方法:- 然后我创建了这个容器方法,在其中我设置了kafka配置的其余部分 使用此代码分区每2小时重新平衡一次,但它根本没有读取消息。我的kafka消费者方法:-

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka