当前位置: 首页 > 知识库问答 >
问题:

如何测试使用Avro和Confluent Schema注册表的Spring Cloud Stream Kafka Streams应用程序?

蒙墨竹
2023-03-14
spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order

server:
  port: 8086

logging:
  level:
    org.springframework.kafka.config: debug
  • 它正在使用本机序列化/反序列化。
  • 测试框架:Junit 5

关于Kafka代理,我想我应该使用嵌入的KafkaBroker bean,但是正如您所看到的,它还依赖于一个模式注册表,应该以某种方式进行模拟。怎么做?

共有1个答案

司寇高洁
2023-03-14

解决这个问题是一件非常痛苦的事情,但最后我设法用流利的Kafka流测试来解决这个问题:

额外依赖项:

testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("com.bakdata.fluent-kafka-streams-tests:schema-registry-mock-junit5:2.0.0")

关键是将必要的配置设置为系统属性。为此,我创建了一个独立的测试配置类:

@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {

    private val schemaRegistryMock = SchemaRegistryMock()

    @PostConstruct
    fun init() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)
        schemaRegistryMock.start()
        System.setProperty("spring.cloud.stream.schema-registry-client.endpoint", schemaRegistryMock.url)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
    }

    @Bean
    fun schemaRegistryMock(): SchemaRegistryMock {
        return schemaRegistryMock
    }

    @PreDestroy
    fun preDestroy() {
        schemaRegistryMock.stop()
    }
}
@EmbeddedKafka
@SpringBootTest(properties = [
    "spring.profiles.active=local",
    "schema-registry.user=",
    "schema-registry.password=",
    "spring.cloud.stream.bindings.event.destination=event",
    "spring.cloud.stream.bindings.event.producer.useNativeEncoding=true",
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
    "spring.cloud.stream.kafka.streams.bindings.event.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde",
    "spring.cloud.stream.kafka.streams.bindings.event.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde"])
class MyApplicationTests {

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Autowired
    private lateinit var schemaRegistryMock: SchemaRegistryMock

    @Test
    fun `should process events`() {
        val senderProps = KafkaTestUtils.producerProps(embeddedKafka)
        senderProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "io.confluent.kafka.serializers.KafkaAvroSerializer"
        senderProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "io.confluent.kafka.serializers.KafkaAvroSerializer"
        senderProps["schema.registry.url"] = schemaRegistryMock.url
        val pf = DefaultKafkaProducerFactory<Int, String>(senderProps)
        try {
            val template = KafkaTemplate(pf, true)
            template.defaultTopic = "event"
            ...

    }
 类似资料:
  • 如果不是--如何修复?

  • 我有一个Kafka消费者配置了主题中的模式轮询,我想做的是在当前模式的基础上创建另一个Avro模式,并使用它水合数据,基本上我不需要50%的信息,需要编写一些逻辑来更改几个字段。这只是一个例子 从stream返回的事件相当复杂,所以我将一个较小的CustomObj建模为. avsc文件,并将其编译成java。当尝试使用CustomObj运行代码时,我想做的就是使用一个事件,然后将其反序列化为一个更

  • 我正在尝试使用Confluent schema registry,下面是我在Github中找到的一些示例(https://github.com/gAmUssA/springboot-kafka-avro). 当消费者和生产者与模型共享相同的命名空间而不是其工作时。 当使用者位于具有不同名称空间但具有相同类(名称和属性方面)的不同项目中时,它不工作。 合流Avro反序列化程序可以使用正确的值反序列化

  • 我正试图使用SpringKafka为我的生产者应用程序及其嵌入式Kafka服务器编写测试。 然而,我的应用程序也使用合流模式注册表,我想知道SpringKafka是否为模式注册表提供了一些嵌入式服务器? 或者有没有更好的方法来使用模式注册表进行Spring Kafka测试?

  • 目前,我已经使用了spring cloud stream github示例,但是我不知道如何通过提供现有json数据将手动键入的对象转换为json格式。我可以使用一些工具从json数据推断avro模式。然而,问题是,我不想使用从avro模式导入中的类推断出的POJO,而是想使用现有的json数据。我还对应用程序/json部分感到困惑,当我使用curl-X POST时,可能有办法在http请求中提供

  • 我正在处理一些注册过程的Android应用程序(原生Android),使用AWS放大和Cognito用户池从这个链接https://aws-amplify.github.io/docs/android/authentication#signup。 我已经做了,直到放大添加验证和放大推送。我的原始文件夹中有配置文件。在实现注册功能后,我在尝试从我的应用程序注册时遇到以下错误: 2019-10-23