当前位置: 首页 > 知识库问答 >
问题:

向kafka connect添加新的接收器配置

吴炎彬
2023-03-14

我正忙于学习 kafka,特别是使用接收器连接器 (jdbc) 将数据从 kafka 主题发送到 mysql 的下游元素。

我已经用jdbc连接器和mysqljdbc驱动程序从汇合的kafka connect基础映像构建了一个映像,在Dockerfile中如下:

FROM confluentinc/cp-kafka-connect-base

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.1

ENV MYSQL_DRIVER_VERSION 5.1.39

RUN curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-${MYSQL_DRIVER_VERSION}.tar.gz" \
     | tar -xzf - -C /usr/share/java/kafka/ --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-${MYSQL_DRIVER_VERSION}-bin.jar

老实说,我在文档中有点迷失了方向,我到了想要包含接收器配置的地步,但我不知道如何包含它或将其复制到何处。我已经创建了接收器配置文件,但不确定将其放在哪里。作为映像构建的一部分,还是在运行 kafka-connect 容器时?

这样做的最终目标是为此连接器创建 helm 部署,但我还没有。

当我了解到关于Kafka和Kafka连接的所有信息时,我将非常感谢任何帮助。

共有1个答案

孟意致
2023-03-14

由于您在分布式模式下使用Kafka Connect(这通常是最佳选择),因此您将连接器配置作为REST调用传递。

这是一个示例JDBC接收器配置-您需要修改它以适应您的源主题、序列化等:

curl -X PUT http://localhost:8083/connectors/sink_postgres_foo_00/config -H "Content-Type: application/json" -d '{
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:mysql://mysql-host:3306/",
      "connection.user": "user",
      "connection.password": "pw",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "tasks.max": "1",
      "topics": "foo",
      "auto.create": "true",
      "auto.evolve":"true",
      "pk.mode":"none"          
    }'

如果您想在容器实例化时传递连接器配置,您可以将其嵌入到开始命令中,该命令应该启动工作程序,等待它可用,然后传递配置。这里有一个例子。另请参见https://rmoff.net/2018/12/15/docker-tips-and-tricks-with-ksql-and-kafka/.

 类似资料:
  • 在标准/自定义kafkaconnect接收器中,我们如何指定它应该只使用来自kafka主题的read_comitted消息。我可以在这里看到配置,但看不到任何选项(除非这是默认行为)。谢了。https://docs . confluent . io/current/installation/configuration/connect/sink-connect-configs . html

  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • 我正在使用Confluent Kafka Docker镜像,特别是使用这个: https://github.com/confluentinc/cp-docker-images/tree/4.0.x/examples/cp-all-in-one 我想添加MySQL连接器,通过: 正在下载连接器的1.5.46版本(https://dev.mysql.com/get/Downloads/Connecto

  • 事件适配器 事件活动 @override public void onBindViewHolder(@nonnull ViewHolder holder,final int position){

  • 问题内容: 创建密钥后,是否可以向Python字典添加密钥?它似乎没有方法。 问题答案:

  • 假设我有一个Christmas list对象,它携带一个用于圣诞礼物的ArrayList和另一个用于收件人的字符串。我对这个客户端有以下方法: 所以基本上,我试图在Hazelcast IMAP中初始化一个新的键/值对。我尝试使用christmaslistimap.put()和christmaslistimap.set(),但它们似乎都不起作用。至少,我需要知道如何添加一个新的键让IMap识别。