当前位置: 首页 > 知识库问答 >
问题:

@kafkaListener在Unit测试用例中不消耗来自容器工厂的

顾超
2023-03-14

我编写了一个JUnit测试用例来测试Spring Kafka文档中“使用Java配置”课程中的代码。(https://docs.spring.io/spring-kafka/reference/htmlsingle/#_with_java_configuration)。一个区别是,我在类中使用的是嵌入式Kafka服务器,而不是本地主机服务器。我使用的是Spring Boot 2.0.2及其Spring Kafka依赖项。

在运行这个测试用例时,我看到消费者没有从主题中读取消息,“assertTrue”检查失败。没有其他错误。

@RunWith(SpringRunner.class)
public class SpringConfigSendReceiveMessage {

    public static final String DEMO_TOPIC =  "demo_topic";
    @Autowired
    private Listener listener;

    @Test
    public void testSimple() throws Exception {
        template.send(DEMO_TOPIC, 0, "foo");
        template.flush();
        assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
    }

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaEmbedded kafkaEmbedded() {
            return new KafkaEmbedded(1, true, 1, DEMO_TOPIC);
        }

        @Bean
        public ConsumerFactory<Integer, String> createConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(createConsumerFactory());
            return factory;
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    }       

}

class Listener {
    public final CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = DEMO_TOPIC)
    public void listen1(String foo) {
        this.latch.countDown();
    }
}

我认为这是因为@KafkaListener在阅读主题时使用了一些错误/默认设置。我在日志中没有看到任何错误。

这个单元测试用例正确吗?如何找到为KafkaListener注释创建的对象,并查看它从哪个Kafka代理中使用?任何输入都会有帮助。谢谢

共有2个答案

邹祺
2023-03-14

@gary russell的答案是最好的解决方案。解决此问题的另一种方法是将消息发送延迟一段时间。这将使消费者做好准备。以下也是正确的解决方案。

经验教训——对于单元测试Kafka消费者,要么使用测试用例中的所有消息,要么确保消费者在生产者发送消息之前准备好。

@Test
public void testSimple() throws Exception {
    Thread.sleep(1000);
    template.send(DEMO_TOPIC, 0, "foo");
    template.flush();
    assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
}
太叔正文
2023-03-14

消息在消费者开始之前发送。

默认情况下,新消费者在主题结束时开始消费。

添加

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 类似资料:
  • 单元测试在软件开发中起着重要作用。 IntelliJ支持各种单元测试框架,如JUnit,TestNG,Spock等等。 在本章中,我们将使用JUnit3。 创建单元测试 在本节中,我们将学习如何创建单元测试。 按照以下步骤创建测试 - 选择Navigate → Test选项。 将出现一个对话框,您必须选择“ Create New Test 。 按照屏幕上的说明继续 - 提供有关测试的详细信息,如测

  • 单元测试是开发大型项目的重要过程。 Unit tests有助于在开发的每个阶段自动测试应用程序的组件。 当应用程序的组件根据项目的业务规范不工作时,它会发出警报。 单元测试可以手动完成,但通常是自动化的。 PHPUnit FuelPHP框架与PHPUnit测试框架集成。 要编写FuelPHP框架的单元测试,我们需要设置PHPUnit。 如果未安装PHPUnit,则下载并安装它。 我们可以使用以下命

  • 单元测试涉及测试应用程序的每个单元。 它可以帮助开发人员在不运行整个复杂应用程序的情况下测试小功能。 名为“test”的Dart external library提供了编写和运行单元测试的标准方法。 Dart_programming单元测试涉及以下步骤 - Step 1: Installing the "test" package 要在当前项目中安装第三方软件包,您需要pubspec.yaml文件

  • 面向对象系统的基本单元是类。 因此,单元测试由一个类中的testig组成。 采用的方法是创建测试类的对象,并使用它来检查所选方法是否按预期执行。 并非每种方法都可以进行测试,因为测试每一种方法并不总是有用的。 但是应该对关键和关键方法进行单元测试。 JUnit是一个开源测试框架,是Java代码自动化单元测试的公认行业标准。 幸运的是,JUnit框架可以很容易地用于测试Groovy类。 所需的只是扩

  • 我试图遵循示例:https://blog.knoldus.com/a-quick-demo-kafka-to-flink-to-cassandra/我试图从kafka解析我的Shippingorder JSON消息并将其解析为对象。然后按一些属性对其进行分组,但在平面图步骤时出现错误。 我的sbt文件: 我的主文件。 我的订单对象 运行此作业时出错 我不知道这个错误。请解释并帮助我解决这个问题。

  • 我有两个带有选择器和方法的类(在这两个类中我都声明了webdriver)。我还使用@findby来选择定位器。 带有选择器和方法的登录页面 主页测试从两个类(登录页面和主页)调用方法--它们都声明了驱动程序。问题是,当运行测试时,打开了2个firefox实例,并且只有登录测试成功执行,其他来自主页的方法都失败了。有人能帮我明白我做错了什么吗?