当前位置: 首页 > 知识库问答 >
问题:

如何设置Kafka连接器,以便在Debezium中使用自定义变换?

松献
2023-03-14

我刚接触Kafka,我不知道如何使用“transforms.router.type”使其与我的Debezium设置一起工作。因此,我对java类进行了特殊的事件转换,并预先进行了配置,以便将其部署到容器中,如下所示:

curl-X POST-H“Accept:application/json”-H“Content Type:application/json”localhost:8083/connectors/-d

{
  "name": "task-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "slot.name" : "task_engine_saga",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "tasks",
    "schema.whitelist": "public",
    "table.whitelist" : "public.task",
    "tombstones.on.delete" : "false",
    "transforms" : "router",
    "transforms.router.type" : "com.task.connect.TaskEventRouter"
  }
}

响应说找不到这个配置。

CREATE kafka connector task-connector....
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1091  100   516  100   575   8322   9274 --:--:-- --:--:-- --:--:-- 17596{"error_code":400,"message":"Connector configuration is invalid and contains the following 3 error(s):\nInvalid value com.task.connect.TaskEventRouter for configuration transforms.router.type: Class com.task.connect.TaskEventRouter could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nA value is required\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

然后,我将我的主机文件夹和jar文件复制到container/connect目录中,在其中我有一个带有事件转换逻辑的java类,但它也没有帮助。谁能帮我一下,告诉我该怎么做才能做出这种定制转换。路由器。用我的Debezium装置打字?

我的容器docker-comush设置:

version: '3'
services:
    pgadmin:
        container_name: pgadmin_container
        image: dpage/pgadmin4
        environment:
            PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org}
            PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
        volumes:
            - pgadmin:/root/.pgadmin
        ports:
            - "${PGADMIN_PORT:-5050}:80"
        restart: unless-stopped

    zookeeper:
        image: debezium/zookeeper:1.3
        ports:
            - 2181:2181
            - 2888:2888
            - 3888:3888
    kafka:
        image: debezium/kafka:1.3
        ports:
            - 9092:9092
        links:
            - zookeeper
        environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181
    postgres:
        image: debezium/example-postgres:1.3
        ports:
            - 5432:5432
        environment:
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - PGDATA=/data/postgres
            - POSTGRES_DB=${POSTGRES_DB:-task_engine}
    connect:
        image: debezium/connect:1.3
        ports:
            - 8083:8083
        links:
            - kafka
            - postgres
        environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses

volumes:
    postgres:
    pgadmin:

共有1个答案

孙昂然
2023-03-14

我对java类进行了特殊事件转换

据我所知,您还没有将这个编译过的类装入debezium图像或编辑插件。包含该JAR的路径文件

如果您已经用这些数据创建了自己的Docker映像,那么您应该更改image:debezium/connect:1.3

这可以解释为什么转换不可用

 类似资料:
  • 我正在尝试使用Debezium将Amazon RDS中托管的Postgres SQL db与Kafka主题连接起来。 我正在遵循以下教程: 我的kafka和kafka connect服务启动良好,kafka connect服务还在/usr/share/java dir中接收我的debezium postgres连接器jar。 但是,在尝试通过kafka connect API使用以下curl命令附

  • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

  • 我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。

  • 目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子: 我创建了一个接口IWebsocketClientEndpoint 以及实现上述接口的类: WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。 目标是:如何在Kafka连接结构中调整我的websocket结构?我

  • 我正在使用kafka 2.0,kafka connect在分布式模式下运行,并尝试配置debezium mysql连接器,但得到错误 电话是这样的:

  • 下面是/etc/kafka/connect-MongoDB-source.properties中的MongoDB配置 但是低于误差 以独立模式运行连接器。 我在debezium-debezium-连接器-mongob-1.0.0/debezium-connector-mongodb-1.0.0.Final.jar 类路径的设置如下 使用插件路径,我看到它能够注册和加载所有必需的插件。 但最后还是同