当前位置: 首页 > 知识库问答 >
问题:

通过Kafka Connect使用自定义转换器?

闾丘朗
2023-03-14

我正在尝试使用Kafka Connect的自定义转换器,但似乎无法正确使用。我希望有人有这方面的经验,能帮我弄清楚!

>

  • 我的自定义转换器的类路径是自定义。自定义字符串转换器

    为了避免任何错误,我的自定义转换器目前只是预先存在的StringConverter的复制/粘贴(当然,当我开始工作时,这会改变)。https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java

    我有一个由3个节点组成的kafka connect集群,这些节点正在运行confluent的官方docker图像(confluentinc/cp-kafka connection:3.3.0)。

    每个节点都配置为加载一个jar,其中包含我的转换器(使用docker卷)。

    当连接器启动时,它们会正确加载罐子并找到自定义转换器。事实上,这就是我在日志中看到的:

    [2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
    [2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
    [...]
    [2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
    

    然后,我向其中一个连接器节点发送JSON配置以创建连接器:

    {
      "name": "hdfsSinkCustom",
      "config": {
        "topics": "yellow",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "custom.CustomStringConverter",
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
        "topics.dir": "yellow_storage",
        "flush.size": "1",
        "rotate.interval.ms": "1000"
      }
    }
    

    并收到以下回复:

    {
       "error_code": 400,
       "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
    }
    

    如果我尝试运行Kafka Connect stadnone,错误消息是相同的。

    有人已经面对过这个了吗?我错过了什么?

  • 共有1个答案

    慕仲渊
    2023-03-14

    好的,多亏了Kafka用户邮件列表上的Philip Schmitt,我找到了解决方案。

    他提到了这个问题:https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007,这确实是我面临的问题。

    引用他的话:

    为了测试这一点,我只需将我的SMT jar复制到我正在使用的连接器的文件夹中并调整插件。path属性。

    事实上,我通过将转换器放在连接器的文件夹中来摆脱此错误。

    我还尝试了其他方法:创建一个自定义连接器,并将该自定义连接器与自定义转换器一起使用,两者都作为插件加载。它也有效。

    摘要:转换器由连接器加载。如果您的连接器是插件,则转换器也应该是插件。如果您的连接器不是插件(与您的 kafka connect 发行版捆绑在一起),那么您的转换器也不应该是插件。

     类似资料:
    • 主要内容:JSF自定义转换器实例我们可以在JSF中创建自己的自定义转换器。 以下列表是我们可以在JSF中创建自定义转换器的步骤。 通过实现接口创建一个转换器类。 实现上述接口的和方法。 使用注解为自定义转换器分配唯一的ID。 JSF自定义转换器实例 打开 NetBeans IDE 创建一个Web工程:CustomConverter,其目录结构如下所示 - 创建以下文件代码,文件:index.xhtml 的代码内容如下所示 - 文

    • I'v开始寻找很好的解决方案,如何使用Spring CassandraOperations很好地持久化实体。问题的出现是因为我的实体中的某些字段不支持cassandra,例如joda DateTime。 解决方法是在java类型的同一实体中有其他字段。util。Date而不是joda DateTime,用@Transient标记未要求的字段。但这并不干净,所以我开始寻找自动自定义转换。 目前,sp

    • 我想使用Spring和自定义实现来转换来自Spring XML配置的值。 配置如下所示: 转换器按预期注入方法,以便正确初始化。但是无法执行xml配置中参数的转换。错误消息为: org.springframework.beans.factory.unsatisfiedDependencyException:在通过SAX inputsource加载的资源中创建名为“A1”的bean时出错:未满足的依

    • 我正在尝试使用DozerJava API使用自定义转换器映射两个Java类。有没有办法在Java中配置类级自定义转换器?现在,我必须在XML中添加配置: 感谢您的帮助!

    • 为什么它不能与和Java配置一起工作?为什么它与一起使用?

    • 问题内容: 假设我以ng-repeat以表格格式显示以下数据。 以上代码取自http://code.ciphertrick.com/2015/06/01/search-sort-and-pagination- ngrepeat-angularjs/ 这样我们就可以搜索。无论用户在搜索文本框中输入哪种内容,都将基于该过滤器生成数据,但是我的要求有些不同。 我将有一个下拉列表,其中将填充所有字段名称,