当前位置: 首页 > 知识库问答 >
问题:

KTable外键联接与avro架构注册表不兼容

谢善
2023-03-14

使用汇流5.4.1

在将连接的流从连接的KTable转发到另一个主题时,我们碰巧在新的KTable外键连接中遇到了一个问题。

将错误中提到的模式与在模式注册表上注册的模式进行比较,结果完全相同…似乎Kafka 2.4.0中已经出现了类似的问题:https://issues.apache.org/jira/browse/Kafka-9390并且该问题在转发到另一个主题时仍然存在

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:64)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:236)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
    at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:385)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:560)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:501)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:438)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1591)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:494)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:432)
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:481)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:222)
    ... 57 more
; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:331) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:431) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:423) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:409) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71) ~[kafka-avro-serializer-5.4.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.4.1.jar:na]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.4.1.jar:na]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.4.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:106) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:124) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier$1.process(SubscriptionResolverJoinProcessorSupplier.java:107) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier$1.process(SubscriptionResolverJoinProcessorSupplier.java:60) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:432) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-5.4.1-ccs.jar:na]

共有1个答案

匡安宜
2023-03-14

我的错,这个问题是因为先前通过另一个编译模块为同一主题注册的模式不同...错误不够明显,没有从“to”中提到主题,而是提到了一个内部主题,这导致了混淆。这不再是一个问题。

 类似资料:
  • 我试图使用Avro模式向我的经纪人发送消息,但“我总是收到错误: 2020-02-01 11:24:37.189[nioEventLoopGroup-4-1]错误应用程序-未经处理:POST-/api/orchestration/org。阿帕奇。Kafka。常见的错误。SerializationException:注册Avro架构时出错:io导致“字符串”。汇合的。Kafka。阴谋论。客户Rest

  • 这里是我的docker容器: 我的制作人(用Kolin写成) 我的Avro架构: 状态:打开 消息:发送的邮件。 所以我把它发送给KAFKA,在connect(jdbc sink postgres)中,我只将消息(客户端)的属性作为Fields.whitelist而不获得命令id或状态。 4-https://github.com/rodrigodevelms/kafka-registry/blob

  • 嘿,我想将ConFluent模式注册表与Avro Serializers一起使用:留档现在基本上是说:不要为多个不同的主题使用相同的模式 谁能解释一下原因吗?我重新搜索了源代码,它基本上将模式存储在Kafka主题中,如下所示(topicname,magicbytes,version- 因此,除了冗余之外,我看不到多次使用模式的问题?

  • Confluent网站的留档提到以下内容: 左侧KTable可以有多条记录,这些记录映射到右侧KTable上的同一个键。如果右KTable中存在相应的键,则对单个左KTable条目的更新可能会导致单个输出事件。因此,对右KTable条目的单个更新将导致对左KTable中具有相同外键的每个记录进行更新。 查看下面的示例说明: 根据解释,如果是内部联接,则右侧的应该触发左侧的两条记录,这两条记录将被添

  • 我很困惑,如果模式注册中心不能演化模式升级/更改,那么我为什么要使用模式注册中心,或者说我为什么要使用AVRO?

  • 我们在当前的基础架构中安装了普通的apache Kafka,并开始记录一些我们想要使用Kafka Connect处理的数据。目前,我们使用Avro作为消息格式,但我们的基础架构中没有模式注册表。将来,我们计划用Confluent替换当前堆栈,并使用Schema Registry和Connect,但在一段时间内,我们只需要为此部署Connect。 是否可以以某种方式配置连接接收器,以便它们使用显式a