当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect从同一主题导出多个事件类型

许出野
2023-03-14

我正在尝试使用一个新的特性(https://www.confluent.io/blog/put-moulation-event-types-kafka-topic/)来存储同一主题上的两种不同类型的事件。实际上,我正在使用Confluent版本4.1.0并设置下面的属性来实现这一点

properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,TopicRecordNameStrategy.class.getName());
properties.put("value.multi.type", true);

数据被写入主题而没有问题,并且可以从Kafka Streams应用程序中看到为通用的Avro记录。另外,在Kafka模式注册表中,会创建两个新条目,一个用于承载在特定主题上的每个事件。

我面临的问题是,我无法使用Kafka Connect从这个主题导出这些数据。在最简单的情况下,当我使用文件接收器连接器时,如下所示

{
  "name": "sink-connector",
  "config": {
      "topics": "source-topic",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": 1,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schema.registry.url":"http://kafka-schema-registry:8081",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"http://kafka-schema-registry:8081",
      "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
      "file": "/tmp/sink-file.txt"
    }
}

我从连接器得到一个错误,它似乎是基于AvroConverter的某种序列化错误,如图所示

org.apache.kafka.connect.errors.DataException: source-topic
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 2
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:236)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

请注意,schema registry有一个id为2的Avro模式和另一个id为3的模式,它们描述了托管在同一主题上的两个事件。使用JDBC连接器时也会出现同样的问题。

那么我如何处理这个案例,以便将数据从我的Kafka集群导出到外部系统。是不是我的配置上遗漏了什么?有可能有一个主题与多种类型的事件,并导出他们通过Kafka连接?

共有1个答案

柴英锐
2023-03-14

找到解决办法了。我的代码是将键作为字符串传递,将值作为avro传递。读取时配置单元接收器尝试查找密钥得avro架构,但未能找到它.添加属性key.converter=org.apache.kafka.connect.storage.StringConverter key.converter.schema.registry.url=http://localhost:8081有助于解决此问题。

 类似资料:
  • 我正在使用AVRO文件来生成和使用消息,我想知道我是否可以使用相同的AVRO文件来处理多个主题,例如,在生产者中使用不同的“name”属性,在消费者中使用特定的“name”属性。 多谢.斯特凡诺

  • 在构建Kafka Streams拓扑时,可以通过两种不同的方式对多个主题的读取进行建模: 读取具有相同源节点的所有主题。 选项1相对于选项2是否有相对优势,反之亦然?所有主题都包含相同类型的数据,并具有相同的数据处理逻辑。

  • 我们有一个Kafka主题,有源源不断的数据。为了处理它,我们有一个无状态的Flink管道,它使用该主题并写入另一个主题。 我们是不是漏掉了什么?我们误会什么了吗?有没有更好的解决办法? 谢了!

  • 问题内容: 目前,我在自己的文件中有4个子类。我要求它们都在同一个文件中。我想知道是否可以在一个模块中包含所有这四个类。目前,我正在像这样导入它们 我想这样导入 甚至 我的班级定义如下 问题答案: 您可以这样导出多个类: 例如 People.js 并按照您正确提到的方式访问这些类:

  • 我有一个大型java应用程序,在不同的类中有5个主要方法。我想将此应用程序作为docker容器运行。从DockerHub OpenJDK映像中,我启动了Dockerfile,如下所示 我想添加行来运行主要方法。没有Docker,我使用下面的行运行应用程序 是否可以在一个docker容器中运行上述场景?如果可能,当Dockerfile中只能有一条和指令时,如何实现这一点?

  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?