Which chart: bitnami/kafka (https://github.com/bitnami/charts/tree/master/bitnami/kafka)
Describe the bug: I am following the tutorial "Building a scalable, fault-tolerant messaging cluster with Apache Kafka and MongoDB on Kubernetes".
To get extraDeploy working, I followed the example in "extraDeploy does not render documentation" #5649. That problem is solved, and my working configuration is as follows:
FROM bitnami/kafka:latest
RUN mkdir -p /opt/bitnami/kafka/plugins && \
    cd /opt/bitnami/kafka/plugins && \
    curl --remote-name --location --silent "https://search.maven.org/remotecontent?filepath=org/mongodb/kafka/mongo-kafka-connect/1.6.1/mongo-kafka-connect-1.6.1-all.jar"
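Since the image built from this Dockerfile is a local tag (kafka-connect-bitnami:5.22 below) pulled with imagePullPolicy: Never, it must already exist in the node's container runtime. A minimal sketch, assuming a local minikube-style cluster:

# Build the image inside minikube's Docker daemon so the kubelet can find it
# without pulling from a registry (required by imagePullPolicy: Never).
eval $(minikube docker-env)
docker build -t kafka-connect-bitnami:5.22 .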
extraDeploy:
  - |
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      replicas: 1
      selector:
        matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }}
          app.kubernetes.io/component: connector
      template:
        metadata:
          labels: {{- include "common.labels.standard" . | nindent 8 }}
            app.kubernetes.io/component: connector
        spec:
          containers:
            - name: connect
              image: kafka-connect-bitnami:5.22
              imagePullPolicy: Never
              command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-sink.properties" ]
              ports:
                - name: connector
                  containerPort: 8083
              volumeMounts:
                - name: configuration
                  mountPath: /config
          volumes:
            - name: configuration
              configMap:
                name: {{ include "kafka.fullname" . }}-connect
  - |
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    data:
      connect-standalone.properties: |-
        bootstrap.servers={{ include "kafka.fullname" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}:{{ .Values.service.port }}
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true
        value.converter.schemas.enable=true
        offset.storage.file.filename=/tmp/connect.offsets
        offset.flush.interval.ms=20000
        plugin.path=/opt/bitnami/kafka/plugins
      mongodb-source.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-source-connector
        topics=source-topic
        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=source
        batch.size=0
        change.stream.full.document=updateLookup
        pipeline=[]
        collation=
      mongodb-sink.properties: |-
        connection.uri=mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb
        name=mongo-sink-connector
        topics=sink-topic
        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        tasks.max=1
        key.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        value.converter.schemas.enable=false
        database=mydb
        collection=sink
  - |
    apiVersion: v1
    kind: Service
    metadata:
      name: {{ include "kafka.fullname" . }}-connect
      labels: {{- include "common.labels.standard" . | nindent 4 }}
        app.kubernetes.io/component: connector
    spec:
      ports:
        - protocol: TCP
          port: 8083
          targetPort: connector
      selector: {{- include "common.labels.matchLabels" . | nindent 4 }}
        app.kubernetes.io/component: connector
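Since extraDeploy rendering was the original stumbling block (#5649), it can help to render the chart locally and validate the output before installing; a sanity check along these lines:

# Render the manifests with the same values file and validate them offline.
helm template kafka bitnami/kafka -f values.yml > rendered.yaml
kubectl apply --dry-run=client -f rendered.yaml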
Then I ran:
helm install kafka bitnami/kafka -f values.yml
I now have the following setup for the MongoDB sink connector installed and running:
NAME READY STATUS RESTARTS AGE
pod/kafka-0 1/1 Running 1 (35h ago) 35h
pod/kafka-client 1/1 Running 0 43h
pod/kafka-connect-669487944-gb4p7 1/1 Running 0 57m
pod/kafka-zookeeper-0 1/1 Running 0 35h
pod/mongodb-arbiter-0 1/1 Running 0 42h
pod/mongodb-client 1/1 Running 0 41h
pod/mongodb-primary-0 2/2 Running 0 42h
pod/mongodb-secondary-0 2/2 Running 0 42h
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kafka ClusterIP 10.98.236.199 <none> 9092/TCP 35h
service/kafka-connect ClusterIP 10.105.58.215 <none> 8083/TCP 35h
service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 35h
service/kafka-zookeeper ClusterIP 10.108.22.188 <none> 2181/TCP,2888/TCP,3888/TCP 35h
service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35h
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 14d
service/mongodb ClusterIP 10.96.105.153 <none> 27017/TCP,9216/TCP 42h
service/mongodb-headless ClusterIP None <none> 27017/TCP 42h
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/kafka-connect 1/1 1 1 35h
NAME DESIRED CURRENT READY AGE
replicaset.apps/kafka-connect-54cff6f879 0 0 0 35h
replicaset.apps/kafka-connect-59fcf7754c 0 0 0 24h
replicaset.apps/kafka-connect-64c5697f54 0 0 0 21h
replicaset.apps/kafka-connect-669487944 1 1 1 21h
replicaset.apps/kafka-connect-66c6dd4679 0 0 0 35h
replicaset.apps/kafka-connect-84ffbffd5c 0 0 0 23h
NAME READY AGE
statefulset.apps/kafka 1/1 35h
statefulset.apps/kafka-zookeeper 1/1 35h
statefulset.apps/mongodb-arbiter 1/1 42h
statefulset.apps/mongodb-primary 1/1 42h
statefulset.apps/mongodb-secondary 1/1 42h
With the configuration above I can successfully publish messages through to MongoDB. Below are some log messages from the kafka-connect pod after a successful sink:
[2022-01-19 11:52:25,476] INFO [mongo-sink-connector|task-0] [Consumer clientId=connector-consumer-mongo-sink-connector-0, groupId=connect-mongo-sink-connector] Setting offset for partition sink-topic-0 to the committed offset FetchPosition{offset=3, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka-0.kafka-headless.default.svc.cluster.local:9092 (id: 0 rack: null)], epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:818)
[2022-01-19 11:52:25,519] INFO [mongo-sink-connector|task-0] Cluster created with settings {hosts=[mongodb.default.svc.cluster.local:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:5, serverValue:7744}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,522] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:4, serverValue:7743}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
[2022-01-19 11:52:25,523] INFO [mongo-sink-connector|task-0] Monitor thread successfully connected to server with description ServerDescription{address=mongodb.default.svc.cluster.local:27017, type=REPLICA_SET_PRIMARY, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=2650238, setName='rs0', canonicalAddress=mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, hosts=[mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017, mongodb-secondary-0.mongodb-headless.default.svc.cluster.local:27017], passives=[], arbiters=[mongodb-arbiter-0.mongodb-headless.default.svc.cluster.local:27017], primary='mongodb-primary-0.mongodb-headless.default.svc.cluster.local:27017', tagSet=TagSet{[]}, electionId=7fffffff0000000000000002, setVersion=3, topologyVersion=null, lastWriteDate=Wed Jan 19 11:52:24 UTC 2022, lastUpdateTimeNanos=188123424943131} (org.mongodb.driver.cluster:71)
[2022-01-19 11:52:25,566] INFO [mongo-sink-connector|task-0] Opened connection [connectionId{localValue:6, serverValue:7745}] to mongodb.default.svc.cluster.local:27017 (org.mongodb.driver.connection:71)
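The standalone worker also serves the Kafka Connect REST API on the containerPort declared above, so the sink's health can be checked directly; a quick probe using the kafka-connect service from the listing earlier, assuming curl is available in the kafka-client pod:

# Query the Connect REST API for the sink connector's status.
kubectl exec -it kafka-client -- curl -s \
  http://kafka-connect.default.svc.cluster.local:8083/connectors/mongo-sink-connector/status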
To Reproduce
From the producer:
kubectl exec --tty -i kafka-client --namespace default -- kafka-console-producer.sh --broker-list kafka-0.kafka-headless.default.svc.cluster.local:9092 --topic sink-topic
>{"foo":"bar.....12"}
>{"foo":"bar.....1122"}
>
In MongoDB:
rs0:PRIMARY> use mydb;
rs0:PRIMARY> db.sink.find()
{ "_id" : ObjectId("61e7fb793bb99a00505efa14"), "foo" : "bar.....12" }
{ "_id" : ObjectId("61e7fb9f3bb99a00505efa16"), "foo" : "bar.....1122" }
rs0:PRIMARY>
The problem? The problem is that this chart only works for the mongo-sink. I cannot use MongoDB as a source via the mongo-source-connector with the values.yml above. Note that to use the mongo-source-connector, I made the following change in the values.yml file:
command: [ "/bin/bash", "-c", "/opt/bitnami/kafka/bin/connect-standalone.sh /config/connect-standalone.properties /config/mongodb-source.properties"]
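One detail worth noting: after editing values.yml, the change still has to be rolled out to the release; assuming the release name kafka from the install command above:

# Apply the edited values and wait for the connect Deployment to roll over.
helm upgrade kafka bitnami/kafka -f values.yml
kubectl rollout status deployment/kafka-connect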
Expected behavior
(i) Kafka Connect works with Mongo as a SINK, i.e. Kafka --> Mongo: works.
(ii) However, it does not work as a SOURCE, i.e. Mongo --> Kafka: nothing happens. There are no errors in the pods, nor any error message saying that Kafka cannot communicate with Mongo.
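For reference, a minimal way to exercise the source path (assuming the mongodb-client pod provides the mongo shell, and using the database, collection, and topic from mongodb-source.properties above):

# Insert a document into the collection the source connector watches...
kubectl exec -it mongodb-client -- mongo \
  "mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb" \
  --eval 'db.source.insertOne({"foo":"from-mongo"})'

# ...then consume from the topic; in the failing setup nothing shows up here.
kubectl exec --tty -i kafka-client --namespace default -- kafka-console-consumer.sh \
  --bootstrap-server kafka-0.kafka-headless.default.svc.cluster.local:9092 \
  --topic source-topic --from-beginning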
My question: why does the mongo-source-connector not work with this setup?
Thanks in advance for your feedback.
"The problem is that this chart only works for the mongo-sink."
That is because the sink properties file is the only connector configuration you hand to connect-standalone.sh. The script accepts a worker config followed by any number of connector configs, so you can pass more than one:
command:
  - /bin/bash
  - -c
  # connect-standalone.sh takes one worker config followed by any number of
  # connector configs; with bash -c they must be passed as a single string.
  - >-
    /opt/bitnami/kafka/bin/connect-standalone.sh
    /config/connect-standalone.properties
    /config/mongodb-source.properties
    /config/mongodb-sink.properties
Note: this is not fault tolerant, and should not be used in production. For that, your command should really use connect-distributed.sh, and your connector configs can be stored as JSON files and POSTed to the service port (the Connect REST API on 8083).
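A rough sketch of that distributed-mode workflow (the JSON mirrors mongodb-source.properties above; the service name and port come from the kubectl listing earlier, and the call must run from inside the cluster, e.g. from the kafka-client pod):

# With connect-distributed.sh running, connectors are created via the REST API
# instead of properties files mounted into the pod.
curl -s -X POST http://kafka-connect.default.svc.cluster.local:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "mongo-source-connector",
        "config": {
          "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
          "connection.uri": "mongodb://user:password@mongodb.default.svc.cluster.local:27017/mydb",
          "database": "mydb",
          "collection": "source",
          "tasks.max": "1"
        }
      }'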