当前位置: 首页 > 知识库问答 >
问题:

SpringKafka1.2.2优雅关机

施琦
2023-03-14

我用的是spring kafka 1.2.2。释放目前,我已经为没有BackOffPolicy和AlwaysRetryPolicy的容器配置了重试模板。确认模式为手动或立即。

当一个SIGTERM出现时,我会让当前消息被处理,当@KafkaListener再次被调用时,我会在容器上抛出RuntimeException,该容器会无限期重试并持续抛出异常。一段时间后,SIGKILL被发出,容器被停止(我认为有更好的方法)。但在这个过程中,消费者会在重启后检索重试的消息,但不会调用KafkaListener进行提交。请参见堆栈跟踪下方的偏移=13

堆栈跟踪:

[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator  onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}

当我看到SIGTERM被发出时,有没有更好的方法来阻止容器,这样@KafkaListener就不会被消息调用了。我知道后来的版本(

任何帮助都非常感谢。

共有1个答案

潘英豪
2023-03-14

endpoint注册表从1.0开始就存在了。建议1. x用户升级到最新的1.3. x;请参阅此处。

当从侦听器停止容器时,最好在新线程上停止,否则停止将被延迟。

参见2.1。x Containers ToppingErrorHandler了解如何做到这一点。但是,当然,停止后不需要抛出异常

但是有1个。x、 您将需要丢弃已获取的任何后续消息。

 类似资料:
  • 优雅关闭,包括两部分,一个是 RPC 框架作为客户端,一个是 RPC 框架作为服务端。 作为服务端 作为服务端的时候,RPC 框架在关闭时,不应该直接暴力关闭。在 RPC 框架中 com.alipay.sofa.rpc.context.RpcRuntimeContext 在静态初始化块中,添加了一个 ShutdownHook // 增加jvm关闭事件 if (RpcConf

  • Dorado的优雅关闭通过ShutDownHook方式实现,调用端和服务端通过添加hook进行资源的清理和关闭 protected synchronized void addShutDownHook() { if (hook == null) { hook = new ShutDownHook(this); Runtime.getRuntime().addS

  • 作为自动缩放presto集群工作的一部分,我们喜欢在EC2终止presto worker之前优雅地关闭它。以下命令后 curl-v-xput--data'“shutting_down”'-h“content-type:application/json”http://250.0.46.167:8081/v1/info/state worker日志立即指示“com.facebook.presto.se

  • 我正在开发一个由嵌入式Tomcat支持的Spring Boot应用程序,我需要通过以下步骤开发一个优雅的关机: 停止处理新的HTTP请求(停止web容器) 处理所有已接受的请求 关闭Spring ApplicationContext 附注。Spring Boot 1.5.20版本,Java 8

  • 在spring boot中有没有一种方法可以控制App的优雅关机。 我知道在bean中可以使用@predestroy方法,但如何控制调用这些@predestroy方法的顺序。 您可以有多个相互依赖的bean,上下文的关闭是否已经寻找这种依赖关系并以正确的顺序调用@predestroy方法? 2.)阻止rabbit消息侦听器接受新消息 3.)等待关机前已经开始但尚未完成的所有处理。

  • 如果我们在后台启动KafkaStream应用程序(比如Linux),有没有一种方法可以从外部向应用程序发出信号,从而启动优雅的关机?