当前位置: 首页 > 知识库问答 >
问题:

EmbeddedKafka如何在单元测试中检查收到的消息

芮岳
2023-03-14

我创建了一个Spring Boot应用程序,它向Kafka主题发送消息。我正在使用springSpring-Integration-Kafka:KafkaProducerMessageHandler 订阅到一个通道(subscribableChannel),并将接收到的所有消息推送到一个主题。应用程序运行良好。我看到消息通过console consumer(本地Kafka)到达Kafka。

@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {

    mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
    kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );

}
//...

@Test
public void endToEnd() throws Exception {
//  ...

    ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );

    StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );


}

问题是我没有看到任何记录。我不确定我的KafkaEmbedded设置是否正确。但是消息由信道接收。

共有1个答案

白浩气
2023-03-14

这对我管用。试试看

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaEmbeddedTest {

    private static String SENDER_TOPIC = "testTopic";

    @ClassRule
    // By default it creates two partitions.
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC); 

    @Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        //If you wish to send it to partitions other than 0 and 1, 
        //then you need to specify number of paritions in the declaration

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();


        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
        // Make sure you set the offset as earliest, because by the 
        // time consumer starts, producer might have sent all messages
        consumerProps.put("auto.offset.reset", "earliest");

        final List<String> receivedMessages = Lists.newArrayList();
        final CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
            kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
            try {
                while (true) {
                    ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
                    records.iterator().forEachRemaining(record -> {
                        receivedMessages.add(record.value());
                        latch.countDown();
                    });
                }
            } finally {
                kafkaConsumer.close();
            }
        });

    latch.await(10, TimeUnit.SECONDS);
    assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
    }
}

我使用倒计时锁存是因为producer.send(..)是一个异步操作。所以我在这里做的是,在一个无限循环中,每100毫秒轮询Kafka,如果有新记录,如果有,将它添加到未来断言的列表中,然后减少倒计时。我将总共等待10秒以确定。
您也可以使用一个简单的循环,然后在几分钟后退出。(如果您不想使用CountdownLatch和ExecutorService之类的东西)

 类似资料:
  • 我在网上读了一些讨论。他们说,我们不应该对私有方法进行单元测试或检查私有状态,因为这是实现细节,是糟糕设计的标志。但就我而言,我真的不知道如何做得更好。 下面是一个示例代码(我的实际代码是使用factory编写的,但我尝试使用纯js创建一个相同的案例,这样每个人都更容易理解,因为相同的原因是闭包): 在我真正的应用程序中,我可以注入和模拟本地存储,但这是一个问题。我的问题是如何测试方法是否设置了其

  • 问题内容: 我有以下指令来自动聚焦字段: 我将如何对此进行单元测试?我尝试了以下选择器之类的几种方法,但是它们都返回错误或false: 我的单元测试设置如下: 问题答案: 我想通了,实际上这很明显。 我的问题有两个: 我没有调用超时刷新功能,所以没有发生超时,并且 我试图查看元素的focus属性,而仅关注focus()函数的调用更像是单元测试。focus属性确实属于e2e测试领域。

  • 我是spring的新手,目前我正在尝试为我的项目执行单元测试。我已经用hibernate配置了spring,现在我想检查创建的类的方法是否工作。例如,假设我有: 如何正确测试这些方法?最好的选择是什么?我习惯在JUnit测试中使用assertTrue和assertFalse,但我担心这在我的情况下不是一个好的选择。谢谢你。

  • 问题内容: 我正在使用带有eclipse的junit编写功能测试。 当运行单个测试时,它将按照我在类中设置它们的顺序运行。 例如。 但是,当我将此测试作为套件的一部分运行时(在包中),顺序是随机的。 例如,它将执行验证,然后删除用户,然后删除joinuserToRoom然后创建用户。 我在套件中的测试并不相互依赖。但是,测试中的每个单独测试都取决于它们以正确的顺序运行。 有什么办法可以实现? 谢谢

  • 问题内容: 我有一个简单的功能要测试: 但是,如何测试该函数实际发送到标准输出的内容呢?Test :: Output在Perl中实现了我想要的功能 但这对于每个测试来说都是很多额外的工作。我希望有一种更标准的方法,或者也许是一个抽象库来处理此问题。 问题答案: 还需要记住的一件事是,没有什么可以阻止您编写避免编写样板代码的函数。 例如,我有一个使用的命令行应用程序,我编写了以下函数: 然后像这样使

  • 我需要在我的应用程序中进行单元测试JMSXGroupID。有人能告诉我是否有示例文章或参考文献来实现这一点吗?我试图在ActiveMQ门户中查看参考文献,但看起来下面的Hudson链接坏了。我正在使用Spring JMS进行编程。 http://activemq.apache.org/junit-reports.html