搭建Pulsar集群至少需要3个组件:Zookeeper集群、Bookkeeper集群和Broker集群
Pulsar的安装包里已经包含了Zookeeper和Bookkeeper组件,但是一般情况下,我们还是使用外置的Zookeeper集群,本次默认外置的Zookeeper集群已经搭建成功。
1.虚拟机准备,默认使用VMware虚拟化了三台机器,且IP地址如下:
192.168.203.101
192.168.203.102
192.168.203.103
2.前置安装,默认每台机器都已经完成了JDK1.8和zookeeper的安装,且zookeeper集群能正常工作;
3.每台机器上都已经解压了apache-pulsar-2.10.1,并进入到pulsar目录下;
[root@rentyk opt]# cd apache-pulsar-2.10.1/
下面的所有操作都是在/opt/apache-pulsar-2.10.1/目录下
主要需要修改bookkeeper.conf和broker.conf两个配置文件
1.修改bookkeeper集群配置文件
./conf/bookkeeper.conf
2.修改配置内容如下:
# 修改目录(39行)
journalDirectory=/data/pulsar_2.10.1/bookkeeper/journal
# 修改本地ip地址(56行)
advertisedAddress=192.168.203.101
# 修改目录(405行)
ledgerDirectories=/data/pulsar_2.10.1/bookkeeper/ledgers
# 修改zk服务器地址(666)
zkServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
PS:另外两台的advertisedAddress属性分别配置为192.168.203.102和192.168.203.103。
3.检查配置是否正确
[root@rentyk conf]# cat bookkeeper.conf | grep -E 'journalDirectory=|advertisedAddress=|ledgerDirectories=|zkServers='
journalDirectory=/data/pulsar_2.10.1/bookkeeper/journal
advertisedAddress=192.168.203.101
ledgerDirectories=/data/pulsar_2.10.1/bookkeeper/ledgers
zkServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
配置目录
[root@rentyk data]# mkdir -p /data/pulsar_2.10.1/bookkeeper/journal
[root@rentyk data]# mkdir -p /data/pulsar_2.10.1/bookkeeper/ledgers
1.修改broker集群的配置文件
./conf/broker.conf
2.修改配置内容如下:
# 配置广播地址(61行)
advertisedAddress=192.168.203.101
# 修改集群的名称(115行)
clusterName=pulsar-cluster
# 配置zookeeper地址(1425行)
zookeeperServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
# 修改配置存储地址(同zookeeper地址 1430行)
configurationStoreServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
PS:另外两台的advertisedAddress属性分别配置为192.168.203.102和192.168.203.103。
3.检查配置是否正确
[root@rentyk conf]# cat broker.conf | grep -E 'clusterName|zookeeperServers|configurationStoreServers|advertisedAddress='
advertisedAddress=192.168.203.101
clusterName=pulsar-cluster
zookeeperServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
configurationStoreServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
cd /opt/apache-zookeeper-3.7.1-bin/bin/ && ./zkServer.sh start
查看是否启动成功
./zkServer.sh status
只需要在其中的一台上进行初始化元数据即可
./bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster \
--zookeeper 192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181 \
--configuration-store 192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181 \
--web-service-url http://192.168.203.101:8080,192.168.203.102:8080,192.168.203.103:8080 \
--web-service-url-tls https://192.168.203.101:8443,192.168.203.102:8443,192.168.203.103:8443 \
--broker-service-url pulsar://192.168.203.101:6650,192.168.203.102:6650,192.168.203.103:6650 \
--broker-service-url-tls pulsar+ssl://192.168.203.101:6651,192.168.203.102:6651,192.168.203.103:6651
注意几个端口,zookeeper的配置端口是2181,broker的端口是6650和6651
./bin/pulsar-daemon start bookie
验证bookkeeper服务是否启动成功
./bin/bookkeeper shell bookiesanity
如果给出提示“Bookie sanity test succeed” 认为启动成功
启动broker
./bin/pulsar-daemon start broker
验证broker服务是否启动成功
./bin/pulsar-admin brokers list pulsar-cluster
broker集群正常启动后,会出现以下结果
[root@rentyk bin]# ./pulsar-admin brokers list pulsar-cluters
192.168.203.103:8080
192.168.203.102:8080
192.168.203.101:8080
启动消费者
[root@rentyk bin]# ./pulsar-client consume \
persistent://public/default/test \
-n 100 \
-s "consumer-test" \
-t "Exclusive"
启动生产者
[root@rentyk bin]# ./pulsar-client produce persistent://public/default/test -m "Hello Pulsar!" -n 1
生产者发送成功会打印如下日志:
org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
消费者消费会打印如下日志:
----- got message -----
key:[null], properties:[], content:Hello Pulsar
下面列出生产者和消费者对应的参数,后续学习使用。
消费者启动对应参数如下:
[root@rentyk bin]# ./pulsar-client consume
The following option is required: [-s | --subscription-name]
Consume messages from a specified topic
Usage: consume [options] TopicName
Options:
-ac, --auto_ack_chunk_q_full
Auto ack for oldest message on queue is full
Default: false
-ekv, --encryption-key-value
The URI of private key to decrypt payload, for example
file:///path/to/private.key or data:application/x-pem-file;base64,*****
--hex
Display binary messages in hex.
Default: false
--hide-content
Do not write the message to console.
Default: false
-mc, --max_chunked_msg
Max pending chunk messages
Default: 0
-n, --num-messages
Number of messages to consume, 0 means to consume forever.
Default: 1
-pm, --pool-messages
Use the pooled message
Default: true
-q, --queue-size
Consumer receiver queue size.
Default: 0
-r, --rate
Rate (in msg/sec) at which to consume, value 0 means to consume messages
as fast as possible.
Default: 0.0
--regex
Indicate the topic name is a regex pattern
Default: false
-st, --schema-type
Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'
Default: bytes
-m, --subscription-mode
Subscription mode.
Default: Durable
Possible Values: [Durable, NonDurable]
* -s, --subscription-name
Subscription name. // 订阅的名称
-p, --subscription-position
Subscription position.
Default: Latest
Possible Values: [Latest, Earliest]
-t, --subscription-type
Subscription type.
Default: Exclusive
Possible Values: [Exclusive, Shared, Failover, Key_Shared]
生产者参数如下:
[root@rentyk bin]# ./pulsar-client produce
Main parameters are required ("TopicName")
Produce messages to a specified topic
Usage: produce [options] TopicName
Options:
-c, --chunking
Should split the message and publish in chunks if message size is larger
than allowed max size
Default: false
-db, --disable-batching
Disable batch sending of messages
Default: false
-dr, --disable-replication
Disable geo-replication for messages.
Default: false
-ekn, --encryption-key-name
The public key name to encrypt payload
-ekv, --encryption-key-value
The URI of public key to encrypt payload, for example
file:///path/to/public.key or data:application/x-pem-file;base64,*****
-f, --files
Comma separated file paths to send, either -m or -f must be specified.
Default: []
-k, --key
message key to add
-ks, --key-schema
Schema type (can be bytes,avro,json,string...)
Default: string
-kvet, --key-value-encoding-type
Key Value Encoding Type (it can be separated or inline)
-m, --messages
Messages to send, either -m or -f must be specified. The default
separator is comma
Default: []
-n, --num-produce
Number of times to send message(s), the count of messages/files *
num-produce should below than 1000.
Default: 1
-p, --properties
Properties to add, Comma separated key=value string, like k1=v1,k2=v2.
Default: []
-r, --rate
Rate (in msg/sec) at which to produce, value 0 means to produce messages
as fast as possible.
Default: 0.0
-s, --separator
Character to split messages string on default is comma
Default: ,
-vs, --value-schema
Schema type (can be bytes,avro,json,string...)
Default: bytes
brokers服务默认要使用8080端口,但是被外置的zookeeper服务占用了。当前zookeeper使用的是zookeeper3.7.1版本,因为zookeeper3.6之后的版本,开启服务器会自动占用8080端口,所以需要修改zookeeper的配置zoo.cfg,并重启zk服务。
# admin.serverPort默认占用8080端口
admin.serverPort=8888