当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka测试-未在带有嵌入式Kafka的@KafkaListener中接收数据

胡意致
2023-03-14

我们正在使用Cucumber为out应用程序做一些集成测试,我们在测试一个@KafkaListener时遇到了一些问题。我们设法使用了一个嵌入式Kafka并在其中生成数据。

但消费者从未收到任何数据,我们也不知道发生了什么。

这是我们的代码:

制作人配置

@Configuration
@Profile("test")
public class KafkaTestProducerConfig {

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

消费者配置

@Configuration
@Profile("test")
@EnableKafka
public class KafkaTestConsumerConfig {

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafka;

    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        props.put(SCHEMA_REGISTRY_URL, "URL");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
        return props;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
        KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(consumerProperties(), false);
        return new DefaultKafkaConsumerFactory<>(consumerProperties(), new StringDeserializer(), avroDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

}

集成测试

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
        classes = Application.class)
@ActiveProfiles("test")
@EmbeddedKafka(topics = {"TOPIC1", "TOPIC2", "TOPIC3"})
public class CommonStepDefinitions implements En {

    protected static final Logger LOGGER = LoggerFactory.getLogger(CommonStepDefinitions.class);

    @Autowired
    protected KafkaTemplate<String, GenericRecord> kafkaTemplate;

}

步骤定义

public class KafkaStepDefinitions extends CommonStepDefinitions {

    private static final String TEMPLATE_TOPIC = "TOPIC1";

    public KafkaStepDefinitions(){
        Given("given statement", () -> {
            OperationEntity operationEntity = new OperationEntity();
            operationEntity.setFoo("foo");
            kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity));
        });
    }

}

消费者表示,同样的代码在生产引导服务器上运行良好,但嵌入式Kafka从未实现过

@KafkaListener(topics = "${kafka.topic1}", groupId = "groupId")
    public void consume(List<GenericRecord> records, Acknowledgment ack) throws DDCException {
        LOGGER.info("Batch of {} records received", records.size());
        //do something with the data
        ack.acknowledge();
    }

日志中的一切看起来都很好,但我们不知道遗漏了什么。

提前感谢。

共有3个答案

蒲昀
2023-03-14

我也面临同样的问题与嵌入式Kafka尝试使用KafkaContainer

@ActiveProfiles({"test"})
@RunWith(Cucumber.class)
@CucumberOptions(features= {"src/test/resources/cucumber/data.feature"},
        plugin = {"pretty", "json:target/cucumber.json"})
@SpringBootTest(classes = MyApplication.class)
public final class MyApplicationCucumberTest {
    private MyApplicationCucumberTest() {}

    @Container
    private static KafkaContainer kafkaContainer = new KafkaTestContainer();


    @BeforeClass
    public static void beforeClass() throws IOException, TTransportException {

        kafkaContainer.start();
        System.out.println("Kafka Bootstrap server : " + kafkaContainer.getBootstrapServers());
        System.setProperty("spring.kafka.bootstrap", kafkaContainer.getBootstrapServers());
        System.out.println("Kafka Bootstrap server : " + System.getProperty("spring.kafka.bootstrap"));
        try {
            // Create Topic 
            kafkaContainer.execInContainer("/bin/sh", "-c", "/usr/bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my.topic");
            
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(e);
        }

    }


}

还可以添加Spring。Kafka。消费者自动偏移重置:最早的应用。yml

public class KafkaTestContainer extends KafkaContainer {
    private static final String KAFKA_DOCKER = "confluentinc/cp-kafka:5.4.3";

    public KafkaTestContainer() {
        super(DockerImageName.parse(KAFKA_DOCKER));
        

    }
}
长孙德惠
2023-03-14

你的考试在开始前就结束了;请参见包含0-0-C-1的线程名称;消费者在启动后不到一秒钟就停止了。

我只是检查了一下,没有,我的测试正在执行,因为您可以在日志的第1174行中看到ProducerConfig值的日志。这个日志就在Kafka模板之后出现。发送(主题、实体)。我不使用@Test,因为Cucumber中有step定义。你可以在我的帖子里看到代码。

好啊但是在测试中需要某种锁存,以等待使用者实际分配主题/分区并接收数据。按照您现在构建测试的方式,测试在使用者完全启动之前就已关闭。查看我对这个问题的回答,了解一种包装听众的方法,这样你就可以等到收到录音。(这使用正常的JUnit测试)。

另一种技术是以某种方式将服务注入到侦听器bean中,以倒计时闩锁。

作为快速测试,添加线程。睡眠(10_000)到你的“步骤”。

但是,大概你会想要断言消费者确实得到了数据。你需要在测试退出之前做这个断言,因为它是异步的,你需要一些机制来等待它发生(或者超时)。

夹谷衡
2023-03-14

问题是消费者没有连接到嵌入式Kafka。您可以通过使用test配置文件运行测试,并将以下内容添加到应用程序测试中来实现这一点。yml

spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}

然后,您也不需要定制的consumerPropertiesconsumerFactorykafkaListenerContainerFactorybean。Spring boot将为您自动连接这些。如果您确实希望使用这些bean(不知道为什么),那么应该仔细检查KafkaAutoConfiguration,以确保覆盖了正确的名称和类型。

 类似资料:
  • 我们正在用我们的Servicetest和嵌入式Kafka观察一个奇怪的行为。 该测试是一个Spock测试,我们使用JUnit规则KafkaEmbedded并传播brokersAsString如下: 现在让我们困惑的是,如果我们等待两个分区连接,等待就会超时。只有当我们等待一个分区连接时,过一段时间一切都会成功运行。 我们是否理解错了代码,在嵌入式Kafka中每个主题有两个分区?只给我们的听众分配一

  • 我有一个Spring boot应用程序,我使用spring-Cloud-stream从一个kafka主题中消费,进行一些处理并发布到另一个kafka主题。该应用程序运行良好,我已经编写了运行良好的单元测试(使用TestBinder)。 我现在正试图用嵌入式Kafka编写一个集成测试,并测试端到端的功能。我在这里跟踪了样本https://github.com/spring-cloud/spring-

  • 我编写了一个基本的Spring Boot服务,它通过rest API使用一些数据,并将其发布到rabbitmq和kafka。 为了测试处理kafka生成的服务类,我遵循以下指南:https://www.baeldung.com/spring-boot-kafka-testing 孤立地说,测试(KafkaMessagingServiceIMTest)在intellij想法和命令行上的mvn中都可以

  • 所以我用了这个嵌入Kafka的例子,还有这个 我对这个示例做了一点更改,并用一些数据库(如h2db)更新了kafka侦听器。 现在在我的单元测试中,当我想检查数据在数据库中是否可用时,我得到NULL。另外,我不确定如何手动检查数据库,因为h2是一个内存基础数据库。 这是更新的部分:在接收器类中 在单元测试中: 但 dt 始终为空。此外,我也无法检查数据库,因为它在测试停止后停止。有人知道如何使它可

  • 我正在尝试创建一个使用嵌入式H2数据库的测试。但是我必须更改spring.datasource.url,我不能使用由spring boot创建的默认的。(这是因为我要把H2数据库的模式改成MYSQL) 这是我的: 这是我的: 控制台输出: 启动嵌入式数据库:url='jdbc:h2:mem:bfad6b71-3e2d-4a47-a32d-c76988b3c5f6;db_close_delay=-1

  • 问题内容: 我想测试嵌入式设备的功能。为简化起见,我可以说这是一个人形机器人,由PC通过C / C ++ API进行远程控制。 我非常有兴趣使用它,因为它没有样板方法。但是,我的情况要复杂一些。实际测试在C#程序上运行,大约需要24小时才能完成。通过切换到Python,我可能会节省大量开发新测试的时间。但是,在这样做之前,我正在寻找一些答案。 古老的测试套件的第一个问题是,所有测试都以预定的顺序执