Question:

Problems creating tests for a Spring Cloud Stream Kafka Streams application with EmbeddedKafka and MockSchemaRegistryClient

毛勇
2023-03-14

I am trying to figure out how to test my Spring Cloud Stream Kafka Streams application.

The application looks like this:

Stream 1: topic1 → topic2
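For illustration, a minimal sketch of what the stream1 function bean could look like (the mapping body and the convert() helper are hypothetical; Test1 and Test2 are the Avro types used in the test below):

import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;

// Hypothetical stream1 bean: consumes Test1 records from topic1 and emits
// Test2 records to topic2 via the bindings configured below.
@Bean
public Function<KStream<Long, Test1>, KStream<String, Test2>> stream1() {
    return input -> input.map((key, value) ->
            KeyValue.pair(String.valueOf(key), convert(value))); // convert() is a placeholder
}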

I tried different approaches, such as the TestChannelBinder, but that approach only works for simple functions, not for KStream-based functions with Avro.
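For comparison, the test binder pattern that does work for plain java.util.function.Function beans looks roughly like this (a sketch; MyApplication and someSimpleFunction are placeholder names, and KStream-based functions need a real binder instead):

import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Sketch: the in-memory test binder drives a plain Function bean without Kafka.
@org.junit.Test
public void simpleFunctionWithTestBinder() {
    try (ConfigurableApplicationContext ctx = new SpringApplicationBuilder(
            TestChannelBinderConfiguration.getCompleteConfiguration(MyApplication.class))
            .run("--spring.cloud.function.definition=someSimpleFunction")) {
        InputDestination input = ctx.getBean(InputDestination.class);
        OutputDestination output = ctx.getBean(OutputDestination.class);
        input.send(MessageBuilder.withPayload("test").build());
        Message<byte[]> result = output.receive(1000); // payload after the function ran
    }
}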

I decided to use EmbeddedKafka together with MockSchemaRegistryClient. I can produce to a topic and also consume from that same topic (topic1) again, but I cannot consume from topic2.

In my test application.yaml I put the following configuration (for now I am only testing the first stream; I want to extend it once this works):

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1 # not now ;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
    kafka:
      binder:
        min-partition-count: 1
        replication-factor: 1
        auto-create-topics: true
        auto-add-partitions: true
      bindings:
        default:
          consumer:
            autoRebalanceEnabled: true
            resetOffsets: true
            startOffset: earliest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: mock://localtest
            specific.avro.reader: true
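One detail about the mock:// URL worth noting: every serializer or deserializer configured with the same mock scope resolves to one shared in-memory registry per JVM, which a test can inspect via Confluent's test util (a sketch):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;

// All clients created for schema.registry.url=mock://localtest resolve to this
// same in-memory instance, so schemas registered by the producer are visible
// to any consumer that uses the identical scope name.
SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("localtest");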

My test looks like this:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    private static final String INPUT_TOPIC = "topic1";

    private static final String OUTPUT_TOPIC = "topic2";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, INPUT_TOPIC, OUTPUT_TOPIC);

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @org.junit.Test
    public void testSendReceive() throws IOException {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", LongSerializer.class);
        senderProps.put("value.serializer", SpecificAvroSerializer.class);
        senderProps.put("schema.registry.url", "mock://localtest");
        AvroFileParser fileParser = new AvroFileParser();
        DefaultKafkaProducerFactory<Long, Test1> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<Long, Test1> template = new KafkaTemplate<>(pf, true);
        Test1 test1 = fileParser.parseTest1("src/test/resources/mocks/test1.json");

        template.send(INPUT_TOPIC, 123456L, test1);
        System.out.println("produced");
        
        Map<String, Object> consumer1Props = KafkaTestUtils.consumerProps("testConsumer1", "false", embeddedKafka.getEmbeddedKafka());
        consumer1Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer1Props.put("key.deserializer", LongDeserializer.class);
        consumer1Props.put("value.deserializer", SpecificAvroDeserializer.class);
        consumer1Props.put("schema.registry.url", "mock://localtest");
        DefaultKafkaConsumerFactory<Long, Test1> cf = new DefaultKafkaConsumerFactory<>(consumer1Props);

        Consumer<Long, Test1> consumer1 = cf.createConsumer();
        consumer1.subscribe(Collections.singleton(INPUT_TOPIC));
        ConsumerRecords<Long, Test1> records = consumer1.poll(Duration.ofSeconds(10));
        consumer1.commitSync();

        System.out.println("records count?");
        System.out.println("" + records.count());

        Test1 fetchedTest1;
        fetchedTest1 = records.iterator().next().value();
        assertThat(records.count()).isEqualTo(1);
        System.out.println("found record");
        System.out.println(fetchedTest1.toString());

        Map<String, Object> consumer2Props = KafkaTestUtils.consumerProps("testConsumer2", "false", embeddedKafka.getEmbeddedKafka());
        consumer2Props.put("key.deserializer", StringDeserializer.class);
        consumer2Props.put("value.deserializer", TestAvroDeserializer.class);
        consumer2Props.put("schema.registry.url", "mock://localtest");

        DefaultKafkaConsumerFactory<String, Test2> consumer2Factory = new DefaultKafkaConsumerFactory<>(consumer2Props);
        Consumer<String, Test2> consumer2 = consumer2Factory.createConsumer();
        consumer2.subscribe(Collections.singleton(OUTPUT_TOPIC));
        ConsumerRecords<String, Test2> records2 = consumer2.poll(Duration.ofSeconds(30));
        consumer2.commitSync();
        

        if (records2.iterator().hasNext()) {
            System.out.println("has next");
        } else {
            System.out.println("has no next");
        }
    }
}

When trying to consume from and deserialize topic2, I get the following exception:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 0
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:193) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:249) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:307) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:86) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.2.0.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.1.jar:na]

No messages are consumed.
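A plausible reading of the "id 0" in that trace (my interpretation, not part of the original post): the keys on topic1 were written with LongSerializer, but the PrimitiveAvroSerde configured for stream1-in-0 expects the Confluent wire format, i.e. a zero magic byte followed by a 4-byte schema id:

import java.nio.ByteBuffer;

public class WireFormatIllustration {
    public static void main(String[] args) {
        // LongSerializer writes 123456L as eight big-endian bytes: 00 00 00 00 00 01 E2 40.
        // An Avro deserializer reads the first byte as the magic byte (0x00 passes the
        // check) and the next four bytes as the schema id, which here decodes to 0 --
        // the "schema for id 0" that the mock registry cannot find in the trace above.
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {0, 0, 0, 0, 0, 1, (byte) 0xE2, 0x40});
        System.out.println("magic byte: " + buf.get());   // 0
        System.out.println("schema id: " + buf.getInt()); // 0
    }
}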

So I tried overriding the SpecificAvroSerde, registering the schema directly, and using this deserializer:

public class TestAvroDeserializer<T extends org.apache.avro.specific.SpecificRecord>
        extends SpecificAvroDeserializer<T> implements Deserializer<T> {

    private final KafkaAvroDeserializer inner;

    public TestAvroDeserializer() throws IOException, RestClientException {
        // Register the Test2 schema directly with a fresh mock client.
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();
        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema, 1, 0);
        inner = new KafkaAvroDeserializer(mockedClient);
    }

    /**
     * For testing purposes only: allows injecting a schema registry client.
     */
    TestAvroDeserializer(final SchemaRegistryClient client) throws IOException, RestClientException {
        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        client.register("test2-value", test2Schema, 1, 0);
        inner = new KafkaAvroDeserializer(client);
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(final String topic, final byte[] bytes) {
        // Delegate to the inner deserializer backed by the pre-registered schema.
        return (T) inner.deserialize(topic, bytes);
    }
}
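One possible reason this cannot help (an assumption on my part): a freshly constructed MockSchemaRegistryClient is a separate registry from the mock://localtest scope the application uses, so its registrations are invisible to the binder. A variant that at least shares the scope would be:

// Sketch: reuse the scope-shared mock client for mock://localtest instead of a
// fresh instance, so the deserializer sees the same registrations as the
// producer and the binder.
inner = new KafkaAvroDeserializer(
        io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry.getClientForScope("localtest"));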

This deserializer did not work either. Does anyone have experience testing like this with EmbeddedKafka and MockSchemaRegistryClient? Or should I use a different approach?

I would be glad if someone could help. Thanks in advance.

1 answer

唐茂实
2023-03-14

I found a proper way to integration-test my topology.

I use the TopologyTestDriver from the kafka-streams-test-utils package.

Include this dependency in Maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <scope>test</scope>
</dependency>

For the application described in the question, setting up the topology test driver looks like the following. This code just shows sequentially how it works:

    @Test
    void test() {
        // keySerde, valueSerdeTopic1 and valueSerdeTopic2 are assumed to be
        // PrimitiveAvroSerde/SpecificAvroSerde fields of the test class.
        keySerde.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), true);
        valueSerdeTopic1.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);
        valueSerdeTopic2.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);

        final StreamsBuilder builder = new StreamsBuilder();

        Configuration config = new Configuration(); // class where you declare your Spring Cloud Stream functions
        KStream<String, Topic1> input = builder.stream("topic1", Consumed.with(keySerde, valueSerdeTopic1));

        KStream<String, Topic2> output = config.stream1().apply(input);
        output.to("topic2");

        Topology topology = builder.build();
        Properties streamsConfig = new Properties();

        streamsConfig.putAll(Map.of(
                org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver",
                org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored",
                KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas",
                org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class.getName(),
                org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName()
        ));

        TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamsConfig);

        TestInputTopic<String, Topic1> inputTopic = testDriver.createInputTopic("topic1", keySerde.serializer(), valueSerdeTopic1.serializer());
        TestOutputTopic<String, Topic2> outputTopic = testDriver.createOutputTopic("topic2", keySerde.deserializer(), valueSerdeTopic2.deserializer());
        inputTopic.pipeInput("key", topic1AvroModel); // write to the input topic, which runs the topology of your spring-cloud-stream app
        KeyValue<String, Topic2> outputRecord = outputTopic.readKeyValue(); // read from the output topic

        testDriver.close(); // release the driver's state stores
    }

If you write more tests, I recommend abstracting the setup code so you don't repeat it in every test. I can highly recommend this example from the Spring Cloud Stream samples repository; it led me to the TopologyTestDriver solution.
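One possible shape for that shared setup (a sketch reusing the configuration from the test above; TopologyTestSupport and buildTestDriver are hypothetical names):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// Hypothetical shared helper: builds a TopologyTestDriver with the common
// configuration, so each test only assembles its topology and pipes records.
final class TopologyTestSupport {

    static TopologyTestDriver buildTestDriver(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored");
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class.getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
        return new TopologyTestDriver(topology, props);
    }
}

Each test can then call TopologyTestSupport.buildTestDriver(builder.build()) and close the returned driver in an @AfterEach method.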
