当前位置: 首页 > 知识库问答 >
问题:

在运行时优雅地关闭Flink Kafka Comsumer

何章横
2023-03-14

我正在将FlinkKafkaConsumer010与Flink 1.2.0一起使用,我面临的问题是:如果出现某种情况,是否有办法通过编程关闭整个管道?

一种可能的解决方案是,我可以通过调用FlinkKafkaConsumer010中定义的close()方法关闭kafka消费源,然后调用带有shutdown的管道。对于这种方法,我创建了一个列表,其中包含对所有FlinkKafkaConsumer010实例的引用,这些实例是我在kafka主题管道的开头创建的。然后在管道执行期间,我有另一个线程调用列表中每个FlinkKafkaConsumer010的close()。我希望这会关闭消费者,但结果是消费者仍在运行。

有人能解释一下这一点吗?或者给我一些其他建议,告诉我如何在运行时以编程方式关闭flink管道?

共有1个答案

董飞航
2023-03-14

您尝试响应的场景是否基于输入事件?如果是这样,我建议在管道中的适当位置设置一个MapFunction,并且在满足某些条件时故意抛出异常以使作业失败。

另一种选择是查看KeyedDeserializationSchema中的isEndOfStream方法。基本上,当满足某个事件的条件时,发出流已结束的信号。

另一个需要考虑的选项是让上面提到的MapFunction改为FlatMapFunction,它向外部世界发送信号事件。Flink作业外部的单独进程侦听该事件,收到该事件后,通过Flink CLI关闭Flink作业。

 类似资料:
  • 在我的webapp中,我创建了一个使用具有固定大小线程池的的服务。我在整个应用程序生命周期中重用相同的。 All在Tomcat中运行,在关闭时出现以下错误: 我确实意识到在关闭tomcat之前需要关闭ExecutorService。Soms所以线程已经谈到了这一点,但我找不到一个干净的方法来处理这一点。 我是否应该使用,就像@tim-bender建议的那样,在优雅地关闭线程和执行器?还是应该使用C

  • 优雅关闭,包括两部分,一个是 RPC 框架作为客户端,一个是 RPC 框架作为服务端。 作为服务端 作为服务端的时候,RPC 框架在关闭时,不应该直接暴力关闭。在 RPC 框架中 com.alipay.sofa.rpc.context.RpcRuntimeContext 在静态初始化块中,添加了一个 ShutdownHook // 增加jvm关闭事件 if (RpcConf

  • Dorado的优雅关闭通过ShutDownHook方式实现,调用端和服务端通过添加hook进行资源的清理和关闭 protected synchronized void addShutDownHook() { if (hook == null) { hook = new ShutDownHook(this); Runtime.getRuntime().addS

  • Graceful shutdown When you deploy a new version of your application, you must replace the previous version. The process manager you’re using will first send a SIGTERM signal to the application to noti

  • 我有一个Tornado和装饰器,允许对处理程序进行分块流式处理。 在允许流式传输之前,我需要执行身份验证和其他一些检查。我试图在方法中实现这些,但是当我使用向处理程序传输数据时,检查失败,连接突然关闭,我看到一个异常: 我的方法如下: 如何确保请求正常关闭,以便在检查失败时向客户端返回错误消息?

  • 我有一个应用程序运行在嵌入式jetty服务器上。现在我想将服务器作为服务启动/停止。我使用一个脚本来启动服务器。