当前位置: 首页 > 知识库问答 >
问题:

Kafka源和接收器连接器不适用于大数据

谈桐
2023-03-14

我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。

源连接器配置:

"config": {
       "connector.class": 
"io.confluent.connect.jdbc.JdbcSourceConnector",
       "tasks.max": "1",
       "connection.url": "",
       "mode": "incrementing",
       "incrementing.column.name": "ID",
       "topic.prefix": "migration_",
       "name": "jdbc-source",
       "validate.non.null": false,
       "batch.max.rows":5
     }

源连接器日志:

INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit 
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)

有人能指导我如何调整Kafka源连接器以传输大数据吗?

共有1个答案

公孙弘深
2023-03-14

我通过限制单个查询中返回到数据库的记录数量(例如一次5000条),成功地解决了这个问题。

解决方案将取决于数据库和SQL方言。下面的例子将为单个表正确地工作和管理偏移量。增量ID列和时间戳必须按照此处指定的说明进行设置:https://docs . confluent . io/Kafka-connect-JDBC/current/source-connector/index . html # incremental-query-modes

示例表< code>myTable包含以下各列:

    < li> id在每次添加新记录时递增 < Li > < code > lastUpdatedTimestamp -每次记录更新时更新 < li >其他一些属性

idlastUpdateTimestamp 必须唯一标识数据集中的记录。

连接器构造查询如下:

config.query选定模式的Kafka Connect WHere子句config.query.suffix

PostgreSQL / MySQL

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "table.whitelist": "myTable",
    "query.suffix": "LIMIT 5000"
    ...
    }

将导致:

SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000

如果要在WHERE子句中添加附加条件,请执行以下操作。

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
    "query.suffix": "LIMIT 5000"
    ...
    }

将导致:

SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE Age > 18
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000

SQL Server

json prettyprint-override">"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
    ...
    }

将导致:

SELECT TOP 5000 *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE Age > 18
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC

神谕

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
    ...
    }

将导致:

SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE ROWNUM <= 5000
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC

此方法不适用于批量模式。它适用于时间戳递增模式,也可能适用于时间戳递增模式,具体取决于表特征。

加入许多表格-我还没有测试过的想法!

如果查询跨多个表执行连接,情况会变得更加复杂。这将需要以下条件:

  • 提供唯一标识行的id,例如连接tableA.idtableB.id
  • 提供最近更新的表记录的时间戳,以最新的为准
  • 适当订购查询记录

由于 Kafka Connect 使用的 Java long 数据类型,ID 的长度有限制,即 9,223,372,036,854,775,807

 类似资料:
  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka

  • 我已经使用Kafka的汇流本地集群为Kaffa和m安装了Aerospike所需的所有配置,并已安装https://www.confluent.io/hub/aerospike/kafka-connect-aerospike-source并已开始汇流群集,但连接器仍未启动 我还发现合流的共享文件夹中没有jar,它还在开发中吗?

  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 在Kafka中有没有办法使用XML源并将其转换为JSON,然后将JSON数据发送给Kafka进行接收? 我在《Kafka连接》中见过Avro和Protobuf作为转换器?他们能够将XML转换为JSON吗?或者他们会转换为AVRO、Protobuf特定格式而不是JSON?

  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我