当前位置: 首页 > 知识库问答 >
问题:

NPE使用嵌入式Kafka测试Kafka生产者

笪昌翰
2023-03-14

我编写了一个基本的Spring Boot服务,它通过rest API使用一些数据,并将其发布到rabbitmq和kafka。

为了测试处理kafka生成的服务类,我遵循以下指南:https://www.baeldung.com/spring-boot-kafka-testing

孤立地说,测试(KafkaMessagingServiceIMTest)在intellij想法和命令行上的mvn中都可以完美运行。在想法中运行所有项目测试都可以正常工作。但是,当我在命令行上通过maven运行所有项目测试时,当尝试对有效负载字符串进行断言时,此测试在NPE中失败。

我已经将根本问题的位置缩小到了另一个测试类(AppPropertiesTest),它只测试我的AppProperties组件(这是一个我用来以整洁的方式从application.properties中提取配置的组件)。当且仅当该测试类中的测试在项目根目录中使用“mvn clean install”与失败的测试一起运行时,才会显示NPE。注释掉此类中的测试或用@DirtiesContext对其进行注释可以解决问题。显然,这个测试类加载到spring上下文中的内容会导致另一个测试中事件/CountdownClast的时间/顺序出现问题。当然,我不想使用@DirtiesContext,因为随着项目复杂性的增加,它可能会导致更慢的构建。这也不能解释问题。。我无法处理:)

AppProperty tiesTest使用构造函数注入来注入AppProperties组件。它还扩展了一个抽象类“GenericServiceTest”,其注释如下:

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL) 

不包含任何其他内容。您可能知道,SpringBootTest注释构建了一个测试spring上下文,并将其连接到样板中,以允许对spring应用程序的依赖项注入等进行有效测试。TestConstructor注释允许在我的一些测试中进行构造函数注入。FWIW,我已经尝试删除TestConstructor注释,并在AppProperties类中使用普通的旧自动连接,以查看它是否有影响,但没有影响。

失败的测试类还扩展了GenericServiceTest,因为它需要Spring上下文注入一些依赖项,例如正在测试的消费者和消息传递服务以及其中的AppProperties实例等。

所以我知道问题出在哪里,但我不知道问题出在哪里。根据Baeldung指南,即使NPE的测试失败,我也可以在日志中看到消费者在失败前成功消费了消息:

TestKafkaConsumer  : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1618997289238, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'

然而,当我们回到断言时,payLoad为空。我在失败的测试中尝试了各种方法,如Thread.sleep(),以给它更多的时间,我增加了wait()超时,但没有乐趣。

我觉得奇怪的是,这些测试在构思和孤立方面都很好。现在它开始让我有点疯狂,我无法调试它,因为这个问题在我的IDE中没有发生。

如果有人有任何想法,我们将不胜感激!

谢谢。

编辑:有人非常合理地建议我添加一些代码,如下所示:)

失败的测试(在assertTrue(payload.contains(testMessage))失败,因为有效负载为null)。autowired kafkaMessagingService只需要注入AppProperties和KakfaTemplate的依赖项,并调用kafkaTemplate。发送():


@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KafkaMessagingServiceImplTest extends GenericServiceTest {

    @Autowired
    @Qualifier("kafkaMessagingServiceImpl")
    private IMessagingService messagingService;
    @Autowired
    private TestKafkaConsumer kafkaConsumer;
    @Value("${app.topicName}")
    private String testTopic;

    @Test
    public void testSendAndConsumeKafkaMessage() throws InterruptedException {
        String testMessage = "This is a test message to be sent to Kafka.";
        messagingService.sendMessage(testMessage);
        kafkaConsumer.getLatch().await(2000, TimeUnit.MILLISECONDS);
        String payload = kafkaConsumer.getPayload();
        assertTrue(payload.contains(testMessage));
    }

TestConsumer(用于在上述测试中使用)

@Component
public class TestKafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;

    @KafkaListener(topics = "${app.topicName}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());

        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }

项目依赖关系:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.5.6.RELEASE</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

AppPropertiesTest类(其上下文似乎导致了问题)

class AppPropertiesTest extends GenericServiceTest {

    private final AppProperties appProperties;

    public AppPropertiesTest(AppProperties appProperties) {
        this.appProperties = appProperties;
    }

    @Test
    public void testAppPropertiesGetQueueName() {
        String expected = "test-queue";
        String result = appProperties.getRabbitMQQueueName();
        assertEquals(expected, result);
    }

    @Test
    public void testAppPropertiesGetDurableQueue() {
        boolean isDurableQueue = appProperties.isDurableQueue();
        assertTrue(isDurableQueue);
    }
}

AppPropertiesTest类正在测试的AppProperties类:

@Component
@ConfigurationProperties("app")
public class AppProperties {

    // a whole bunch of properties by name that are prefixed by app. in the application.properties file. Nothing else
}

两个测试都扩展的通用服务测试类。

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericServiceTest {

}

故障(您可以在有效负载上方的行上看到,有效负载已被接收并打印出来)。

2021-04-21 14:15:07.113  INFO 493384 --- [ntainer#0-0-C-1] service.TestKafkaConsumer  : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1619010907076, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.791 s <<< FAILURE! - in 
service.KafkaMessagingServiceImplTest
[ERROR] testSendAndConsumeKafkaMessage  Time elapsed: 2.044 s  <<< ERROR!
java.lang.NullPointerException
    at service.KafkaMessagingServiceImplTest.testSendAndConsumeKafkaMessage(KafkaMessagingServiceImplTest.java:42)

共有1个答案

宋凌龙
2023-03-14

问题是TestListener是一个组件,所以它被添加了两次-记录将转到另一个实例。

我添加了更多的调试来验证getter是否在其他实例上调用。

@Component
public class TestKafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);

    private final CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;


    @KafkaListener(id = "myListener", topics = "${app.kafkaTopicName}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        setPayload(consumerRecord.toString());

        if (payload != null) {
            LOGGER.info(this + ": payload is not null still");
        }

        latch.countDown();

        if (payload != null) {
            LOGGER.info(this + ": payload is not null after latch countdown");
        }
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public String getPayload() {
        LOGGER.info(this + ": getting Payload");
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}

如果不想使用DirtiesContext,至少可以在测试完成后停止侦听器容器:

@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericDataServiceTest {

    @AfterAll
    static void stopContainers(@Autowired KafkaListenerEndpointRegistry registry) {
        registry.stop();
    }

}
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
 类似资料:
  • 我们正在用我们的Servicetest和嵌入式Kafka观察一个奇怪的行为。 该测试是一个Spock测试,我们使用JUnit规则KafkaEmbedded并传播brokersAsString如下: 现在让我们困惑的是,如果我们等待两个分区连接,等待就会超时。只有当我们等待一个分区连接时,过一段时间一切都会成功运行。 我们是否理解错了代码,在嵌入式Kafka中每个主题有两个分区?只给我们的听众分配一

  • 我们有一个Kafka集群,由3个节点组成,每个节点有32GB内存和6个内核2.5 CPU。 我们写了一个 kafka 制作人,它接收来自 Twitter 的推文,然后分批发送给 Kafka,每批 5000 条推文。 在生产者中,我们使用

  • 我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断

  • 我们正在使用Cucumber为out应用程序做一些集成测试,我们在测试一个时遇到了一些问题。我们设法使用了一个嵌入式Kafka并在其中生成数据。 但消费者从未收到任何数据,我们也不知道发生了什么。 这是我们的代码: 制作人配置 消费者配置 集成测试 步骤定义 消费者表示,同样的代码在生产引导服务器上运行良好,但嵌入式Kafka从未实现过 日志中的一切看起来都很好,但我们不知道遗漏了什么。 提前感谢

  • 我正在使用kafka在spring boot中开发一个异步邮件服务器。 我已经用嵌入式Kafka编写了测试,它在一个随机端口中启动自己的Kafka主题,并使用它进行测试。 当我开始这个应用程序上下文正在加载,它期望在我的本地kafka集群。我需要停止应用程序上下文的加载。我从https://github.com/code-not-found/spring-kafka/blob/master/spr

  • 所以我用了这个嵌入Kafka的例子,还有这个 我对这个示例做了一点更改,并用一些数据库(如h2db)更新了kafka侦听器。 现在在我的单元测试中,当我想检查数据在数据库中是否可用时,我得到NULL。另外,我不确定如何手动检查数据库,因为h2是一个内存基础数据库。 这是更新的部分:在接收器类中 在单元测试中: 但 dt 始终为空。此外,我也无法检查数据库,因为它在测试停止后停止。有人知道如何使它可