当前位置: 首页 > 知识库问答 >
问题:

Spring Boot嵌入式Kafka无法连接

郎鸿雪
2023-03-14

我正试图为我的Kafka消费者编写一个集成测试。我已经看完了官方的参考文件,但当我开始测试时,我只看到这个无限重复:

-2019-04-03 15:47:34.002WARN 13120 --- [ main]org.apache.kafka.clients.NetworkClient:[消费者clientId=消费者-1, group pId=my-group]无法建立到节点-1的连接。经纪人可能不可用。

我做错了什么?

我正在使用JUnit5,Spring Boot和sping-kafkasping-kafka-test

我有@EnableKafka注释在我的@Configuration类上。

这就是我的测试类的样子:

@ExtendWith(SpringExtension::class)
@SpringBootTest(classes = [TestKafkaConfig::class])
@DirtiesContext
@EmbeddedKafka(
        partitions = 1,
        topics = [KafkaIntegrationTest.MY_TOPIC])
class KafkaIntegrationTest {

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Test
    fun test() {
        val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)
        val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))
        template.defaultTopic = KafkaIntegrationTest.MY_TOPIC
        template.sendDefault("foo")
    }
}

我的application.yml看起来像这样:

kafka:
  consumer:
    group-id: my-group
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
      specific.avro.reader: true

我还尝试设置了一个MockSchemaRegistryClient,但我得到了完全相同的重复消息。(这就是我试图设置MockSchemaRegistryClient)的方式:

@TestConfiguration
@Import(TestConfig::class)
class TestKafkaConfig {

    @Autowired
    private lateinit var props: KafkaProperties

    @Bean
    fun schemaRegistryClient() = MockSchemaRegistryClient()

    @Bean
    fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())

    @Bean
    fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())

    @Bean
    fun producerFactory(): ProducerFactory<*, *> = DefaultKafkaProducerFactory(
            props.buildProducerProperties(),
            StringSerializer(),
            kafkaAvroSerializer())

    @Bean
    fun consumerFactory(): ConsumerFactory<*, *> = DefaultKafkaConsumerFactory(
            props.buildConsumerProperties(),
            StringDeserializer(),
            kafkaAvroDeserializer()
    )

    @Bean
    fun kafkaListenerContainerFactory() = ConcurrentKafkaListenerContainerFactory<Any, Any>().apply {
        setConsumerFactory(consumerFactory() as ConsumerFactory<in Any, in Any>?)
    }

}

我做错了什么?请注意,我正在使用融合模式注册表,并尝试从Avro反序列化。

我想测试的是我的消费者是否工作,如下所示:

open class SomeConsumer(private val someUseCase) {

    @KafkaListener(topics = ["\${kafka.some-topic}"])
    open fun processMessage(record: ConsumerRecord<String, SomeObject>) {
        someUseCase.call(record)
    }
}

共有1个答案

苏俊友
2023-03-14

我相信你错过了设置经纪人网址为您的测试。

文档中有一个关于如何获取该值的注释:

当embedded Kafka和embedded Zookeeper服务器由embedded Kafka Broker启动时,会出现一个名为spring的系统属性。嵌入的Kafka。brokers设置为Kafka brokers的地址和名为spring的系统属性。嵌入的动物园管理员。connect设置为Zookeeper的地址。为该属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。

(它位于junit部分的底部)

解决这个问题的一种方法是设置kafka。消费者。引导服务器在测试中设置为该值,例如。

spring:
    kafka:
        consumer:
            bootstrap-servers: ${spring.embedded.kafka.brokers}
 类似资料:
  • 我有时会在pom中看到以下声明。xml。。。 如您所见,sping-boo-starter-web被声明为tomcat-embed-jasper。 是不是sping-boo-starter-web已经有一个嵌入式tomcat了?为什么一些开发人员仍然声明tomcat-embed-jasper以及boot-starter-web?还是有什么原因?

  • 我们正在用我们的Servicetest和嵌入式Kafka观察一个奇怪的行为。 该测试是一个Spock测试,我们使用JUnit规则KafkaEmbedded并传播brokersAsString如下: 现在让我们困惑的是,如果我们等待两个分区连接,等待就会超时。只有当我们等待一个分区连接时,过一段时间一切都会成功运行。 我们是否理解错了代码,在嵌入式Kafka中每个主题有两个分区?只给我们的听众分配一

  • 我用的是Springboot和junit,我想用Powermock来模拟静态类,添加了Powermock后,单元测试通过IntelliJ IDEA运行得很好,但是当我在terminal下运行时,它会抛出ApplicationContextException:无法启动web服务器\n无法启动嵌入式Tomcat 我的基本测试类: 测试类: 似乎无法启动springboot嵌入的tomcat,但要使用P

  • 我在我的SpringBoot应用程序中使用下面提到的属性,在文件中让LDAP代码在我的本地机器上运行。 我想同时拥有我的嵌入式配置

  • 在学习教程的同时,我正在使用SPRING初始化器https://start.spring.io/使用springboot 2.0.2生成带有reactiveMongoDB的项目。 gradle文件列出:compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive') 我能够将该项目导入eclipse,主类使用

  • 我正在使用Edgware版本中的Spring Cloud Stream binder发送Kafka消息。我也在使用Spring Sleuth和Zipkin。 Spring使用自定义类将标头嵌入到Kafka消息中。这会给一些必须处理此自定义解码的消息的非Spring消费者带来问题。 我的问题是:有没有办法为Spring配置消息头的自定义编码器/解码器(例如普通JSON)?或者可能使用Kafka标题?