当前位置: 首页 > 知识库问答 >
问题:

如何在没有连接器的情况下创建合适的Kafka连接插件?

索瀚海
2023-03-14

我尝试为我的数据创建带有“转换”的插件到kafka-connect,并将其与不同的接收器连接器一起使用。当我安装插件时,kafka-connect看不到我的类。

我使用kafka connect maven插件创建了我的捆绑包zip。使用confluent hub(来自本地文件)的安装已成功。

所有文件都已解压,我的工作者属性已更新插件。路径。我在分布式模式下运行connect,并尝试从包中创建带有transformer的新连接器。

我的插件结构如下所示:

- mwojdowski-my-connect-plugin-0.0.1-SNAPSHOT
|- manifest.json
|- lib
||- my-connect-plugin-0.0.1-SNAPSHOT.jar

还有我的舱单。json文件:

{
  "name" : "my-connect-plugin",
  "version" : "0.0.1-SNAPSHOT",
  "title" : "my-connect-plugin",
  "description" : "A set of transformations for Kafka Connect",
  "owner" : {
    "username" : "mwojdowski",
    "name" : "Marcin Wojdowski<mwojdowski@gmail.com>"
  },
  "tags" : [ "transform", "field", "topic" ],
  "features" : {
    "supported_encodings" : [ "any" ],
    "single_message_transforms" : true,
    "confluent_control_center_integration" : true,
    "kafka_connect_api" : true
  },
  "documentation_url" : "",
  "docker_image" : { },
  "license" : [ {
    "name" : "Confluent Software License",
    "url" : "https://www.confluent.io/software-evaluation-license"
  } ],
  "component_types" : [ "transform" ],
  "release_date" : "2019-08-29"
}

接下来,我尝试创建新的连接器:

curl -XPOST -H 'Content-type:application/json' 'localhost:8083/connectors' -d '{
    "name" : "custom-file-sink-with-validation",
    "config" : {
    "connector.class" : "FileStreamSink",
        "tasks.max" : "1",
        "topics" : "test_topic",
        "file" : "/tmp/my-plugin-test.txt",
        "key.ignore" : "true",
        "schema.ignore" : "true",
        "drop.invalid.message": "false",
        "behavior.on.malformed.documents": "warn",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "transforms" : "Validation",
        "transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"
    }
}'

重新启动kafka connect后,当我尝试创建新连接器时,会引发异常:

{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value org.kafka.connect.my.connector.ValidateId for configuration transforms.Validation.type: Class org.kafka.connect.my.connector.ValidateId could not be found.\nInvalid value null for configuration transforms.Validation.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

我也尝试手动安装插件,如下文档:https://docs.confluent.io/current/connect/managing/install.html

但看起来好像Connect没有装我的罐子。

当我将jar复制到share/java/kafka时,它可以工作,但这不是一个解决方案。

我怀疑我的插件是skip,因为它不包含连接器。在这种情况下,我应该手动将JAR添加到类路径吗?(与相对)https://docs.confluent.io/current/connect/userguide.html#installing-插件)

或者我应该明确指出我的连接器配置来尝试使用我的插件吗?

你好,M。

共有1个答案

淳于玺
2023-03-14

对不起,问题真的很琐碎。在重构过程中,其中一个包在最后得到“s”,我错过了在配置中更新它。

"transforms.Validation.type" : "org.kafka.connect.my.connectors.ValidateId"

而不是

"transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"

我在从独立切换到分布式之前对其进行了重构。再次抱歉让您担心,并感谢您的支持。

问候你,马辛

 类似资料:
  • 我正在使用kafka 2.0,kafka connect在分布式模式下运行,并尝试配置debezium mysql连接器,但得到错误 电话是这样的:

  • 我们使用Debezium(MongoDB)和Confluent S3连接器以分布式模式运行Kafka Connect(Confluent Platform 5.4,即Kafka 2.4)。通过REST API添加新连接器时,连接器将在RUNNING状态下创建,但不会为连接器创建任何任务。 暂停和恢复连接器没有帮助。当我们停止所有工作人员,然后再次启动他们时,任务就会创建,一切都会按应有的方式运行。

  • 我试图通过Postman访问我的简单API调用。它不工作,但它是在浏览器工作。 警告:不建议在没有服务器身份验证的情况下建立SSL连接。根据MySQL 5.5.45+、5.6.26+和5.7.6+的要求,如果未设置显式选项,默认情况下必须建立SSL连接。为了符合不使用SSL的现有应用程序,verifyServerCertificate属性设置为'false'。您需要通过设置usessl=false

  • 问题内容: 可以给我打电话吗,如何在Java中创建一个普遍使用的连接器。我对此很陌生,我从不知道要从哪里开始,请告诉我如何为pervasive创建连接器。我创建了示例连接器,但我不确定它是对还是错 问题答案: 这是一个简单的程序,可以连接到PSQL数据库: 要编译它,我使用: 为了运行它,我使用: 如果使用的是64位操作系统,则可能需要更改PSQL JAR文件的位置。

  • 经过多次尝试,我来到这里,根据文档: Firestore为离线功能提供现成的支持。在读取和写入数据时,Firestore使用一个本地数据库,该数据库会自动与服务器同步。当用户脱机时,云Firestore功能将继续,当他们重新连接时,将自动处理数据迁移。 所以,我在中编写了一些代码 工作很好。 但是当我关闭我的互联网连接,并从最近的应用程序中删除应用程序,然后返回我的应用程序时,它在离线状态下不起作

  • BLE(低功耗蓝牙)设备应该在不扫描到我的iPhone(应用程序)的情况下连接。 我有通过扫描QRCode得到的BLE地址。 因此,我希望在不调用(manager.scanForPeripherals(withServices:nil))的情况下连接特定的设备(我传递地址的设备),因为它将启动所有设备。 目前,当我找到我的特定设备时,我可以扫描我的BLE设备的二维码并扫描所有可用设备。我正在停止扫