Configuring Schema Registry Compatibility to Allow Schema Changes

洪飞白
2023-12-01

I use Confluent's Schema Registry to send and receive Kafka messages in Avro format. But when I added a field to the outgoing message (which changed the schema), the producer failed with this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"RoleOperationLog","fields":[{"name":"alliance_id","type":"long"},{"name":"operator_role_id","type":"long"},{"name":"operation_time","type":"long"},{"name":"operation_type","type":"int"},{"name":"affair_id","type":"long"},{"name":"be_operated_resource_id","type":"long"},{"name":"operation_log","type":"string"},{"name":"be_operated_resource_type","type":"int"},{"name":"operation_time_str","type":"string"},{"name":"others","type":{"type":"map","values":"string"}}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:203) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153) ~[kafka-schema-registry-client-5.0.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) ~[kafka-avro-serializer-5.0.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.0.0.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:690) ~[kafka-clients-2.0.0.jar:na]
    at cn.superid.clients.RoleOperationLogSender.send(RoleOperationLogSender.java:39) ~[cloud-common-kafka-1.2-SNAPSHOT.jar:1.2-SNAPSHOT]
    at cn.superid2.RecordServiceApplication.run(RecordServiceApplication.java:49) [classes/:na]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:776) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:760) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
    at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:747) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
    at cn.superid2.RecordServiceApplication.main(RecordServiceApplication.java:36) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) [idea_rt.jar:na]

A look at the Confluent docs (https://docs.confluent.io/current/schema-registry/docs/config.html#avro-compatibility-level) explains:

The Avro compatibility type. Valid values are:

none (new schema can be any valid Avro schema)
backward (new schema can read data produced by the latest registered schema)
backward_transitive (new schema can read data produced by all previously registered schemas)
forward (latest registered schema can read data produced by the new schema)
forward_transitive (all previously registered schemas can read data produced by the new schema)
full (new schema is backward and forward compatible with the latest registered schema)
full_transitive (new schema is backward and forward compatible with all previously registered schemas)

The default is BACKWARD. Checking the current setting (i.e. curl schema-registry-url/config; my schema-registry-url is localhost:18081) confirms this:

curl localhost:18081/config
{"compatibilityLevel":"BACKWARD"}

To get the new schema registered, set the compatibility level to NONE.
Run the following on any node of the Schema Registry cluster (the setting propagates to every node):

curl -X PUT -H "Content-Type:application/json" http://localhost:18081/config -d '{"compatibility": "NONE"}' 

After running it, check the config again:

curl localhost:18081/config
{"compatibilityLevel":"NONE"}

With this in place, changing the schema no longer triggers the error.
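Worth noting: NONE disables compatibility checking entirely, so consumers still reading with the old schema may break. An alternative that keeps BACKWARD is to give every newly added Avro field a default value; readers on the new schema can then still decode old records, and registration succeeds without touching the config. A sketch of registering such a version (subject name assumed as before, schema trimmed for illustration):

```shell
# Under BACKWARD, an added field is acceptable if it carries a "default".
# Subject name and the trimmed schema are assumptions for illustration.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\":\"record\",\"name\":\"RoleOperationLog\",\"fields\":[{\"name\":\"alliance_id\",\"type\":\"long\"},{\"name\":\"others\",\"type\":{\"type\":\"map\",\"values\":\"string\"},\"default\":{}}]}"}' \
  http://localhost:18081/subjects/RoleOperationLog-value/versions
```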
