当前位置: 首页 > 知识库问答 >
问题:

Kafka Mongo on kubernetes(minikube)(Mongo作为Kafka NOT Working的来源)与bitnami/Kafka

幸弘光
2023-03-14

哪个图表:图表/比特纳米/Kafka/https://github.com/bitnami/charts/tree/master/bitnami/kafka

描述我正在遵循的教程bug使用Apache Kafka和MongoDB在库伯内特斯上构建可扩展的容错消息群集

为了解决外部署问题,我遵循了外部署不呈现留档#5649中的示例。问题已解决,我的工作配置如下:

  1. Dockerfile文件
FROM bitnami/kafka:latest

RUN mkdir -p /opt/bitnami/kafka/plugins && \
    cd /opt/bitnami/kafka/plugins && \
    curl --remote-name --location --silent https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.6.1/mongo-kafka-connect-1.6.1-all.jar
extraDeploy:
  - |
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      replicas: 1
      selector:
        matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }}
          app.kubernetes.io/component: connector
      template:
        metadata:
          labels: {{- include "common.labels.standard" . | nindent 8 }}
            app.kubernetes.io/component: connector
        spec:
          containers:
            - name: connect
              image: kafka-connect-bitnami:5.22
              imagePullPolicy: Never
              command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-sink.properties"]
              ports:
                - name: connector
                  containerPort: 8083
              volumeMounts:
                - name: configuration
                  mountPath: /config
          volumes:
            - name: configuration
              configMap:
                name: {{ include "kafka.fullname" . }}-connect
  - |
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    data:
      connect-standalone.properties: |-
        bootstrap.servers={{ include "kafka.fullname" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}:{{ .Values.service.port }}
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=20000
        plugin.path=/opt/bitnami/kafka/plugins
      mongodb-source.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-source-connector
        topics=source-topic
        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=source
        batch.size=0
        change.stream.full.document=updateLookup
        pipeline=[]
        collation=
      mongodb-sink.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-sink-connector
        topics=sink-topic
        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=sink
  - |
    apiVersion: v1
    kind: Service    
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      ports:
        - protocol: TCP
          port: 8083
          targetPort: connector
      selector: {{- include "common.labels.matchLabels" . | nindent 4 }}
        app.kubernetes.io/component: connector

然后我跑了

helm install kafka bitnami/kafka -f values.yml

我已经为mongodb接收器连接器安装并运行了以下设置:

NAME                                READY   STATUS    RESTARTS      AGE
pod/kafka-0                         1/1     Running   1 (35h ago)   35h
pod/kafka-client                    1/1     Running   0             43h
pod/kafka-connect-669487944-gb4p7   1/1     Running   0             57m
pod/kafka-zookeeper-0               1/1     Running   0             35h
pod/mongodb-arbiter-0               1/1     Running   0             42h
pod/mongodb-client                  1/1     Running   0             41h
pod/mongodb-primary-0               2/2     Running   0             42h
pod/mongodb-secondary-0             2/2     Running   0             42h

NAME                               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/kafka                      ClusterIP   10.98.236.199   <none>        9092/TCP                     35h
service/kafka-connect              ClusterIP   10.105.58.215   <none>        8083/TCP                     35h
service/kafka-headless             ClusterIP   None            <none>        9092/TCP,9093/TCP            35h
service/kafka-zookeeper            ClusterIP   10.108.22.188   <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP   35h
service/kubernetes                 ClusterIP   10.96.0.1       <none>        443/TCP                      14d
service/mongodb                    ClusterIP   10.96.105.153   <none>        27017/TCP,9216/TCP           42h
service/mongodb-headless           ClusterIP   None            <none>        27017/TCP                    42h

NAME                            READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/kafka-connect   1/1     1            1           35h

NAME                                       DESIRED   CURRENT   READY   AGE
replicaset.apps/kafka-connect-54cff6f879   0         0         0       35h
replicaset.apps/kafka-connect-59fcf7754c   0         0         0       24h
replicaset.apps/kafka-connect-64c5697f54   0         0         0       21h
replicaset.apps/kafka-connect-669487944    1         1         1       21h
replicaset.apps/kafka-connect-66c6dd4679   0         0         0       35h
replicaset.apps/kafka-connect-84ffbffd5c   0         0         0       23h

NAME                                 READY   AGE
statefulset.apps/kafka               1/1     35h
statefulset.apps/kafka-zookeeper     1/1     35h
statefulset.apps/mongodb-arbiter     1/1     42h
statefulset.apps/mongodb-primary     1/1     42h
statefulset.apps/mongodb-secondary   1/1     42h

我可以使用上述配置成功将消息发布到 mongodb。以下是在 kafaka-connect pod 上成功下沉后的一些消息。

[2022-01-19 11:52:25,476] INFO [mongo-sink-connector|task-0] [Consumer clientId=connector-consumer-mongo-sink-connector-0, groupId=connect-mongo-sink-connector] Setting offset for partition sink-topic-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:818)
[2022-01-19 11:52:25,519] INFO [mongo-sink-connector|task-0] Cluster created with settings {hosts=[mongodb.default.svc.cluster.local:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:7744}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:7743}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,523] INFO [mongo-sink-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=mongodb.default.svc.cluster.local:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2650238, setName='rs0', canonicalAddress=mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, hosts=[mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, mongodb-secondary-0.mongodb-headless.default.svc.cluster.local:27017], passives=[], arbiters=[mongodb-arbiter-0.mongodb-headless.default.svc.cluster.local:27017], primary='mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=3, topologyVersion=null, lastWriteDate=Wed Jan 19 11:52:24 UTC 2022, lastUpdateTimeNanos=188123424943131} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,566] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:6, serverValue:7745}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)

繁殖

来自生产商:

kubectl exec --tty -i kafka-client --namespace default -- kafka-console-producer.sh --broker-list kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic sink-topic  

>{"foo":"bar.....12"}  
>{"foo":"bar.....1122"}
>

在蒙戈德中:

rs0:PRIMARY> use mydb;
rs0:PRIMARY> db.sink.find()
{ "_id" : ObjectId("61e7fb793bb99a00505efa14"), "foo" : "bar.....12" }
{ "_id" : ObjectId("61e7fb9f3bb99a00505efa16"), "foo" : "bar.....1122" }
rs0:PRIMARY> 

问题?问题是这个图表只适用于mongo-sink。我无法使用 mongo-source-connector 将 mongodb 用作具有上述配置值的源.yml。请注意,要使用 mongo-source-connector,我在 values.yml 文件中进行了以下更改。

command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties"]

预期行为

(i) -Kafka Connect以SINK的身份与Mongo合作,即Kafka--

(ii)但是,它不是作为SOURCE工作,意思是mongo——

pods中没有错误或任何错误消息告诉kafka无法与mongo通信。

我的问题:

  1. 如果我缺少任何必要的设置,要实现的是什么(ii)。
  2. 是否可以模拟运行源代码连接器和接收器连接器以接收kafka中的更新并从kafka发送数据。如果是,需要进行哪些必要的更新来完成此操作。

提前感谢您的反馈

共有1个答案

印成天
2023-03-14

问题是这个图表只适用于mongo sink

因为这是您提供的唯一连接器文件。

你可以给多个

command: 
  - /bin/bash
  - -c
  - /opt/bitnami/kafka/bin/connect-standalone.sh 
  - /config/connect-standalone.properties 
  - /config/mongodb-source.properties
  - /config/mongodb-sink.properties

注意:这不是容错的,也不能在生产中使用。相反,您的命令应该真正使用< code > connect-distributed . sh ,并且您的连接器配置可以保存为JSON文件并发布到服务入口端口。

 类似资料:
  • 我通过玩迷你库贝来学习库伯内特。 如何与Minikube一起使用ingress?或者如何将minikube ip与ingress服务绑定--这样就可以在不使用nodeport的情况下将应用程序公开到外部

  • 下面是AWS Ec2中Mongo的副本设置,io1(10000 iops)和r4。8XL码 1个主节点、1个辅助节点、1个仲裁节点 我们有2种类型的应用程序写入/读取Mongo,几乎数据接近2亿 现在,我们从Mongo获得readtimeout/sockettimeour,而application1在Mongo上几乎没有写操作,而application2在Mongo上执行大量写操作 Applica

  • 我试图从Kafka主题中读取数据,在Flink流媒体。我试图运行以下示例代码,在APACHE Flink 1.1.3文档页面上作为示例:Apache kafka连接器, } 我有以下错误: 你能指导我修理这个吗?Kafka连接器是否存在依赖性问题。我的版本是: Flink 1.1.3

  • 我使用的是kafka connect支持的以下mongo源代码。我发现mongo源代码的一个配置(从这里)是tasks.max。 这意味着我可以提供连接器tasks.max这是 如果它将创建多个连接器来侦听mongoDb更改流,那么我将最终得到重复的消息。那么,mongo真的具有并行性并作为集群工作吗?如果它有超过1个tasks.max?

  • 我使用kafka connect从mongo读取数据并将其写入kafka主题。 我正在使用 mongo kafka 源连接器。 我收到以下错误: 罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压

  • 我创建了https://github.com/mongodb/mongo-kafka的构建 但是这个运行如何与我正在运行的kafka实例连接。 甚至这个问题听起来多么愚蠢。但似乎没有任何文档可以使其与的本地运行的一起工作。 如果你有一个好的资源,请引导我走向它。 更新1-- 已使用maven插件-https://search.maven.org/artifact/org.mongodb.kafka