AMQ Streams
优质
小牛编辑
142浏览
2023-12-01
功能概述
AMQ Streams 基于 Strimzi(http://strimzi.io/) 社区产品,核心组件包括 Apache Kafka 和 Apache Zookeeper,是一个可大规模扩展,分布式和高性能的数据流平台。详细组件如下所示:
名称 | 功能 |
---|---|
Zookeeper | 高可靠分布式协调器 |
Kafka Broker | 可靠消息传输代理,负责将生产者创建的消息传输给消费者 |
Kafka Connect | 流数据小工具,将 Kafka Broker 和其它系统通过 Connector 插件连接 |
Kafka Consumer and Producer APIs | Java API,用来向 Kafka Broker 发送消息或从 Kafka Broker 接收消息 |
Kafka Streams API | 开发流处理应用 |
本地运行
安装与部署
1. 创建 kafka 用户和组# groupadd kafka
# useradd -g kafka kafka
# passwd kafka
2. 创建 /opt/kafka 路径# mkdir /opt/kafka
3. 解压安装# unzip amq-streams-1.0.0-bin.zip -d /opt/kafka
4. 设定 /opt/kafka 组属关系# chown -R kafka:kafka /opt/kafka/
5. 创建 /var/lib/zookeeper,并设定组属关系# mkdir /var/lib/zookeeper
# chown -R kafka:kafka /var/lib/zookeeper/
6. 创建 /var/lib/kafka,并设定组属关系# mkdir /var/lib/kafka
# chown -R kafka:kafka /var/lib/kafka/
单节点运行
1. 切换到 kafka 用户# su - kafka
2. 编辑 /opt/kafka/config/zookeeper.properties,设定 dataDirdataDir=/var/lib/zookeeper
3. 启动 Zookeeper$ /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
4. 确保 Zookeeper 运行$ jcmd | grep zookeeper
12433 org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/kafka/config/zookeeper.properties
5. 编辑 /opt/kafka/config/server.properties,设定 log.dirslog.dirs=/var/lib/kafka
6. 启动 Kafka$ /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
7. 确保 Kafka 运行$ jcmd | grep kafka
12778 kafka.Kafka /opt/kafka/config/server.properties
8. 向 my-topic 发送消息$ /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
Hello World Kafka
9. 从 my-topic 接收消息$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Hllo World Kafka
10. 停止 Kafka$ /opt/kafka/bin/kafka-server-stop.sh
$ /opt/kafka/bin/zookeeper-server-stop.sh
OpenShift 上部署 Kafka
集群部署
1. 下载安装脚本# wget https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.8.2/strimzi-0.8.2.zip
2. 创建工程# oc new-project streams
3. 部署 Cluster Operator# sed -i 's/namespace: .*/namespace: streams/' install/cluster-operator/*RoleBinding*.yaml
# oc apply -f install/cluster-operator
# oc apply -f examples/templates/cluster-operator
4. 部署 Kafka cluster# oc apply -f examples/kafka/kafka-persistent.yaml
5. 部署 Kafka Connect# oc apply -f examples/kafka-connect/kafka-connect.yaml
6. 验证 Kafka cluster 启动状况# oc get pods
NAME READY STATUS RESTARTS AGE
my-cluster-entity-operator-54457f4548-x8mvq 3/3 Running 1 32m
my-cluster-kafka-0 2/2 Running 1 34m
my-cluster-kafka-1 2/2 Running 1 34m
my-cluster-kafka-2 2/2 Running 1 34m
my-cluster-zookeeper-0 2/2 Running 0 36m
my-cluster-zookeeper-1 2/2 Running 0 36m
my-cluster-zookeeper-2 2/2 Running 0 36m
my-connect-cluster-connect-64669fb655-6dpzb 1/1 Running 0 22m
strimzi-cluster-operator-696658566-bfmbf 1/1 Running 0 43m
部署测试
1. 发送消息到 my-topic# oc run kafka-producer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic my-topic
>Hello World
>AMQ STREAMS
>Kafka
2. 从 my-topic 接收消息# oc run kafka-consumer -ti --image=registry.access.redhat.com/amqstreams-1/amqstreams10-kafka-openshift:1.0.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
Hello World
AMQ STREAMS
Kafka
**
**