当前位置: 首页 > 知识库问答 >
问题:

如何在分布式模式下部署kafka connect?

毛峻
2023-03-14

我正在使用kubernetes中的JDBC接收器连接器构建Kafka-连接应用程序。我尝试了独立模式,它正在工作。我想转移到分布式html" target="_blank">模式。我可以通过运行下面的yaml文件成功构建两个pod(kafka连接器):

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  namespace: vtq
  name: kafka-sink-postgres-dis
spec:
  replicas: 2
  template:
    metadata:
      labels:
        app: kafka-sink-postgres-dis
    spec:
      containers:
      - name: kafka-sink-postgres-dis
        image: ***
        imagePullPolicy: Always

bin/connect-distributed.sh配置/worker.properties

bootstrap.servers=***:9092
offset.flush.interval.ms=10000

rest.port=8083
rest.host.name=127.0.0.1


key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081


# Prevent the connector from pulling all historical messages
auto.offset.reset=latest

# options below may be required for distributed mode

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-postgres-sink-dis

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=postgres-connect-offsets
offset.storage.replication.factor=3

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You MUST manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.

config.storage.topic=postgres-connect-configs
config.storage.replication.factor=3

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=postgres-connect-status
status.storage.replication.factor=3

并在每个 pod 内部创建了一个接收器连接器,任务.max=1,两个连接器侦听相同的主题。原来他们只是复制了。

curl -X POST -H "Content-Type: application/json" --data '{"name": "postgres_sink_dis", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "connection.url":"***","topics":"***"}}' http://127.0.0.1:8083/connectors

但是我对kafka连接集群、工作人员、连接器和任务的概念感到非常困惑。我读了https://github.com/enfuse/kafka-connect-demo/blob/master/docs/install-connector.md.他们在配置连接器之前设置端口转发到Rest端口。我尝试了一下,在部署服务和创建连接器之后,curl-s没有返回172.0.0.1:8083/connectors.

谁能给我一个简短的描述,我下一步应该做什么,任何相关的信息将是非常有用的。谢谢!

更新:最后,我想出了问题并解决了问题。1. 使用相同的 group.id 和不同的 rest.port (https://docs.confluent.io/current/connect/userguide.html) 分别部署两个 pod/worker。2. 在容器内,创建包含任务的连接器。

共有1个答案

端木渝
2023-03-14

例如,您有一个由两个工作人员/两个pod组成的连接器集群。您可以使用集群中的多个任务创建连接器(接收器或源),这些任务将分布在两个工作人员中。

 类似资料:
  • 提示 GatewayWorker提供的所有接口都是支持分布式调用的,所以业务代码不需要任何更改,直接就可以分布式部署。 如何分布式GatewayWorker GatewayWorker通过Register服务来建立划分集群。同一集群使用相同的Register服务ip和端口,即Gateway 和 businessWorker的注册服务地址($gateway->registerAddress $bus

  • 1、成倍提高系统承载能力并降低成本 单机遇到资源瓶颈时,要想支持更大的用户量,一般是优化业务和增加服务器配置。然而这么做只能是杯水车薪,成本巨大并且效果非常有限。 GatewayWorker支持分布式部署,你可以利用多台价格低廉的普通服务器,组成一个庞大的服务器集群,成倍的增加系统承载能力,这不管在资金成本上还是人力成本上都是最划算的方案。 2、提高系统稳定性 单机对外提供服务,则风险很大,服务器

  • 提示 GatewayWorker提供的所有接口都是支持分布式调用的,所以业务代码不需要任何更改,直接就可以分布式部署。

  • 什么是Gateway Worker分离部署 GatewayWorker有三种进程,Gateway进程负责网络IO,BusinessWorker进程负责业务处理,Register进程负责协调Gateway与BusinessWorker之间建立TCP长连接通讯。我们可以把Gateway BusinessWorker Register分开部署在不同的服务器上,当业务进程BusinessWorker出现瓶

  • 我想使用 Confluent 的复制器将数据从一个系统复制到另一个系统。我正在使用两个Ubuntu 18.04系统,其中一个充当源,另一个充当目的地。 我尝试在分布式模式下运行kafka connect replicator,更改了以下配置: < li >在confluent/etc/Kafka/server . properties中,我做了以下更改 源 目的地 然后,我在源系统中创建了主题,并

  • 我无法在hadoop fs-ls/命令上查看HDFS中的文件,我想这是因为name节点没有运行。我尝试了格式化namenode,并将core-site.xml中的端口更改为不同的值,但我的JPS没有列出namenode。 下面是这些文件:1)core-site.xml 3)mapred-site.xml JPS输出为: 21043作业跟踪器 20839数据阳极