当前位置: 首页 > 知识库问答 >
问题:

Kafka连接不使用主题策略

衡玄裳
2023-03-14

上下文

我编写了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成,因此数据使用Avro序列化。

我使用Landoop提供的fast data dev Docker映像将它们部署到本地Kafka环境中

基本设置工作,并每秒生成一条记录的消息

但是,我想更改主题名称策略。默认设置生成两个主题:

  • ${topic}-键
  • ${topic}-值

根据我的用例,我将需要生成具有不同模式的事件,这些模式将以同一主题结束。因此,我需要的对象名称是:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

根据文件,我的需求符合TopicRecordNameStrategy

我尝试过什么

我创建了avroData对象,用于将值发送到连接:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

并在之后使用它来创建SourceRecord响应对象

文档指出,为了在Kafka Connect中使用模式注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,我会添加它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性,并继续使用旧的主题。

问题

Kafka连接应该支持不同的主题策略。我通过编写自己版本的AvroConverter来解决这个问题,并硬编码主题策略是我需要的。然而,这看起来不是一个好方法,在尝试使用接收器Kafka连接器消耗数据时也带来了问题。我复制了主题,所以有一个旧名称(${topic}-key)的版本,它可以工作

将主题策略指定给Kafka Connect的正确设置是什么?


共有1个答案

桂志新
2023-03-14
匿名用户

您缺少键。转换器和值。converter前缀,用于将配置传递给converter。因此,不是:

key.subject.name.strategy
value.subject.name.strategy

您需要:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源https://docs.confluent.io/current/connect/managing/configuring.html

要将配置参数传递给键和值转换器,请在它们前面加上key.converter.value.converter.,就像在工作配置中定义默认转换器时一样。请注意,这些仅在key.convertervalue.converter属性中指定了相应的转换器配置时使用。

 类似资料:
  • 我想使用下面的连接器配置将多个表数据发布到同一个Kafka主题,但我看到了下面的异常 例外 原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:正在注册的架构与早期架构不兼容;错误代码:409 连接器似乎忽略了主题策略属性集,并继续使用旧的${主题}-key和${主题}-value主题。 连

  • 如何以可伸缩的方式编写连接多个Kafka主题的使用者? 我有一个主题用一个键发布事件,第二个主题用相同的键发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题的使用者,并为出现在这两个主题中的子集执行一些额外的操作。 理想情况下,我需要将主题绑定在一起,以便以相同的方式对它们进行分区,并同步地将分区分配给使用者。我怎么能这么做? 我知道Kafka Streams将主题连接在一起,这样键

  • 这是我的kafka连接器属性 这是我用来创建Elasticsearch水槽的POST主体 我遇到的问题是,有时这个接收器会工作并将数据发送到Elasticsearch并显示 〔2020-09-15 20:27:05904〕INFO WorkerLinkTask{id=test-distributed-connector-0}使用序列号1异步提交偏移。。。。。。。 但大多数时候,它只会卡住并重复这一

  • 我有一个服务器a,在服务器a中,我安装了kafka并启动了kafka和Zookeeper。我还创建了一个主题作为my_topic。现在我有一个应用程序B运行在服务器B中,应用程序B有一些数据,我想把这些数据推送到服务器A中的my_topic。我是否也需要在服务器B中安装kafka并在服务器B中创建一个生产者?如果是,如何将来自服务器B的消息推送到服务器A中的主题?介质是什么?

  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk

  • 我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误: 现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-conf