I'm trying to send messages to my broker using an Avro schema, but I always get the error:

2020-02-01 11:24:37.189 [nioEventLoopGroup-4-1] ERROR Application - Unhandled: POST - /api/orchestration/ org.apache.kafka.common.errors.SerializationException: Error registering Avro schema ... Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
This is my Docker container:
connect:
  image: confluentinc/cp-kafka-connect:5.4.0
  hostname: confluentinc-connect
  container_name: confluentinc-connect
  depends_on:
    - zookeeper
    - broker
    - schema-registry
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: confluentinc-connect
    CONNECT_CONFIG_STORAGE_TOPIC: confluentinc-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: confluentinc-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: confluentinc-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR"
    CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
    CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/extras"
My producer (written in Kotlin):
val prop: HashMap<String, Any> = HashMap()
prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
prop[VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
prop[SCHEMA_REGISTRY_URL] = schemaUrl
prop[ENABLE_IDEMPOTENCE_CONFIG] = idempotence
prop[ACKS_CONFIG] = acks.value
prop[RETRIES_CONFIG] = retries
prop[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = requestPerConnection
prop[COMPRESSION_TYPE_CONFIG] = compression.value
prop[LINGER_MS_CONFIG] = linger
prop[BATCH_SIZE_CONFIG] = batchSize.value
return KafkaProducer(prop)
My Avro schema:
{
"type": "record",
"namespace": "com.rjdesenvolvimento",
"name": "create_client_value",
"doc": "Avro Schema for Kafka Command",
"fields": [
{
"name": "id",
"type": "string",
"logicalType": "uuid",
"doc": "UUID identifying the command"
},
{
"name": "status",
"type": {
"name": "status",
"type": "enum",
"symbols": [
"Open",
"Closed",
"Processing"
],
"doc": "Can be only: Open, Closed or Processing"
},
"doc": "Status of the command"
},
{
"name": "message",
"type": {
"type": "record",
"name": "message",
"doc": "Avro Schema for insert new client",
"fields": [
{
"name": "id",
"type": "string",
"logicalType": "uuid",
"doc": "UUID identifying the client transaction"
},
{
"name": "active",
"type": "boolean",
"doc": "Soft delete for client"
},
{
"name": "name",
"type": "string",
"doc": "Name of the client"
},
{
"name": "email",
"type": "string",
"doc": "Email of the client"
},
{
"name": "document",
"type": "string",
"doc": "CPF or CNPJ of the client"
},
{
"name": "phones",
"doc": "A list of phone numbers",
"type": {
"type": "array",
"items": {
"name": "phones",
"type": "record",
"fields": [
{
"name": "id",
"type": "string",
"logicalType": "uuid",
"doc": "UUID identifying the phone transaction"
},
{
"name": "active",
"type": "boolean",
"doc": "Soft delete for phone number"
},
{
"name": "number",
"type": "string",
"doc": "The phone number, in the format +xx xx xxxx xxxx"
}
]
}
}
},
{
"name": "address",
"type": "string",
"logicalType": "uuid",
"doc": "Address is a UUID referencing another address microservice"
}
]
}
}
]
}
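One thing worth noting as an aside (it is not the cause of the 409): Avro only honors logicalType when it is nested inside the type itself; written as a sibling of "type" at the field level, as in the schema above, it is silently ignored. A corrected id field would look like:

```json
{
  "name": "id",
  "type": {
    "type": "string",
    "logicalType": "uuid"
  },
  "doc": "UUID identifying the command"
}
```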
And my POST:
{
"id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
"status" : "Open",
"message": {
"id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
"active" : true,
"name": "name",
"email": "email@com",
"document": "document",
"phones": [
{
"id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
"active" : true,
"number": "+xx xx xxxx xxxx"
},
{
"id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
"active" : true,
"number": "+xx xx xxxx xxxx"
}
],
"address": "9ec818da-6ee0-4634-9ed8-c085248cae12"
}
}
What am I doing wrong? GitHub project: https://github.com/rodrigodevelms/kafka-registry
UPDATE =====

In short: I'm not using the Gradle Avro plugin to generate my classes. In this example, my POST sends a Client object. The service then assembles a Command-type object like this:

id: the same id as the client
status: Open
message: the POST that was sent.

So I send this to Kafka, and in Connect (JDBC sink to Postgres) I whitelist only the attributes of the message (the client); I don't capture the command's id or status.
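For reference, a JDBC-sink configuration that keeps only the nested message record could combine the ExtractField transform with fields.whitelist. A minimal sketch, in which the connector name, topic, and connection URL are assumptions rather than values taken from the project:

```json
{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "create-client",
    "connection.url": "jdbc:postgresql://postgres:5432/clients",
    "transforms": "extractMessage",
    "transforms.extractMessage.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.extractMessage.field": "message",
    "fields.whitelist": "id,active,name,email,document"
  }
}
```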
On GitHub, the only classes that matter for understanding the code are:
1 - https://github.com/rodrigodevelms/kafka-registry/blob/master/kafka/src/main/kotlin/com/rjdesenvolvimento/messagebroker/producer/Producer.kt
2 - https://github.com/rodrigodevelms/kafka-registry/blob/master/kafka/src/main/kotlin/com/rjdesenvolvimento/messagebroker/commnad/Command.kt
3 - https://github.com/rodrigodevelms/kafka-registry/blob/master/src/client/Controller.kt
4 - https://github.com/rodrigodevelms/kafka-registry/blob/master/src/client/Service.kt
5 - docker-compose.yml, insert-client-value.avsc, postgresql.json
If I set the Avro schema's compatibility mode to NONE, I can send messages, but some unknown characters are displayed, as shown in the image below.
[image: message output containing unknown characters]
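Those "unknown characters" are almost certainly the Confluent wire-format header: every message produced by the Confluent Avro serializer starts with a magic byte (0) followed by a 4-byte big-endian schema ID, before the Avro payload itself. A minimal Python sketch of how a consumer could split that header off (the sample payload below is made up for illustration):

```python
import struct

def parse_confluent_header(payload: bytes):
    """Split a Confluent wire-format message into (magic_byte, schema_id, avro_body).

    The Confluent serializers prepend 5 bytes to every message:
    one magic byte (always 0) and a 4-byte big-endian schema ID.
    These bytes show up as garbage when the topic is read with a
    plain string consumer.
    """
    if len(payload) < 5:
        raise ValueError("payload too short to carry a Confluent header")
    magic, schema_id = struct.unpack(">bI", payload[:5])
    return magic, schema_id, payload[5:]

# Example: a message whose value schema was registered with ID 1.
magic, schema_id, body = parse_confluent_header(b"\x00\x00\x00\x00\x01avro-bytes")
print(magic, schema_id, body)  # magic=0, schema_id=1
```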
I suspect you are trying to do several things at once and have not cleaned up your state after earlier attempts. You should not get this error on a fresh setup:

Schema being registered is incompatible with an earlier schema

Your data has changed, and the schema in the registry is not compatible with the one you are sending.

You can send an HTTP DELETE request to http://registry:8081/subjects/[name]/ to delete all versions of the schema, then you can restart your connector.
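To inspect and reset the registry, the standard Schema Registry REST endpoints can be used; a sketch, assuming the subject name follows the default TopicNameStrategy (i.e. <topic>-value):

```shell
# List all registered subjects
curl http://schema-registry:8081/subjects

# List the versions stored for one subject (subject name is an assumption)
curl http://schema-registry:8081/subjects/create-client-value/versions

# Delete every version of that subject, then restart the connector
curl -X DELETE http://schema-registry:8081/subjects/create-client-value
```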