当前位置: 首页 > 知识库问答 >
问题:

Kafka连接集群设置或启动连接工作者

申光临
2023-03-14

我正在浏览Kafka连接,我试图得到一些概念。

假设我有kafka集群(节点k1、k2和k3)设置并且正在运行,现在我想在不同的节点上运行kafka连接工作器,比如分布式模式下的c1和c2。

很少有问题。

1) 要在分布式模式下运行或启动kafka connect,我需要使用命令../bin/connect distributed。sh,这在kaffa集群节点中可用,所以我需要从任何一个kafka集群节点启动kafka连接?或者我启动kafka connect的任何节点都需要有kafka二进制文件,这样我就可以使用../bin/connect-distributed.sh

2)我需要将我的连接器插件复制到任何kafka集群节点(或所有集群节点?

3) kafka如何在worker节点上启动jvm进程之前将这些连接器插件复制到worker节点?因为该插件包含我的任务代码,需要将其复制到worker才能在worker中启动进程。

4)我是否需要在连接群集节点c1和c2中安装任何东西,例如需要安装java或任何与kafka connect相关的内容?

5) 在某些地方,它说使用汇流平台,但我想先从apachekafka connect开始。

有人可以通过一些光甚至指向一些资源的指针来取悦也会有所帮助。

谢谢。

共有2个答案

岳阳飙
2023-03-14

乔治给出的答案是正确的。我运行了几个连接器,现在我更好地理解了它。

我只是想换一种说法。

在Kafka连接中涉及两件事,一是Worker,二是连接器。以下是有关运行分布式Kafka连接的详细信息。

Kafka连接工作器是一个Java的进程,连接器/连接任务将在其上运行。所以首先我们需要启动工作器,要运行/启动一个工作器,我们需要在那台机器上安装java,然后我们需要Kafka连接相关的sh/bat文件来启动工作器和kafka连接工作器将使用的kafka libs,为此,我们只需在工作器机器上复制/安装Kafka,我们还需要在“plugin.path”中复制所有连接器和连接任务相关的jars/依赖项,如下面的工作器属性文件中定义的,现在工作器机器准备好了,要启动工作器,我们需要调用。/bin/connect-distributed.sh./config/connect-distributed.properties,这里connect-distributed.properties将有工作器的配置。同样的事情必须在我们需要运行Kafka连接的每台机器上重复。

现在 worker java 进程在所有机器上运行,woker 配置将具有 group.id 属性,具有相同属性值的 worker 将形成一组/集群。

每个工作进程将公开restendpoint(默认http://localhost:8083/connectors),要在正在运行的worker上启动/启动连接器,我们需要执行http post连接器配置json,根据给定的配置,worker将启动连接器以及上述组/集群worker中的任务数。

示例:连接帖子,

curl -X POST -H "Content-Type: application/json" --data '{"name": "local-file-sink", "config": {"connector.class":"FileStreamSinkConnector", "tasks.max":"3", "file":"test.sink.txt", "topics":"connect-test" }}' http://localhost:8083/connectors
翟嘉年
2023-03-14

1)为了获得高度可用的kafka-connect服务,您需要在两台具有相同< code>group.id的不同计算机上运行至少两个< code > connect-distributed . sh 实例。您可以在这里找到关于每个工人的配置的更多细节。为了提高性能,Connect应该独立于代理和Zookeeper机器运行。

2) 是的,您需要将所有连接器放在插件下。路径(通常位于/usr/share/java/下)。

3) kafka connect将在启动时加载连接器。你不需要处理这个。请注意,如果kafka connect实例正在运行并且添加了新连接器,则需要重新启动服务。

4) 您需要在所有计算机上安装Java。尤其是对于汇流平台:

这个版本的汇合平台支持Java 1.7和1.8(目前不支持Java 1.9)。你应该使用垃圾优先(G1)的垃圾收集器。有关更多信息,请参见支持的版本和互操作性。

5)看情况。Confluent是由Apache Kafka的最初创建者创建的,它是一个更完整的发行版,增加了模式管理、连接器和客户端。它还附带了KSQL,如果您需要对某些事件采取行动,KSQL会非常有用。Confluent只是在Apache Kafka发行版的基础上添加了一些东西,它不是一个修改过的版本。

 类似资料:
  • 我尝试了kafka-console-consumer.sh和kafka-console-producer.sh,它工作得很好。我能够看到生产者在消费者中发送的消息 1)我已经下载了s3连接器(https://docs.confluent.io/current/connect/kafka-connect-S3/index.html) 2)将文件解压缩到/home/ec2-user/plugins/

  • 我想在伪分布式模式下设置一个hadoop-cluster。我设法执行了所有的设置步骤,包括在我的机器上启动一个Namenode、Datanode、Jobtracker和一个Tasktracker。 运行会得到如下输出(我用替换了其中的一部分): 启动和将导致以下输出: 在此之后不久调用将得到: 我还注意到不工作:

  • 下面是我的核心站点。xml: 下面是hdfs-site.xml 下面是mapreduce.xml 谢了。

  • 我正在尝试使用Apache Camel和Qpid JMS客户端连接到在两个不同节点(VM)中运行的ActiveMQ Artemis主动-主动集群。我正在使用ActiveMQ Artemis 2.17.0。 我正在试图找出我的组织的远程URI配置应该是什么。阿帕奇。qpid。jms。JmsConnectionFactory实例。使用<代码>ampq://host1:5672,ampq://host2

  • 目前我有一个Quarkus应用程序,它从一个Kafka话题消费,并在另一个Kafka话题上生产。它使用SmallRye反应消息传递。效果很好。由于外部更改,要在其上生成的主题和要从其消费的主题将在不同集群上的Kafka服务器上(并且不应该/不能组合在一个集群中)。 在这里添加一个服务器并没有帮助,它会尝试将数据传播到代理上,这不是我的本意。 是否可能连接到多个集群(可能每个主题设置一个服务器)?在

  • 我们有一个Kafka集群(作为第3方托管服务),它启用了SSL。我们现在尝试使用第3方Sink(WePay BigQuery连接器)设置Kafka Connect(Conflow ent 5.0)。当在独立模式下启动Kafka连接时,一切都像魅力一样工作。不幸的是,当启用分布式模式时,Kafka Connect突然失败,并出现以下情况: 尝试在Google上查找特定错误,但找不到任何内容。它看起来