
Installing the Kafka REST Proxy - accessing Kafka over HTTP

汤昊
2023-12-01

Installing the Kafka REST Proxy lets us access Kafka over HTTP. This is especially useful for languages that do not have a native Kafka client, such as PHP.


The Kafka REST Proxy provides a RESTful interface to a Kafka cluster. It makes it easy to produce and consume messages, view the state of the cluster, and perform administrative actions without using the native Kafka protocol or clients. Examples of use cases include reporting data to Kafka from any frontend app built in any language, ingesting messages into a stream processing framework that doesn’t yet support Kafka, and scripting administrative actions.


For Kafka 0.9

Installation method 1:

wget http://packages.confluent.io/archive/2.0/confluent-2.0.1-2.11.7.zip
unzip confluent-2.0.1-2.11.7.zip
cd confluent-2.0.1

vi etc/schema-registry/schema-registry.properties   edit Kafka's ZooKeeper path (same as the zookeeper setting in the Kafka config file); the listening port can be set with port
port=8081   Schema Registry port, needed later
kafkastore.connection.url=localhost:2181/kafka   Kafka's ZooKeeper path (same as in the Kafka config file, including the /chroot)
kafkastore.topic=_schemas   the default is fine
debug=false




./bin/schema-registry-start etc/schema-registry/schema-registry.properties   start the Schema Registry
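
To check that the Schema Registry came up, you can query its REST API directly (a quick sanity check, assuming it listens on port 8081 as configured above):

# list registered subjects; a fresh install returns an empty array: []
curl http://localhost:8081/subjects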


vi etc/kafka-rest/kafka-rest.properties   the listening port can be set with port
port=8082   REST port; Kafka features such as producing messages are accessed through this port
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081   ip:port of the Schema Registry
zookeeper.connect=localhost:2181/kafka   Kafka's ZooKeeper path (same as in the Kafka config file, including the /chroot)




./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties   start the REST server
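
Once the REST server is running, one way to confirm it can reach the cluster is to list the topics through it (assuming the port 8082 configured above):

# list existing topics via the REST proxy v1 API
curl http://localhost:8082/topics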


Installation method 2:

For the configuration file changes, see installation method 1.

rpm --import http://packages.confluent.io/rpm/2.0/archive.key
vi /etc/yum.repos.d/confluent2.0.repo
[confluent-2.0]
name=Confluent repository for 2.0.x packages
baseurl=http://packages.confluent.io/rpm/2.0
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/2.0/archive.key
enabled=1




yum install confluent-common-2.0.1
yum install confluent-rest-utils-2.0.1
yum install confluent-kafka-rest-2.0.1
yum install confluent-schema-registry-2.0.1
vi /etc/schema-registry/schema-registry.properties
vi /etc/kafka-rest/kafka-rest.properties
schema-registry-start /etc/schema-registry/schema-registry.properties
kafka-rest-start /etc/kafka-rest/kafka-rest.properties





For Kafka 0.8

Installation method 1:

wget http://packages.confluent.io/archive/1.0/confluent-1.0.1-2.10.4.zip
unzip confluent-1.0.1-2.10.4.zip
cd confluent-1.0.1

vi etc/schema-registry/schema-registry.properties   edit Kafka's ZooKeeper path (same as the zookeeper setting in the Kafka config file); the listening port can be set with port
port=8081   Schema Registry port, needed later
kafkastore.connection.url=localhost:2181/kafka   Kafka's ZooKeeper path (same as in the Kafka config file, including the /chroot)
kafkastore.topic=_schemas   the default is fine
debug=false





./bin/schema-registry-start etc/schema-registry/schema-registry.properties   start the Schema Registry


vi etc/kafka-rest/kafka-rest.properties   the listening port can be set with port
port=8082   REST port; Kafka features such as producing messages are accessed through this port
id=kafka-rest-test-server
schema.registry.url=http://localhost:8081   ip:port of the Schema Registry
zookeeper.connect=localhost:2181/kafka   Kafka's ZooKeeper path (same as in the Kafka config file, including the /chroot)





./bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties   start the REST server






Installation method 2:

For the configuration file changes, see installation method 1.

rpm --import http://packages.confluent.io/rpm/1.0/archive.key
vi /etc/yum.repos.d/confluent1.0.repo
[confluent-1.0]
name=Confluent repository for 1.0.x packages
baseurl=http://packages.confluent.io/rpm/1.0
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/1.0/archive.key
enabled=1






yum install confluent-common-1.0.1
yum install confluent-rest-utils-1.0.1
yum install confluent-kafka-rest-1.0.1
yum install confluent-schema-registry-1.0.1
vi /etc/schema-registry/schema-registry.properties
vi /etc/kafka-rest/kafka-rest.properties
schema-registry-start /etc/schema-registry/schema-registry.properties
kafka-rest-start /etc/kafka-rest/kafka-rest.properties




Calling Kafka through the REST Proxy

For Kafka 0.8, see the documentation: http://docs.confluent.io/1.0.1/kafka-rest/docs/intro.html
For Kafka 0.9, see the documentation: http://docs.confluent.io/2.0.0/kafka-rest/docs/intro.html


POST http://10.255.xx.xx:8082/topics/test HTTP/1.1
Host: 10.255.xx.xx:8082
Content-Type: application/vnd.kafka.binary.v1+json
Content-Length: 50


{"records":[{"value":"5Lit5paHIGhlbGxvIHdvcmQ="}]}

This request produces a message to the Kafka topic named "test".
The message content is Base64-encoded and placed in the value field of the JSON body, which has the format {"records":[{"value":"Base64-encoded message content"}]}.
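
A rough curl equivalent of the raw request above, assuming the proxy runs locally on port 8082 (the Base64 string decodes to "中文 hello word"):

# encode the message body; in a UTF-8 shell this prints 5Lit5paHIGhlbGxvIHdvcmQ=
echo -n '中文 hello word' | base64

# produce one message to the "test" topic through the REST proxy (v1 binary format)
curl -X POST http://localhost:8082/topics/test \
     -H "Content-Type: application/vnd.kafka.binary.v1+json" \
     --data '{"records":[{"value":"5Lit5paHIGhlbGxvIHdvcmQ="}]}'

On success the proxy responds with a JSON body listing the partition and offset assigned to each record.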





