当前位置: 首页 > 知识库问答 >
问题:

合流Kafka连接分布式jdbc连接器

柴阳云
2023-03-14

我们已经成功地使用了MySQL - 使用jdbc独立连接器的kafka数据摄取,但现在在分布式模式下使用相同的连接器(作为kafka connect服务)时面临问题。

用于独立连接器的命令,工作正常 -

/usr/bin/connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties

现在,我们已经停止了这一项,并以分布式模式启动了kafka connect服务,如下所示

systemctl status confluent-kafka-connect
● confluent-kafka-connect.service - Apache Kafka Connect - distributed
   Loaded: loaded (/usr/lib/systemd/system/confluent-kafka-connect.service; disabled; vendor preset: disabled)
   Active: active (running) since Wed 2018-11-14 22:52:49 CET; 41min ago
     Docs: http://docs.confluent.io/
 Main PID: 130178 (java)
   CGroup: /system.slice/confluent-kafka-connect.service
           └─130178 java -Xms256M -Xmx2G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.a...

2 个节点当前正在运行具有相同连接分布式属性文件的连接服务。

bootstrap.servers=node1IP:9092,node2IP:9092
group.id=connect-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

连接服务已启动并正在运行,但它不会加载在 /etc/kafka/connect-standalone.properties 下定义的连接器。

应该对该服务执行什么操作,以便每当您点击命令systemctl start confluent kafka connect时,它就会运行该服务并启动/etc/kafka connection-*/下定义的连接器,就像您手动运行一个单独的连接器来提供属性文件的路径一样。

共有3个答案

闻人和泽
2023-03-14

分布式模式下的连接器不能像独立模式下那样由属性文件部署。而使用REST API,请参考https://docs . confluent . io/current/connect/managing/configuring . html # connect-managing-distributed-mode

空英达
2023-03-14

我可以描述我在分布式模式下启动jdbc连接器所做的事情:

我在我的本地机器上使用conFluentCLI实用程序来更快地启动服务。

./confluent start

后记 我停止了Kafka连接

./confluent stop connect

然后我继续在两个不同的端口上手动启动定制的<code>connect distributed</code>(<code>18083</code<和<code>28083</code>)

➜  bin ./connect-distributed ../etc/kafka/connect-distributed-worker1.properties

➜  bin ./connect-distributed ../etc/kafka/connect-distributed-worker2.properties

注意:将plugin.path设置设置为完整(而不是相对)路径(例如:plugin.path=/full/path/to/confluent-5.0.0/share/java)

然后我可以很容易地添加一个新的连接器

curl -s -X POST -H "Content-Type: application/json" --data @/full/path/to/confluent-5.0.0/etc/kafka-connect-jdbc/source-quickstart-sqlite.json http://localhost:18083/connectors

这应该可以解决问题。

正如cricket_007已经指出的那样,请考虑 kafka 代理的复制因子至少为 3,以防您正在处理在其中一个代理中断的情况下不想丢失的东西。

公良阳波
2023-03-14

它运行服务并启动/etc/kafka connect-*/下定义的连接器

这不是分布式模式的工作方式...它不知道要加载哪些属性文件,也不会扫描这些文件夹1

在独立模式下,您给出的< code>N 1属性文件可以立即加载,但是对于连接分布式,您必须使用HTTP POST调用来连接REST API。

ConFluent Control Center或Landoop的Connect UI可以为这些操作提供一个很好的管理Web门户。

顺便说一句,如果您有多个代理,我将建议增加连接分布式中那些连接主题的副本因子。属性文件。

< sup>1。< sub >如果是的话,这可能是一个很好的特性,但是您必须确保在分布式模式下连接器永远不会被删除/停止,并且您最终会处于与文件系统上正在运行的内容和文件不一致的状态。

 类似资料:
  • 我正在使用合流kafka connect jdbc源将mysql表中的记录推送到我的kafka主题,但似乎日期列被转换为纪元时间。 这是我的配置: kafka主题中的输出: 我也在类似于“select from_unixtime(updated _ on)from temp”的查询中尝试了from _ unixtime(),但是那不行。 有没有办法推到YYYY-MM-DD HH:MM:SS格式的K

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 我正在开发debezium mongodb源连接器。我可以通过提供kafka引导服务器地址作为远程机器(部署在Kubernetes中)和远程MongoDB URL在分布式模式下在本地机器中运行连接器吗? 我尝试了这一点,我看到连接器成功启动,没有错误,只有几个警告,但没有数据从MongoDB流动。 使用以下命令运行连接器 遵循以下教程:https://medium.com/tech-that-wo

  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 分布式模式下Kafka Connect集群的偏移管理行为是什么,即运行多个连接器并监听同一组主题(或一个主题)? 因此,在分布式模式下,Kafka Connect 会将偏移量信息存储在 Kafka 中,此偏移量将由集群中的工作线程读取和提交。如果我在该 Kafka Connect 集群中运行多个连接器侦听同一主题,会发生什么情况?分区的偏移量是否与所有连接器相同,或者每个连接器在分区上的偏移量是否

  • 我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问