当前位置: 首页 > 知识库问答 >
问题:

Kafkalistener SpringBoot故障保护

叶经略
2023-03-14

我正在运行spring boot,KafkaListener是我的客户。问题是我们如何从失败的kafka配置中恢复,并避免应用程序在退出代码为0的过程结束时停止。例如,不正确的配置可能是不正确的endpointurl。如果无法访问Kafka服务器,也会出现同样的情况。因此,在任何情况下,KafkaListner进程都不应该杀死服务器。

 @Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
    val deserializer = JsonDeserializer<Thing>()
    deserializer.addTrustedPackages("de.data.Thing")

    val props: MutableMap<String, Any> = HashMap()
    val serverUrl = server.substringBefore(":")
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
    props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
    props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
    props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"\$ConnectionString\" " +
            "password=\"Endpoint=sb://$serverUrl/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
    return DefaultKafkaConsumerFactory(props,
            StringDeserializer(), StringDeserializer())

}


@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? {
    val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = consumerFactory()
    factory.setMessageConverter(BytesJsonMessageConverter())
    return factory
}

 @KafkaListener(topics = ["topic"],
        groupId = "group",
        containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
                        record: ConsumerRecord<String, String>,
                        @Headers headers: MessageHeaders) {

    ....
}

 

ontext.java:895应用程序上下文异常:未能启动bean'org.springframework.context.support.KafkaListenerEndpoint注册表';嵌套的异常是ontext.refreshKafkaException:无法构造kafka消费者在ontext.java:554DefaultLificyclePorg.springframework.boot.web.servlet.context.开始(DefaultLificyclePontext.refresh)在org.springframework.context.support.DefaultLificyclePorg.springframework.boot.200美元(DefaultLificyclePpplication.refresh)在pplication.java:758DefaultLifeycleProcter$LifeycleGorg.springframework.boot.(DefaultLifeycleProcessor.java:360)在org.springframework.context.support.DefaultLifeycleProcessor.startBeans(DefaultLifeycleProcessor.java:158)在org.springframework.context.support.DefaultLifeycleProcessor.on刷新(DefaultLifeycleProcessor.java:122)在org.springframework.context.support.AbstractApplication ationContext.finish刷新(AbstractApplication ationCorg.springframework.context.)在org.springframework.kafka.config.internalAbstractApplication ationCorg.apache.kafka.common.(AbstractApplication ationCorg.springframework.context.support.)在rocessor.doServletWebServerApplication ationCrocessor.java:185(ServletWebServerApplication ationC)在rocessor.accessSpringArocessor.java:53(SpringAorg.springframework.context.support.)在roup.start方法。调用(方法。java:566)在org. springframe. boot. devtools. restart。RestartLauncher. run(RestartLauncher. java:49)由:org. apache. kafka. Common引起。KafkaException:未能在org. apache. kafka. clients.消费者处构造kafka消费者。Kafka消费者。(Kafka消费者。java:825)在org. apache. kafka. clients。消费者。(Kafka消费者。java:631)在org. springframe. kafka. core。DefaultKafka消费者工厂. createRaw消费者(DefaultKafka消费者工厂. java:340)在org. springframe. kafka. core。DefaultKafka消费者工厂. createKafka消费者(DefaultKafka消费者ClientUtils. parseAndValidateAddress(ClientUtils. java: 89)在org. apache. kafka. clients. ClientUtils. parseAndValidateAddress(ClientUtils. java: 48)在org. apache. kafka. clients.消费者。(Kafka消费者。java: 737)...33个常见框架被省略

共有1个答案

唐裕
2023-03-14

如果代理刚刚关闭,应用程序将正常启动(对于2.3.4之前的版本,您必须在容器属性上将missingtopicsfail设置为false(此后默认为false)。

中没有提供可解析的引导URL。。。

这是致命的-无法恢复。

但是,您可以在@KafkaListener或容器工厂上设置autoStartup=false-。

这将阻止Spring在应用程序初始化期间尝试启动容器。

然后,您可以自己在try/catch块中启动容器。

 类似资料:
  • 我在FUSE ESB中公开了一个CXF webservice。在出现异常的情况下,应用程序将抛出SOAP错误,响应在SOAP UI中如下所示。如何避免/删除响应中的classContext标记。

  • 在我的项目中,我创建了以下目录结构 在我的pom.xml我做了以下记录 如果我复制src中的资源- 如何使联调也将资源中的文件复制到目标中?

  • Webpack 的配置比较复杂,很容出现错误,下面是一些通常的故障处理手段。 一般情况下,webpack 如果出问题,会打印一些简单的错误信息,比如模块没有找到。我们还可以通过参数 --display-error-details 来打印错误详情。 $ webpack --display-error-details Hash: a40fbc6d852c51fceadb Version: webpa

  • 初始设置 在创建并启动新的 Serene 应用程序之后,不能显示登录页,而当你打开浏览器控制台,却得到一条错误消息:找不到 Template.LoginPanel: 你可能使用了无效的解决方案名称,如 MyProject.Something (包含点’.’)。 当项目以这种方式命名时,模板系统将不能定位模板。 请不要在解决方案名称中使用点符号(’.’),如果必须使用点符号,可在创建解决方案之后再重

  • 有人能帮我吗?我尝试使用Firepath进行正确的Xpath,但是它给我的代码在我看来是不正确的。示例中的第一行是提供的。 在这里我想检查之间的文本是否大于或等于1 另一个是: 在这里,我想检查div类公共结果是否已生成,1项等于1个公共结果

  • 当使用 Gradle 时, 你肯定会碰到许多问题. 解决遇到的问题 如果你碰到了问题, 首先要确定你使用的是最新版本的 Gradle. 我们会经常发布新版本, 解决一些 bug 并加入新的功能. 所以你遇到的问题可能就在新版本里解决了. 如果你正在使用 Gradle Daemon, 先暂时关闭 daemon (你可以使用 switch —no-daemon 命令). 在第19章我们可以了解到更多关