我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。
源连接器配置:
"config": {
"connector.class":
"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "migration_",
"name": "jdbc-source",
"validate.non.null": false,
"batch.max.rows":5
}
源连接器日志:
INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)
有人能指导我如何调整Kafka源连接器以传输大数据吗?
我通过限制单个查询中返回到数据库的记录数量(例如一次5000条),成功地解决了这个问题。
解决方案将取决于数据库和SQL方言。下面的例子将为单个表正确地工作和管理偏移量。增量ID列和时间戳必须按照此处指定的说明进行设置:https://docs . confluent . io/Kafka-connect-JDBC/current/source-connector/index . html # incremental-query-modes
示例表< code>myTable包含以下各列:
id
在每次添加新记录时递增 < Li > < code > lastUpdatedTimestamp -每次记录更新时更新 < li >其他一些属性
id
和 lastUpdateTimestamp
必须唯一标识数据集中的记录。
连接器构造查询如下:
config.query
选定模式的Kafka Connect WHere子句
config.query.suffix
PostgreSQL / MySQL
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"table.whitelist": "myTable",
"query.suffix": "LIMIT 5000"
...
}
将导致:
SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
LIMIT 5000
如果要在WHERE子句中添加附加条件,请执行以下操作。
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
"query.suffix": "LIMIT 5000"
...
}
将导致:
SELECT *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE Age > 18
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
LIMIT 5000
SQL Server
json prettyprint-override">"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
...
}
将导致:
SELECT TOP 5000 *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE Age > 18
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
神谕
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
...
}
将导致:
SELECT *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE ROWNUM <= 5000
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
此方法不适用于批量
模式。它适用于时间戳递增
模式,也可能适用于时间戳
或递增
模式,具体取决于表特征。
加入许多表格-我还没有测试过的想法!
如果查询跨多个表执行连接,情况会变得更加复杂。这将需要以下条件:
由于 Kafka Connect 使用的 Java long 数据类型,ID 的长度有限制,即 9,223,372,036,854,775,807
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连
我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka
我已经使用Kafka的汇流本地集群为Kaffa和m安装了Aerospike所需的所有配置,并已安装https://www.confluent.io/hub/aerospike/kafka-connect-aerospike-source并已开始汇流群集,但连接器仍未启动 我还发现合流的共享文件夹中没有jar,它还在开发中吗?
我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它
在Kafka中有没有办法使用XML源并将其转换为JSON,然后将JSON数据发送给Kafka进行接收? 我在《Kafka连接》中见过Avro和Protobuf作为转换器?他们能够将XML转换为JSON吗?或者他们会转换为AVRO、Protobuf特定格式而不是JSON?
我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我