当前位置: 首页 > 知识库问答 >
问题:

如何用Spring Kafka测试Kafka Streams应用程序?

令狐经武
2023-03-14

    @Bean
    public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(ITEM_TOPIC,
                        Consumed.with(Serdes.String(), itemAvroSerde))
                .mapValues((id, item) -> item.getNumber())
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        Materialized.with(Serdes.String(), Serdes.Long()));
    }

共有1个答案

程凯定
2023-03-14

Spring Kafka for Kafka Streams支持没有带来任何额外的API,尤其是在流构建及其处理方面。

我们最近为自己打开了一个很好的kafka-streams-test-utils库,可以在单元测试中使用,无需任何Kafka代理启动(甚至是嵌入式的)。

在我们的几个测试中,我们有如下内容:

    KStream<String, String> stream = builder.stream(INPUT);
    stream
            .transform(() -> enricher)
            .to(OUTPUT);

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

    ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(),
            new StringSerializer());
    driver.pipeInput(recordFactory.create(INPUT, "key", "value"));
    ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
    assertThat(result.headers().lastHeader("foo")).isNotNull();
 类似资料:
  • 我需要测试旧的SpringBootJava应用程序(SpringBoot 1.2.7、Java1.8版本),我不想更改Spring版本(因为应用程序非常庞大,然后我必须对代码进行大量重构)。 我想将不同的表列类型应用于桌面应用程序的输入,并测试该应用程序是否正确确定它们。 但问题是,由于旧版本的Spring,我无法提取@ExtendWith(SpringExtension.class),@Exte

  • 问题内容: 我有一个使用React的网络应用程序,我正在尝试使用Selenium RC创建一些测试。我发现,当Selenium更改字段的值时,不会正确触发事件。我知道这是一个典型的问题,正如WebDriver常见问题所证明的那样,我已经尝试了很多不同的事情,例如使用onFocus而不是onChange并使用sendKeys()和type()确保焦点进出,以编程方式调用该事件以及我可以在网上找到的任

  • 问题内容: 我刚刚在我的express应用程序中添加了shouldjs和mocha进行测试,但是我想知道如何测试我的应用程序。我想这样做: 当然,测试套件中的最后一个测试只是告诉med,res.render函数(在show_create_user_screen中调用)是未定义的,可能是因为服务器未运行且配置尚未完成。所以我想知道其他人如何设置他们的测试? 问题答案: 好的,虽然测试路由代码是您可能

  • 我有一个spring boot应用程序,其中我的SpringBootApplication入门类看起来像一个标准类。因此,我为我的所有功能创建了许多测试,并将摘要发送给sonarqube以查看我的覆盖范围。 对于我的初级课程,Sonarqube告诉我,我只有60%的覆盖率。因此,平均覆盖率不如预期。 我的测试类只是默认类。 那么如何在应用程序的入门类中测试我的主类呢?

  • 我有一个用Spring:2.0.7的古老版本构建的老应用程序。我的任务是为这个应用程序添加新的功能,所以我也需要编写一些JUnit测试。 有没有其他方法可以在JUnit中加载ApplicationContext.XML文件,并访问该XML文件中定义的bean? 由于我已经有了mockup,而且它们不需要初始化参数,所以我可以实例化它们并将它们传递给setter,也许可以使用注释。但是如果可能的话,

  • 在新版本(1.1)中,我想用新列升级数据库。 在我的应用程序中,数据库升级是一个常见的崩溃,因为以前版本(1.0)中的用户没有数据库中的新列。 我尝试过使用Google Play Store的beta测试功能,但问题是没有一个有效的方法(或者至少我还没有找到一个)来做到以下几点: 唯一的方法(就我所关心的)是从测试列表中删除测试器(在Google Play systems中刷新可能需要几个小时),