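I use Confluent's Schema Registry to send and receive Avro-encoded Kafka messages.
However, after I added fields to a message (which changed its schema), sending failed with the following error: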
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"RoleOperationLog","fields":[{"name":"alliance_id","type":"long"},{"name":"operator_role_id","type":"long"},{"name":"operation_time","type":"long"},{"name":"operation_type","type":"int"},{"name":"affair_id","type":"long"},{"name":"be_operated_resource_id","type":"long"},{"name":"operation_log","type":"string"},{"name":"be_operated_resource_type","type":"int"},{"name":"operation_time_str","type":"string"},{"name":"others","type":{"type":"map","values":"string"}}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:203) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153) ~[kafka-schema-registry-client-5.0.0.jar:na]
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79) ~[kafka-avro-serializer-5.0.0.jar:na]
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.0.0.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:690) ~[kafka-clients-2.0.0.jar:na]
at cn.superid.clients.RoleOperationLogSender.send(RoleOperationLogSender.java:39) ~[cloud-common-kafka-1.2-SNAPSHOT.jar:1.2-SNAPSHOT]
at cn.superid2.RecordServiceApplication.run(RecordServiceApplication.java:49) [classes/:na]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:776) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:760) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:747) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.3.RELEASE.jar:1.5.3.RELEASE]
at cn.superid2.RecordServiceApplication.main(RecordServiceApplication.java:36) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_101]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_101]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_101]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_101]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) [idea_rt.jar:na]
I looked this up in the Confluent documentation (https://docs.confluent.io/current/schema-registry/docs/config.html#avro-compatibility-level), which says:
The Avro compatibility type. Valid values are:
none (new schema can be any valid Avro schema),
backward (new schema can read data produced by latest registered schema),
backward_transitive (new schema can read data produced by all previously registered schemas),
forward (latest registered schema can read data produced by the new schema),
forward_transitive (all previously registered schemas can read data produced by the new schema),
full (new schema is backward and forward compatible with latest registered schema),
full_transitive (new schema is backward and forward compatible with all previously registered schemas)
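The default is backward. Under this mode the new schema must be able to read data written with the latest registered schema, so a schema that adds fields without default values (as in the error above, where no field declares a default) is rejected.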
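To make the backward rule concrete, here is a minimal Python sketch of the one check that matters for added fields. It is a simplified model, not the Schema Registry's actual implementation: the function name `backward_incompatible_fields` is hypothetical, and the schemas are trimmed, illustrative versions of `RoleOperationLog` (which field was actually added is an assumption).

```python
def backward_incompatible_fields(new_schema, old_schema):
    """Simplified BACKWARD check covering a single rule.

    The new schema acts as the reader of data written with the old schema.
    A field that exists only in the new schema has no value in old data,
    so the reader needs a default for it. Type changes, aliases and nested
    records are deliberately ignored in this sketch.
    """
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


# Trimmed, illustrative schemas (assumption: not the full schema from the log).
OLD = {
    "type": "record",
    "name": "RoleOperationLog",
    "fields": [
        {"name": "alliance_id", "type": "long"},
        {"name": "operation_log", "type": "string"},
    ],
}

# New field without a default: old data cannot be read with this schema.
NEW_NO_DEFAULT = {
    **OLD,
    "fields": OLD["fields"] + [{"name": "operation_time_str", "type": "string"}],
}

# Same field with a default: old data can still be read.
NEW_WITH_DEFAULT = {
    **OLD,
    "fields": OLD["fields"]
    + [{"name": "operation_time_str", "type": "string", "default": ""}],
}

print(backward_incompatible_fields(NEW_NO_DEFAULT, OLD))    # ['operation_time_str']
print(backward_incompatible_fields(NEW_WITH_DEFAULT, OLD))  # []
```

The second case suggests an alternative to relaxing the compatibility level: giving each new field a default value should keep the schema backward compatible.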
I checked the current setting by requesting schema-registry-url/config (my schema-registry-url is localhost:18081):
curl localhost:18081/config
{"compatibilityLevel":"BACKWARD"}
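To let the new schema be registered, the compatibility level has to be set to NONE, which turns compatibility checking off.
Run the following on any node of the Schema Registry cluster (the setting is then synchronized to every node):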
curl -X PUT -H "Content-Type:application/json" http://localhost:18081/config -d '{"compatibility": "NONE"}'
Afterwards, check the setting again:
curl localhost:18081/config
{"compatibilityLevel":"NONE"}
After this, changing the schema no longer causes an error.
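The /config endpoint changes the level for every subject in the registry. The Schema Registry REST API also accepts a PUT on /config/{subject}, which limits the change to one subject. The following is a minimal Python sketch of that call using only the standard library. The helper names are hypothetical, and the subject name `role-operation-log-value` used in the note below is an assumption (with the default naming strategy the subject is the topic name plus `-value`, and the topic name is not given here).

```python
import json
import urllib.parse
import urllib.request

# Content type defined by the Schema Registry REST API.
CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"


def build_subject_config_request(registry_url, subject, level):
    """Build (but do not send) a PUT /config/{subject} request."""
    base = registry_url.rstrip("/")
    # Quote the subject so characters such as '/' cannot alter the path.
    url = f"{base}/config/{urllib.parse.quote(subject, safe='')}"
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": CONTENT_TYPE},
        method="PUT",
    )


def set_subject_compatibility(registry_url, subject, level):
    """Send the request and return the registry's JSON reply as a dict."""
    request = build_subject_config_request(registry_url, subject, level)
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a registry running at localhost:18081, calling `set_subject_compatibility("http://localhost:18081", "role-operation-log-value", "NONE")` would relax the check for that subject only, leaving the global level at BACKWARD.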