我正试图为我的Kafka消费者编写一个集成测试。我已经看完了官方的参考文件,但当我开始测试时,我只看到这个无限重复:
-2019-04-03 15:47:34.002WARN 13120 --- [ main]org.apache.kafka.clients.NetworkClient:[消费者clientId=消费者-1, group pId=my-group]无法建立到节点-1的连接。经纪人可能不可用。
我做错了什么?
我正在使用JUnit5,Spring Boot和sping-kafka
和sping-kafka-test
。
我有@EnableKafka
注释在我的@Configuration
类上。
这就是我的测试类的样子:
@ExtendWith(SpringExtension::class)
@SpringBootTest(classes = [TestKafkaConfig::class])
@DirtiesContext
@EmbeddedKafka(
partitions = 1,
topics = [KafkaIntegrationTest.MY_TOPIC])
class KafkaIntegrationTest {
@Autowired
private lateinit var embeddedKafka: EmbeddedKafkaBroker
@Test
fun test() {
val senderProps = KafkaTestUtils.senderProps(embeddedKafka.brokersAsString)
val template = KafkaTemplate(DefaultKafkaProducerFactory<Int, String>(senderProps))
template.defaultTopic = KafkaIntegrationTest.MY_TOPIC
template.sendDefault("foo")
}
}
我的application.yml
看起来像这样:
kafka:
consumer:
group-id: my-group
bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
schema.registry.url: ${SCHEMA_REGISTRY_URL:http://localhost:8081}
specific.avro.reader: true
我还尝试设置了一个MockSchemaRegistryClient
,但我得到了完全相同的重复消息。(这就是我试图设置MockSchemaRegistryClient
)的方式:
@TestConfiguration
@Import(TestConfig::class)
class TestKafkaConfig {
@Autowired
private lateinit var props: KafkaProperties
@Bean
fun schemaRegistryClient() = MockSchemaRegistryClient()
@Bean
fun kafkaAvroSerializer() = KafkaAvroSerializer(schemaRegistryClient())
@Bean
fun kafkaAvroDeserializer() = KafkaAvroDeserializer(schemaRegistryClient(), props.buildConsumerProperties())
@Bean
fun producerFactory(): ProducerFactory<*, *> = DefaultKafkaProducerFactory(
props.buildProducerProperties(),
StringSerializer(),
kafkaAvroSerializer())
@Bean
fun consumerFactory(): ConsumerFactory<*, *> = DefaultKafkaConsumerFactory(
props.buildConsumerProperties(),
StringDeserializer(),
kafkaAvroDeserializer()
)
@Bean
fun kafkaListenerContainerFactory() = ConcurrentKafkaListenerContainerFactory<Any, Any>().apply {
setConsumerFactory(consumerFactory() as ConsumerFactory<in Any, in Any>?)
}
}
我做错了什么?请注意,我正在使用融合模式注册表,并尝试从Avro反序列化。
我想测试的是我的消费者是否工作,如下所示:
open class SomeConsumer(private val someUseCase) {
@KafkaListener(topics = ["\${kafka.some-topic}"])
open fun processMessage(record: ConsumerRecord<String, SomeObject>) {
someUseCase.call(record)
}
}
我相信你错过了设置经纪人网址为您的测试。
文档中有一个关于如何获取该值的注释:
当embedded Kafka和embedded Zookeeper服务器由embedded Kafka Broker启动时,会出现一个名为spring的系统属性。嵌入的Kafka。brokers设置为Kafka brokers的地址和名为spring的系统属性。嵌入的动物园管理员。connect设置为Zookeeper的地址。为该属性提供了方便的常量(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS和EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT)。
(它位于junit部分的底部)
解决这个问题的一种方法是设置kafka。消费者。引导服务器
在测试中设置为该值,例如。
spring:
kafka:
consumer:
bootstrap-servers: ${spring.embedded.kafka.brokers}
我有时会在pom中看到以下声明。xml。。。 如您所见,sping-boo-starter-web被声明为tomcat-embed-jasper。 是不是sping-boo-starter-web已经有一个嵌入式tomcat了?为什么一些开发人员仍然声明tomcat-embed-jasper以及boot-starter-web?还是有什么原因?
我们正在用我们的Servicetest和嵌入式Kafka观察一个奇怪的行为。 该测试是一个Spock测试,我们使用JUnit规则KafkaEmbedded并传播brokersAsString如下: 现在让我们困惑的是,如果我们等待两个分区连接,等待就会超时。只有当我们等待一个分区连接时,过一段时间一切都会成功运行。 我们是否理解错了代码,在嵌入式Kafka中每个主题有两个分区?只给我们的听众分配一
我用的是Springboot和junit,我想用Powermock来模拟静态类,添加了Powermock后,单元测试通过IntelliJ IDEA运行得很好,但是当我在terminal下运行时,它会抛出ApplicationContextException:无法启动web服务器\n无法启动嵌入式Tomcat 我的基本测试类: 测试类: 似乎无法启动springboot嵌入的tomcat,但要使用P
我在我的SpringBoot应用程序中使用下面提到的属性,在文件中让LDAP代码在我的本地机器上运行。 我想同时拥有我的嵌入式配置
在学习教程的同时,我正在使用SPRING初始化器https://start.spring.io/使用springboot 2.0.2生成带有reactiveMongoDB的项目。 gradle文件列出:compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive') 我能够将该项目导入eclipse,主类使用
我正在使用Edgware版本中的Spring Cloud Stream binder发送Kafka消息。我也在使用Spring Sleuth和Zipkin。 Spring使用自定义类将标头嵌入到Kafka消息中。这会给一些必须处理此自定义解码的消息的非Spring消费者带来问题。 我的问题是:有没有办法为Spring配置消息头的自定义编码器/解码器(例如普通JSON)?或者可能使用Kafka标题?