当前位置: 首页 > 知识库问答 >
问题:

Kafka连接忽略指定的主题策略

和嘉澍
2023-03-14

我想使用下面的连接器配置将多个表数据发布到同一个Kafka主题,但我看到了下面的异常

例外

原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:正在注册的架构与早期架构不兼容;错误代码:409

连接器似乎忽略了主题策略属性集,并继续使用旧的${主题}-key和${主题}-value主题。

[2019-04-25 22:43:45,590] INFO AvroConverterConfig values: 
    schema.registry.url = [http://schema-registry:8081]
    basic.auth.user.info = [hidden]
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    schema.registry.basic.auth.user.info = [hidden]
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy

连接器配置

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
      "name": "two-in-one-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "xxxxxxx",
        "database.port": "3306",
        "database.user": "xxxxxxx",
        "database.password": "xxxxxxxxx",
        "database.server.id": "18405457",
        "database.server.name": "xxxxxxxxxx",
        "table.whitelist": "customers,phone_book",
        "database.history.kafka.bootstrap.servers": "broker:9092",
        "database.history.kafka.topic": "dbhistory.customer",
        "transforms": "dropPrefix",
        "transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropPrefix.regex":"(.*)",
        "transforms.dropPrefix.replacement":"customer",
        "key.converter.key.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
        "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"
      }
    }'

共有1个答案

公良理
2023-03-14

尝试在连接器配置(JSON)文件中将策略类设置为低于参数,而不是“key.converter.key.subject.name.strategy”和“value.converter.value.subject.name.strategy”

"key.subject.name.strategy""value.subject.name.strategy"

 类似资料:
  • 上下文 我编写了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成,因此数据使用Avro序列化。 我使用Landoop提供的fast data dev Docker映像将它们部署到本地Kafka环境中 基本设置工作,并每秒生成一条记录的消息 但是,我想更改主题名称策略。默认设置生成两个主题:

  • 我在 AWS S3 中备份了以下文件,这些文件由 Kafka 连接接收器连接器备份: 当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息: 我的源配置如下所示: 我应该做什么改变才能让密钥和消息一起存储在Kafka中?

  • 我正在尝试设置Debezium MySQL源连接器。我的目标是为每个数据库提供一个主题,因此我正在研究利用主题的可能性,以使主题可以包含不同的消息类型并且它们的模式可以存储在ConFluent Schema注册表中。 根据这里的几个答案,我将键和值转换器主题名策略设置为io.confluent.kafka.serializers.subject.TopicRecordNameStrategy。 要

  • 我正在实验Kafka流,我有以下设置: null 有什么方法可以让我的KTable从我的主题中“继承”保留策略吗?这样当记录从主主题过期时,它们在KTable中就不再可用了? 我担心将所有记录转储到KTable中,并使StateStore无限增长。 我能想到的一个解决方案是转换成一个窗口流,其跳跃窗口等于记录的TimeToLive,但我想知道是否有更好的解决方案,以更原生的方式。 多谢了。

  • 我正在尝试使用以下命令从现有的PostgreSQL数据库生成模型: 它通过尝试连接到主机来“工作”一次,但由于无法访问端口5432而失败。 我修复了网络问题,现在端口已打开,但连接字符串中的< code>Host被忽略,我得到的错误是: 28000:无pg_hba。主机“my.public.ip.address”、用户“postgres”、数据库“my-db”的conf条目,SSL关闭 我不知道为

  • 主要内容:连接策略示例在连接策略中,为每个实体类生成一个单独的表。 每个表的属性都与主键连接。 它消除了字段字重复的可能性。 以下语法表示连接的策略: - 连接策略示例 在这个例子中,我们将员工分为活跃员工和退休员工。 因此,子类和继承父类的和字段。 现在,按照以下步骤创建JPA项目 - 第1步: 在包下创建一个根实体类并指定所有必需的属性和注释。 文件:Employee.java - 第2步: 在包下创建实体类(它是