当前位置: 首页 > 知识库问答 >
问题:

如何自动运行Kafka Connect连接器(例如在生产中)?

何聪
2023-03-14

有没有办法在Kafka Connect启动时自动加载(多个)Kafka Connect连接器(例如在Confluent Platform中)?

我目前发现的是:

ConFluent Docs状态为使用bin/连接独立命令进行独立模式,并为工作人员和每个连接器提供属性文件。

对于分布式模式,您必须通过REST API运行连接器。

https://docs.confluent.io/current/connect/userguide.html#standalone-mode,https://docs.confluent.io/current/connect/managing/configuring.html#standalone-example

是否有另一种方法,例如,包括应该在“连接-[独立|分布式]中运行的所有连接器。“属性”文件(类似于在ksql-server.properties中提供KSQL查询文件),以便在Kafka Connect启动时自动加载它们(例如,在Confluent平台中)?

或者,即使在生产环境中,连接器也是如上所述“手动”加载的?

共有1个答案

赖淇
2023-03-14

通常,在分布式模式下运行Kafka Connect时,您必须使用REST API。但是,您可以使用docker compose编写连接器创建的脚本;@Robin Moffatt就此写了一篇很好的文章:

kafka-connect:
  image: confluentinc/cp-kafka-connect:5.1.2
  environment:
    CONNECT_REST_PORT: 18083
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
    […]
  volumes:
    - $PWD/scripts:/scripts
  command: 
    - bash 
    - -c 
    - |
      /etc/confluent/docker/run & 
      echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
      while [ $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) -eq 000 ] ; do 
        echo -e $$(date) " Kafka Connect listener HTTP state: " $$(curl -s -o /dev/null -w %{http_code} http://kafka-connect:8083/connectors) " (waiting for 200)"
        sleep 5 
      done
      nc -vz kafka-connect 8083
      echo -e "\n--\n+> Creating Kafka Connect Elasticsearch sink"
      /scripts/create-es-sink.sh 
      sleep infinity

注意事项:

>

  • 在命令部分,$替换为$$以避免错误“命令”选项的插值格式无效

    睡眠无限是必要的,因为我们已经将 /etc/confluent/docker/run 进程发送到后台线程(

    配置连接器的实际脚本是一个单独文件中的curl调用。你可以将其构建到Docker Compose中,但感觉有点恶心。

    如果您想在启动Kafka Connect之前安装自定义连接器插件,您可以将这一点和上述技术结合起来,例如conFluent-Hub安装--no-提示符conFluentInc/kafka-Conntion-gcs: 5.0.0 /etc/confluent/docker/run

  •  类似资料:
    • 问题内容: 我有一个文件,想一直在服务器上运行。 目前,我已经弄清楚了如何通过on 访问服务器并启动。但是,当我关闭开发计算机上的Cygwin窗口时,它会杀死服务器上的进程(我认为,因为它不再响应了)。 目前,我是这样启动的: 我需要使该文件在服务器上自动且连续地运行(这是我正在开发的系统的组成部分)。 不幸的是,我几乎不了解服务器管理或一般的非Windows操作系统(不是由我选择或制造服务器的)

    • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

    • 我想在弹簧靴中使用RedisTemplate。我可以成功地使用StringRedisTemplate,但我不能使用Redistemplate。这是密码。 然后,运行测试方法:testObject(),下面是错误报告:

    • 如何触发此示例 Spring 启动 OAuth2 应用程序的自动注销? 我尝试将以下代码从这个其他帖子的答案添加到应用程序的包中的新控制器类中: 但是,当我尝试启动应用程序时,调试日志会出现以下错误,表明它无法令牌存储: 当编译错误解决后,我打算从以下代码触发注销过程,这些代码将添加到<code>hello中。js控制器在上面的github链接中的应用程序: 示例应用的完整代码位于上面的 gith

    • 问题:当我的spring应用程序运行时,同时数据库服务器停止/重新启动,然后db连接丢失并且从未恢复。 com.mysql.jdbc.exceptions.jdbc4.mysqlnontransientConnectionException:连接关闭后不允许任何操作。 服务mysql启动 问题:如何告诉spring在连接丢失后自动重新连接? 这是我的配置:

    • Android将WIFI详细信息保存到WifiConfiguration类中。当WIFI为ON和SSID时,Pass-key详细信息与WIFI网络匹配,然后Android自动连接特定的WIFI网络。如何禁用该自动连接功能?问这个问题的目的是,我有两个配置的网络(已经保存),我想在按钮单击事件上连接WIFI,但android的自动连接功能与WIFI本身连接。