我尝试为我的数据创建带有“转换”的插件到kafka-connect,并将其与不同的接收器连接器一起使用。当我安装插件时,kafka-connect看不到我的类。
我使用kafka connect maven插件创建了我的捆绑包zip。使用confluent hub(来自本地文件)的安装已成功。
所有文件都已解压,我的工作者属性已更新插件。路径。我在分布式模式下运行connect,并尝试从包中创建带有transformer的新连接器。
我的插件结构如下所示:
- mwojdowski-my-connect-plugin-0.0.1-SNAPSHOT
|- manifest.json
|- lib
||- my-connect-plugin-0.0.1-SNAPSHOT.jar
还有我的舱单。json文件:
{
"name" : "my-connect-plugin",
"version" : "0.0.1-SNAPSHOT",
"title" : "my-connect-plugin",
"description" : "A set of transformations for Kafka Connect",
"owner" : {
"username" : "mwojdowski",
"name" : "Marcin Wojdowski<mwojdowski@gmail.com>"
},
"tags" : [ "transform", "field", "topic" ],
"features" : {
"supported_encodings" : [ "any" ],
"single_message_transforms" : true,
"confluent_control_center_integration" : true,
"kafka_connect_api" : true
},
"documentation_url" : "",
"docker_image" : { },
"license" : [ {
"name" : "Confluent Software License",
"url" : "https://www.confluent.io/software-evaluation-license"
} ],
"component_types" : [ "transform" ],
"release_date" : "2019-08-29"
}
接下来,我尝试创建新的连接器:
curl -XPOST -H 'Content-type:application/json' 'localhost:8083/connectors' -d '{
"name" : "custom-file-sink-with-validation",
"config" : {
"connector.class" : "FileStreamSink",
"tasks.max" : "1",
"topics" : "test_topic",
"file" : "/tmp/my-plugin-test.txt",
"key.ignore" : "true",
"schema.ignore" : "true",
"drop.invalid.message": "false",
"behavior.on.malformed.documents": "warn",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"transforms" : "Validation",
"transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"
}
}'
重新启动kafka connect后,当我尝试创建新连接器时,会引发异常:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):\nInvalid value org.kafka.connect.my.connector.ValidateId for configuration transforms.Validation.type: Class org.kafka.connect.my.connector.ValidateId could not be found.\nInvalid value null for configuration transforms.Validation.type: Not a Transformation\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
我也尝试手动安装插件,如下文档:https://docs.confluent.io/current/connect/managing/install.html
但看起来好像Connect没有装我的罐子。
当我将jar复制到share/java/kafka时,它可以工作,但这不是一个解决方案。
我怀疑我的插件是skip,因为它不包含连接器。在这种情况下,我应该手动将JAR添加到类路径吗?(与相对)https://docs.confluent.io/current/connect/userguide.html#installing-插件)
或者我应该明确指出我的连接器配置来尝试使用我的插件吗?
你好,M。
对不起,问题真的很琐碎。在重构过程中,其中一个包在最后得到“s”,我错过了在配置中更新它。
"transforms.Validation.type" : "org.kafka.connect.my.connectors.ValidateId"
而不是
"transforms.Validation.type" : "org.kafka.connect.my.connector.ValidateId"
我在从独立切换到分布式之前对其进行了重构。再次抱歉让您担心,并感谢您的支持。
问候你,马辛
我正在使用kafka 2.0,kafka connect在分布式模式下运行,并尝试配置debezium mysql连接器,但得到错误 电话是这样的:
我们使用Debezium(MongoDB)和Confluent S3连接器以分布式模式运行Kafka Connect(Confluent Platform 5.4,即Kafka 2.4)。通过REST API添加新连接器时,连接器将在RUNNING状态下创建,但不会为连接器创建任何任务。 暂停和恢复连接器没有帮助。当我们停止所有工作人员,然后再次启动他们时,任务就会创建,一切都会按应有的方式运行。
我试图通过Postman访问我的简单API调用。它不工作,但它是在浏览器工作。 警告:不建议在没有服务器身份验证的情况下建立SSL连接。根据MySQL 5.5.45+、5.6.26+和5.7.6+的要求,如果未设置显式选项,默认情况下必须建立SSL连接。为了符合不使用SSL的现有应用程序,verifyServerCertificate属性设置为'false'。您需要通过设置usessl=false
问题内容: 可以给我打电话吗,如何在Java中创建一个普遍使用的连接器。我对此很陌生,我从不知道要从哪里开始,请告诉我如何为pervasive创建连接器。我创建了示例连接器,但我不确定它是对还是错 问题答案: 这是一个简单的程序,可以连接到PSQL数据库: 要编译它,我使用: 为了运行它,我使用: 如果使用的是64位操作系统,则可能需要更改PSQL JAR文件的位置。
经过多次尝试,我来到这里,根据文档: Firestore为离线功能提供现成的支持。在读取和写入数据时,Firestore使用一个本地数据库,该数据库会自动与服务器同步。当用户脱机时,云Firestore功能将继续,当他们重新连接时,将自动处理数据迁移。 所以,我在中编写了一些代码 工作很好。 但是当我关闭我的互联网连接,并从最近的应用程序中删除应用程序,然后返回我的应用程序时,它在离线状态下不起作
BLE(低功耗蓝牙)设备应该在不扫描到我的iPhone(应用程序)的情况下连接。 我有通过扫描QRCode得到的BLE地址。 因此,我希望在不调用(manager.scanForPeripherals(withServices:nil))的情况下连接特定的设备(我传递地址的设备),因为它将启动所有设备。 目前,当我找到我的特定设备时,我可以扫描我的BLE设备的二维码并扫描所有可用设备。我正在停止扫