当前位置: 首页 > 知识库问答 >
问题:

Kafka连接:主题显示的事件数是预期的3倍

祁正浩
2023-03-14

我们正在使用Kafka Connect JDBC将表同步到数据库(Debezium非常适合,但不可能)。

Sync通常运行良好,但似乎存储在主题中的事件/消息数量是预期的3倍。

这可能是什么原因?

一些附加信息

目标数据库包含确切的消息数(主题中的消息数/3)。

大多数主题分为3个分区(键通过SMT设置,使用DefaultPartitioner)。

JDBC源连接器

{
  "name": "oracle_source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:oracle:thin:@dbdis01.allesklar.de:1521:stg_cdb",
    "connection.user": "****",
    "connection.password": "****",
    "schema.pattern": "BBUCH",
    "topic.prefix": "oracle_",
    "table.whitelist": "cdc_companies, cdc_partners, cdc_categories, cdc_additional_details, cdc_claiming_history, cdc_company_categories, cdc_company_custom_fields, cdc_premium_custom_field_types, cdc_premium_custom_fields, cdc_premiums, cdc, cdc_premium_redirects, intermediate_oz_data, intermediate_oz_mapping",
    "table.types": "VIEW",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "ts",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "validate.non.null": false,
    "numeric.mapping": "best_fit",
    "db.timezone": "Europe/Berlin",
    "transforms":"createKey, extractId, dropTimestamp, deleteTransform",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractId.field": "id",
    "transforms.dropTimestamp.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.dropTimestamp.blacklist": "ts",
    "transforms.deleteTransform.type": "de.meinestadt.kafka.DeleteTransformation"
  }
}

JDBC接收器连接器

{
  "name": "postgres_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://writer.branchenbuch.psql.integration.meinestadt.de:5432/branchenbuch",
    "connection.user": "****",
    "connection.password": "****",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.schemas.enable": true,
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": true,
    "auto.create": true,
    "auto.evolve": true,
    "topics.regex": "oracle_cdc_.*",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "oracle_cdc_(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}

奇怪的主题计数

共有1个答案

华凡
2023-03-14

这本身不是一个答案,但在这里比在评论框中更容易格式化。

不清楚你为什么会得到重复的。一些可能性是:

  1. 您有多个连接器实例正在运行
  2. 您有正在运行的连接器实例,但之前已经运行了将相同数据加载到主题的其他实例
  3. 数据来自多个表并被合并到一个主题中(根据您的配置,这里不可能,但如果您使用单消息转换来修改目标主题名称可能是可能的)

在调查方面,我建议:

>

SELECT ROWKEY, COUNT(*) FROM source GROUP BY ROWKEY HAVING COUNT(*) > 1

我在猜测ROWKEY(Kafka消息的键)-您将知道您的数据以及哪些列应该是唯一的,并且可以用于检测重复项。

要了解更多信息,StackOverflow并不是一个合适的平台-我建议前往http://cnfl.io/slack和#连接通道。

 类似资料:
  • 我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。 在源数据库上,sql数据类型为。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。

  • 问题内容: 在xcode 8 beta中获得新的警告3.此语法出了什么问题,或者xcode中有错误? SwiftyJSON.swift:772:35:期望的’,’连接多子句条件的一部分 问题答案: 似乎已包含此功能: 0099-conditionclauses.md 试试这个:

  • 如何以可伸缩的方式编写连接多个Kafka主题的使用者? 我有一个主题用一个键发布事件,第二个主题用相同的键发布与第一个主题的子集相关的其他事件。我想编写一个订阅这两个主题的使用者,并为出现在这两个主题中的子集执行一些额外的操作。 理想情况下,我需要将主题绑定在一起,以便以相同的方式对它们进行分区,并同步地将分区分配给使用者。我怎么能这么做? 我知道Kafka Streams将主题连接在一起,这样键

  • 我想使用下面的连接器配置将多个表数据发布到同一个Kafka主题,但我看到了下面的异常 例外 原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:正在注册的架构与早期架构不兼容;错误代码:409 连接器似乎忽略了主题策略属性集,并继续使用旧的${主题}-key和${主题}-value主题。 连

  • 这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下: