当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect:没有为连接器创建任务

慕容耘豪
2023-03-14

我们使用Debezium(MongoDB)和Confluent S3连接器以分布式模式运行Kafka Connect(Confluent Platform 5.4,即Kafka 2.4)。通过REST API添加新连接器时,连接器将在RUNNING状态下创建,但不会为连接器创建任何任务。

暂停和恢复连接器没有帮助。当我们停止所有工作人员,然后再次启动他们时,任务就会创建,一切都会按应有的方式运行。

该问题不是由连接器插件引起的,因为我们在 Debezium 和 S3 连接器上看到了相同的行为。同样在调试日志中,我可以看到Debezium正确地从Connector.taskConfigs()方法返回任务配置。

有人能告诉我该怎么做吗?我们可以在不重启工人的情况下添加连接器?谢谢。

配置详细信息

群集有 3 个节点,具有以下连接分布式属性:

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000

连接器配置

脱贝齐姆示例:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
    "mongodb.name": "qa-debezium-comp",
    "mongodb.ssl.enabled": true,
    "collection.whitelist": "converter[.]task",
    "tombstones.on.delete": true
  }
}

S3的例子:

{
  "name": "qa-s3-sink-task|1",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "qa-debezium-comp.converter.task",
    "topics.dir": "data/env/qa",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "<bucket-name>",
    "flush.size": "15000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "transforms": "ExtractDocument",
    "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}

连接器是使用curl创建的:< code > curl-X POST-H " Content-Type:application/JSON "-data @

共有3个答案

公西财
2023-03-14

我得到空任务时,部署不同的连接器在任务是空后部署Elasticsearch chSinkConnector

在部署连接器时将这两个元素添加到< code>config将有助于定位任务失败的原因。

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

在我的例子中,它将显示失败的原因,而不是空的任务

GET /connectors/elasticsearch-sink/status

{
    "name": "elasticsearch-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "10.xxx.xxx.xxx:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "10.xxx.xxx.xxx:8083",
            "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
        }
    ],
    "type": "sink"
}
胡承悦
2023-03-14

删除连接器,然后使用其他 database.server.id 重新创建它。重复此过程,直到出现任务。

经过6-7次试验后,我觉得效果不错,但不知道为什么。暂停并继续,重新启动连接器/任务对我没有帮助。

魏勇军
2023-03-14

我也有同样的问题,所以我更改了连接器的名称并创建了一个新的,它工作了,但我不知道这个问题的来源,因为我们在kafka-connect日志中没有任何信息。

 类似资料:
  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • 我似乎经常根据一个查询从JdbcConnectionSource创建一个Kafka Connect连接器,连接器创建成功,状态为“RUNNING ”,但是没有创建任何任务。在我的容器的控制台日志中,我看不到任何迹象表明有任何问题:没有错误,没有警告,没有对任务失败原因的解释。我可以让其他连接器工作,但有时一个不工作。 当连接器无法创建运行任务时,如何获取更多信息进行故障排除? 我将在下面发布我的连

  • 我使用的debezium-connector:https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.4.0.final/debezium-connector-oracle-1.4.0.final-plugin.tar.gz 我遵循了docker-compose的以下说明:https://github.com/c

  • 操作系统:Ubuntu 17.10 Python:2.7 Sublime文本3: 我正在尝试导入mysql.connector, 没有名为连接器的模块 不过,当我尝试导入mysql时。在pythonshell中,它可以工作。 早些时候它工作得很好,我刚刚升级了Ubuntu,不知何故mysql连接器不工作。 我尝试重新安装mysql连接器使用pip和git两者。 还是不走运。 请帮忙!

  • 我尝试为我的数据创建带有“转换”的插件到kafka-connect,并将其与不同的接收器连接器一起使用。当我安装插件时,kafka-connect看不到我的类。 我使用kafka connect maven插件创建了我的捆绑包zip。使用confluent hub(来自本地文件)的安装已成功。 所有文件都已解压,我的工作者属性已更新插件。路径。我在分布式模式下运行connect,并尝试从包中创建带

  • 我创建了一个ssl mongodb连接,如下所示 但在那之后,其他外部rest调用(用于翻译api,google的示例获取语言)失败,并给出以下错误。 由:sun.security.validator.validatoreXception:PKIX路径生成失败:sun.security.provider.certPath.SunCertPathBuilderException:找不到请求目标的有效