当前位置: 首页 > 工具软件 > Apache Pulsar > 使用案例 >

Apache Pulsar的集群模式部署

纪俊良
2023-12-01

Apache Pulsar分布式集群模式构建

搭建Pulsar集群至少需要3个组件:Zookeeper集群、Bookkeeper集群和Broker集群

  • Zookeeper集群(由3个Zookeeper节点组成)
  • Bookkeeper集群(由3个Bookkeeper节点组成)
  • broker集群(由3个pulsar节点组成)

Pulsar的安装包里已经包含了Zookeeper和Bookkeeper组件,但是一般情况下,我们还是使用外置的Zookeeper集群,本次默认外置的Zookeeper集群已经搭建成功。

一、准备工作

1.虚拟机准备,默认使用VMware虚拟化了三台机器,且IP地址如下:

​ 192.168.203.101
​ 192.168.203.102
​ 192.168.203.103

2.前置安装,默认每台机器都已经完成了JDK1.8和zookeeper的安装,且zookeeper集群能正常工作;

3.每台机器上都已经解压了apache-pulsar-2.10.1,并进入到pulsar目录下;

[root@rentyk opt]# cd apache-pulsar-2.10.1/

下面的所有操作都是在/opt/apache-pulsar-2.10.1/目录下

二、配置修改

主要需要修改bookkeeper.conf和broker.conf两个配置文件

2.1 bookkeeper配置

1.修改bookkeeper集群配置文件

./conf/bookkeeper.conf

2.修改配置内容如下:

# 修改目录(39行)
journalDirectory=/data/pulsar_2.10.1/bookkeeper/journal
# 修改本地ip地址(56行)
advertisedAddress=192.168.203.101
# 修改目录(405行)
ledgerDirectories=/data/pulsar_2.10.1/bookkeeper/ledgers
# 修改zk服务器地址(666)
zkServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181

PS:另外两台的advertisedAddress属性分别配置为192.168.203.102和192.168.203.103。

3.检查配置是否正确

[root@rentyk conf]# cat bookkeeper.conf | grep -E 'journalDirectory=|advertisedAddress=|ledgerDirectories=|zkServers='
journalDirectory=/data/pulsar_2.10.1/bookkeeper/journal
advertisedAddress=192.168.203.101
ledgerDirectories=/data/pulsar_2.10.1/bookkeeper/ledgers
zkServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181

配置目录

[root@rentyk data]# mkdir -p /data/pulsar_2.10.1/bookkeeper/journal
[root@rentyk data]# mkdir -p /data/pulsar_2.10.1/bookkeeper/ledgers

2.2 broker配置

1.修改broker集群的配置文件

./conf/broker.conf

2.修改配置内容如下:

# 配置广播地址(61行)
advertisedAddress=192.168.203.101
# 修改集群的名称(115行)
clusterName=pulsar-cluster
# 配置zookeeper地址(1425行)
zookeeperServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
# 修改配置存储地址(同zookeeper地址  1430行)
configurationStoreServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181

PS:另外两台的advertisedAddress属性分别配置为192.168.203.102和192.168.203.103。

3.检查配置是否正确

[root@rentyk conf]# cat broker.conf | grep -E 'clusterName|zookeeperServers|configurationStoreServers|advertisedAddress='
advertisedAddress=192.168.203.101
clusterName=pulsar-cluster
zookeeperServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181
configurationStoreServers=192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181

三、分布式集群模式启动

3.1 启动zookeeper集群

cd /opt/apache-zookeeper-3.7.1-bin/bin/ && ./zkServer.sh start

查看是否启动成功

./zkServer.sh status

3.2 初始化元数据

只需要在其中的一台上进行初始化元数据即可

./bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster \
--zookeeper 192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181 \
--configuration-store 192.168.203.101:2181,192.168.203.102:2181,192.168.203.103:2181 \
--web-service-url http://192.168.203.101:8080,192.168.203.102:8080,192.168.203.103:8080 \
--web-service-url-tls https://192.168.203.101:8443,192.168.203.102:8443,192.168.203.103:8443 \
--broker-service-url pulsar://192.168.203.101:6650,192.168.203.102:6650,192.168.203.103:6650 \
--broker-service-url-tls pulsar+ssl://192.168.203.101:6651,192.168.203.102:6651,192.168.203.103:6651 

注意几个端口,zookeeper的配置端口是2181,broker的端口是6650和6651

3.3 启动bookkeeper服务

./bin/pulsar-daemon start bookie

验证bookkeeper服务是否启动成功

./bin/bookkeeper shell bookiesanity

如果给出提示“Bookie sanity test succeed” 认为启动成功

3.4 启动broker服务

启动broker

./bin/pulsar-daemon start broker

验证broker服务是否启动成功

./bin/pulsar-admin brokers list pulsar-cluster 

broker集群正常启动后,会出现以下结果

[root@rentyk bin]# ./pulsar-admin brokers list pulsar-cluters
192.168.203.103:8080
192.168.203.102:8080
192.168.203.101:8080

3.5 验证集群下的生产消费功能

启动消费者

[root@rentyk bin]# ./pulsar-client consume \
persistent://public/default/test \
-n 100 \
-s "consumer-test" \
-t "Exclusive"

启动生产者

[root@rentyk bin]# ./pulsar-client produce persistent://public/default/test  -m "Hello Pulsar!" -n 1

生产者发送成功会打印如下日志:

org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced

消费者消费会打印如下日志:

----- got message -----
key:[null], properties:[], content:Hello Pulsar

3.6 命令参数

下面列出生产者和消费者对应的参数,后续学习使用。

消费者启动对应参数如下:

[root@rentyk bin]# ./pulsar-client consume
The following option is required: [-s | --subscription-name]
Consume messages from a specified topic
Usage: consume [options] TopicName
  Options:
    -ac, --auto_ack_chunk_q_full
      Auto ack for oldest message on queue is full
      Default: false
    -ekv, --encryption-key-value
      The URI of private key to decrypt payload, for example 
      file:///path/to/private.key or data:application/x-pem-file;base64,*****
    --hex
      Display binary messages in hex.
      Default: false
    --hide-content
      Do not write the message to console.
      Default: false
    -mc, --max_chunked_msg
      Max pending chunk messages
      Default: 0
    -n, --num-messages
      Number of messages to consume, 0 means to consume forever.
      Default: 1
    -pm, --pool-messages
      Use the pooled message
      Default: true
    -q, --queue-size
      Consumer receiver queue size.
      Default: 0
    -r, --rate
      Rate (in msg/sec) at which to consume, value 0 means to consume messages 
      as fast as possible.
      Default: 0.0
    --regex
      Indicate the topic name is a regex pattern
      Default: false
    -st, --schema-type
      Set a schema type on the consumer, it can be 'bytes' or 'auto_consume'
      Default: bytes
    -m, --subscription-mode
      Subscription mode.
      Default: Durable
      Possible Values: [Durable, NonDurable]
  * -s, --subscription-name
      Subscription name.  // 订阅的名称
    -p, --subscription-position
      Subscription position.
      Default: Latest
      Possible Values: [Latest, Earliest]
    -t, --subscription-type
      Subscription type.
      Default: Exclusive
      Possible Values: [Exclusive, Shared, Failover, Key_Shared]

生产者参数如下:

[root@rentyk bin]# ./pulsar-client produce
Main parameters are required ("TopicName")
Produce messages to a specified topic
Usage: produce [options] TopicName
  Options:
    -c, --chunking
      Should split the message and publish in chunks if message size is larger 
      than allowed max size
      Default: false
    -db, --disable-batching
      Disable batch sending of messages
      Default: false
    -dr, --disable-replication
      Disable geo-replication for messages.
      Default: false
    -ekn, --encryption-key-name
      The public key name to encrypt payload
    -ekv, --encryption-key-value
      The URI of public key to encrypt payload, for example 
      file:///path/to/public.key or data:application/x-pem-file;base64,*****
    -f, --files
      Comma separated file paths to send, either -m or -f must be specified.
      Default: []
    -k, --key
      message key to add
    -ks, --key-schema
      Schema type (can be bytes,avro,json,string...)
      Default: string
    -kvet, --key-value-encoding-type
      Key Value Encoding Type (it can be separated or inline)
    -m, --messages
      Messages to send, either -m or -f must be specified. The default 
      separator is comma
      Default: []
    -n, --num-produce
      Number of times to send message(s), the count of messages/files * 
      num-produce should below than 1000.
      Default: 1
    -p, --properties
      Properties to add, Comma separated key=value string, like k1=v1,k2=v2.
      Default: []
    -r, --rate
      Rate (in msg/sec) at which to produce, value 0 means to produce messages 
      as fast as possible.
      Default: 0.0
    -s, --separator
      Character to split messages string on default is comma
      Default: ,
    -vs, --value-schema
      Schema type (can be bytes,avro,json,string...)
      Default: bytes

四、遇到的问题

1.zookeeper占用8080端口问题

brokers服务默认要使用8080端口,但是被外置的zookeeper服务占用了。当前zookeeper使用的是zookeeper3.7.1版本,因为zookeeper3.6之后的版本,开启服务器会自动占用8080端口,所以需要修改zookeeper的配置zoo.cfg,并重启zk服务。

# admin.serverPort默认占用8080端口
admin.serverPort=8888
 类似资料: