下载地址:https://pulsar.apache.org/zh-CN/download/
tar -zxvf apache-pulsar-2.8.1-bin.tar.gz -C /data/
在risen-cdh01上执行
bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster \
--zookeeper risen-cdh01:2181 \
--configuration-store risen-cdh01:2181 \
--web-service-url http://risen-cdh01:8089 \
--web-service-url-tls https://risen-cdh01:8443 \
--broker-service-url pulsar://risen-cdh01:6650 \
--broker-service-url-tls pulsar+ssl://risen-cdh01:6651
执行成功
10:36:09.876 [main] INFO org.apache.bookkeeper.discover.ZKRegistrationManager - Successfully formatted BookKeeper metadata
10:36:09.880 [main] INFO org.apache.zookeeper.ZooKeeper - Session: 0x16734464b360002 closed
10:36:09.880 [main-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x16734464b360002
10:36:10.033 [main] INFO org.apache.pulsar.PulsarClusterMetadataSetup - Cluster metadata for 'pulsar-cluster-1' setup correctly
如果执行失败,进入zkclient中。删除相关文件即可
[zookeeper, counters, bookies, ledgers, managed-ledgers, schemas, namespace, admin, loadbalance]
vim conf/bookkeeper.conf
修改如下部分:
zkServers=risen-cdh01:2181,risen-cdh02:2181,risen-cdh03:2181
**ps:**端口修改可以自定义,但是不能与已有的端口冲突
vim conf/broker.conf
修改如下部分:
zookeeperServers=risen-cdh01:2181,risen-cdh02:2181,risen-cdh03:2181
configurationStoreServers=risen-cdh01:2181,risen-cdh02:2181,risen-cdh03:2181
clusterName=pulsar-cluster
因为8080端口过于常用,很容易被占用
这里进行调整,改为8089即可
scp -r apache-pulsar-2.8.1/ risen-cdh02:$PWD
scp -r apache-pulsar-2.8.1/ risen-cdh03:$PWD
分别在三台机器执行
bin/pulsar-daemon start bookie
关闭
bin/pulsar-daemon stop bookie
执行完毕之后使用如下命令看看是否启动成功
bin/bookkeeper shell bookiesanity
分别在三台机器执行
bin/pulsar-daemon start broker
关闭
bin/pulsar-daemon stop broker
然后在risen-cdh01上执行
bin/pulsar-admin brokers list pulsar-cluster
不报错则启动成功
docker pull apachepulsar/pulsar-manager:latest
docker run -dit \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:latest
CSRF_TOKEN=$(curl http://risen-cdh01:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://risen-cdh01:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
**pulsar-manager调用的pulsar-admin的api,而这个api需要从broker取信息,所以需要给pulsar-admin指定获取信息的broker url。 **
bin/pulsar-admin clusters list
bin/pulsar-admin clusters update pulsar-cluster --url http://192.168.5.213:8089
2.2.6、登录查询
访问http://risen-cdh01:9527
登录刚刚2.2.3设置的账号密码
安装完毕!
目的资源隔离,为每位用户配置不同的资源。A用户只能操作20%的资源,B用户操作30%资源(租户与命名空间的操作配合使用)
租户和命名空间是pulsar支持多租户的两个核心概念。
在租户级别,pulsar为特定的租户预留合适的存储空间、应用授权与认证机制
在命名空间级别,pulsar有一系列的配置策略。包括配额、流控、消息过期策略和命名空间之间的隔离策略
在kafka中有消费者组(consumer group),消费者组中的消费者只能消费topic某个分区中的数据
而在pulsar中是发布订阅模式(sub),可以自己制定策略去进行消费,例如让每个消费者都能消费所有数据
**Messages:**消息是 Pulsar 的基础“单元”。 消息指 producer 发布到 topic的内容,也指 consumer 从 topic 中 consume 的内容(并在消息处理完成后发送确认)。 消息类似于邮政服务系统中的信件。
**Producers:**生产者是连接 topic 的程序,它将消息发布到一个 Pulsar broker 上。
**发送模式:**同步发送(sync)或者异步(async)
**Consumers:**Consumer 向 broker 发送消息流获取申请以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 队列大小可以通过receiverQueueSize 进行配置(默认:1000)。 每当 consumer.receive()
被调用一次,就从缓冲区(buffer)获取一条消息。
**接收模式:**同步接收(sync)或者异步接收 (async)
**监听:**在这个接口中,一旦接受到新的消息,received
方法将被调用。
确认:当 consumer 成功消费一条消息后会向 broker 发送一个确认请求(acknowledgement request)。 仅当所有订阅都完成确认后,消息才会被删除,在这之前消息都是被永久保存的。 如果希望消息被 Consumer 确认后仍然保留下来,可配置 消息保留策略实现。
**Topic:**Pulsar 中的 topic 是被命名的通道,用做从producer到 consumer传输消息。 Topic的名称为符合良好结构的URL:{persistent|non-persistent}://tenant/namespace/topic
**namespace:**命名空间是租户内部逻辑上的命名术语。 一个租户可以通过admin API创建多个命名空间。 例如,包含多个应用程序的租户可以为每个应用程序创建单独的命名空间。 Namespace使得程序可以以层级的方式创建和管理topic Topicmy-tenant/app1
,它的namespace是app1
这个应用,对应的租户是 my-tenant
。 你可以在namespace下创建任意数量的topic。
**Subscriptions:**订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar 中有 4 种可用的订阅模式: 独占(exclusive),共享(shared),灾备(failover)和 键共享(key_shared)。
**多主题订阅:**Pulsar消费者可以同时订阅多个topic
**核心:**计算与存储分离
broker会把数据从Managed Ledger缓存中分派到consumer,当积压超过缓存大小的时候,开始将数据给Bookkeeper
pulsar用zk进行源数据的存储、集群配置和协调
配置存储:存储租户、命名空间和其他需要全局一致的配置项
持久化存储容器,是一个分布式的预写(WAL)
特性参考官网文档
为所有的broker提供一个网关,当不能直接连接的时候,可以通过proxy与brokers进行通信
pulsar-admin namespaces create test-tenant/test-namespace
pulsar-admin namespaces list test-tenant
pulsar-admin namespaces delete test-tenant/ns1
pulsar-admin namespaces set-backlog-quota --limit 10--policy producer_request_hold test-tenant/ns1
pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2--bookkeeper-ensemble 3--bookkeeper-write-quorum 2--ml-mark-delete-max-rate 0 test-tenant/ns1
pulsar-admin namespaces get-persistence test-tenant/ns1
pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/ns1
pulsar-admin namespaces clear-backlog --submy-subscription test-tenant/ns1
命名空间包含多个 topic,每个 topic 的保留大小(存储大小)不应超过特定阈值,否则其存储时间会受到限制。 可通过以下命令配置指定命名空间中 topic 的保留大小和保留时间。
pulsar-admin set-retention --size 10--time 100 test-tenant/ns1
为给定命名空间下所有 topic 设置消息派发速率。 通过每 X 秒消息数(msg-dispatch-rate
))或者每 X 秒消息字节数来限制(byte-dispatch-rate
)派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period
来配置。 msg-dispatch-rate
和 byte-dispatch-rate
的默认值均为 -1,即禁用配额限制。
pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
发送消息数 / 秒
pulsar-admin namespaces get-dispatch-rate test-tenant/ns1
pulsar-admin tenants list
pulsar-admin tenants create 租户名
pulsar-admin tenants delete 租户名
pulsar-admin persistent list my-tenant/my-namespace
pulsar-admin persistent grant-permission \
--actions produce,consume --role application1 \
persistent://test-tenant/ns1/tp1 \
pulsar-admin persistent permissions \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
pulsar-admin persistent revoke-permission \
--role application1 \
persistent://test-tenant/ns1/tp1 \
{
"application1": [
"consume",
"produce"
]
}
pulsar-admin persistent delete persistent://test-tenant/ns1/tp1
pulsar-admin persistent unload persistent://test-tenant/ns1/tp1
pulsar-admin persistent peek-messages \
--count 10 --subscription my-subscription \
persistent://test-tenant/ns1/tp1
**注意:**不管是否有分区,在创建topic后,60s内无操作则会认为该topic是不活动的,会进行删除
相关参数:
Brokerdeleteinactivetopicsenabenabled:默认值为true表示是否启动自动删除功能
BrokerDeleteInactiveTopicsFrequencySeconds:默认60s
pulsar-admin topics create persistent://my-tenant/my-namespace/mytopic
pulsar-admin topics create-partitioned-copic persistent://my-tenant/my-namespace/mytopic --partitions 5
pulsar-admin topics lookup persistent://my-tenant/my-namespace/mytopic
权限可以具体操作topic级别,这里用命名空间级别进行测试
在pulsar根目录下创建一个key目录用于存放key
bin/pulsar tokens create-secret-key --output key/my-secret.key --base64
创建一个super用户的,名字随便起,主要用于之后的管理。
bin/pulsar tokens create --secret-key key/my-secret.key --subject hz-super
执行完拿到一个super用户的拿到一个token
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJoei1zdXBlciJ9.1ePk26E5XllSBKrSUkyR7XGnwJ1C1G1ceI9XdriNwXE
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
tokenSecretKey=/opt/apache-pulsar-2.8.1/key/my-secret.key
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJoei1zdXBlciJ9.1ePk26E5XllSBKrSUkyR7XGnwJ1C1G1ceI9XdriNwXE
superUserRoles=hz-super
authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
authParams=token:eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJoei1zdXBlciJ9.1ePk26E5XllSBKrSUkyR7XGnwJ1C1G1ceI9XdriNwXE
bin/pulsar-daemon stop broker
bin/pulsar-daemon start broker
bin/pulsar-admin tenants list
已经正常
"hz-test-tenants"
"public"
"pulsar"
bin/pulsar tokens create --secret-key key/my-secret.key --subject test
拿到token
eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.ZLU3PsOlh3H282_n34i7Wm8qyb-VMckKsfY-QU9rUIQ
bin/pulsar-admin namespaces grant-permission hz-test-tenants/hz-ns02 --role test --actions produce,consume
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVER_URL)
.enableTcpNoDelay(true)
.build();
直接执行报错
Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException$AuthenticationException: Unable to authenticate
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:965)
at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:97)
at ConsumerDemo.main(ConsumerDemo.java:27)
.authentication(AuthenticationFactory.token("eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0In0.ZLU3PsOlh3H282_n34i7Wm8qyb-VMckKsfY-QU9rUIQ"))
正常执行
需要用super账户的token来进行验证,否则会报权限不足的错误
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(SERVER_HTTP_URL)
.authentication(AuthenticationFactory.token(TOKEN))
.build();
System.out.println(admin.namespaces().getPermissions("hz-test-tenants/hz-ns02").toString());
结果:
{hz-super=[consume, produce], hz-test-tenants=[consume, produce]}
Set<AuthAction> action = new HashSet<AuthAction>();
action.add(AuthAction.produce);
admin.namespaces().grantPermissionOnNamespace("hz-test-tenants/hz-ns02", "hz-produce", action);
System.out.println(admin.namespaces().getPermissions("hz-test-tenants/hz-ns02").toString());
结果:
{hz-test-tenants=[consume, produce], hz-super=[consume, produce], hz-produce=[produce]}
admin.namespaces().revokePermissionsOnNamespace("hz-test-tenants/hz-ns02", "hz-produce");
System.out.println(admin.namespaces().getPermissions("hz-test-tenants/hz-ns02").toString());
结果:
{hz-test-tenants=[consume, produce], hz-super=[consume, produce]}
System.out.println(admin.topics().getPermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2"));
结果:
{hz-super=[consume, produce], hz-test-tenants=[consume, produce]}
Set<AuthAction> action1 = new HashSet<AuthAction>();
action1.add(AuthAction.consume);
admin.topics().grantPermission("persistent://hz-test-tenants/hz-ns02/hz-topic2","hz-test-topic", action1);
System.out.println(admin.topics().getPermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2"));
结果:
{hz-super=[consume, produce], hz-test-topic=[consume], hz-test-tenants=[consume, produce]}
admin.topics().revokePermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2","hz-test-topic") ;
System.out.println(admin.topics().getPermissions("persistent://hz-test-tenants/hz-ns02/hz-topic2"));
结果:
{hz-super=[consume, produce], hz-test-tenants=[consume, produce]}
defaultRetentionTimeInMinutes=0
$ pulsar-admin namespaces get-retention [your tenant]/[your-namespace]
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 0
}
java
int retentionTime = 10; // 10 minutes
int retentionSize = 500; // 500 megabytes
RetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);
admin.namespaces().setRetention("hz-test-tenants/hz-ns02", policies );
pulsar-admin namespaces get-message-ttl [your tenant]/[your namespace] 60
java
admin.namespaces().setNamespaceMessageTTL("hz-test-tenants/hz-ns02",60);
以mysql binlog为例
在下载页面找到对应的连接器进行下
https://pulsar.apache.org/zh-CN/download/
用于放这些连接器
tenant: "public"
namespace: "default"
name: "debezium-mysql-source"
topicName: "debezium-mysql-topic"
archive: "connectors/pulsar-io-debezium-mysql-2.8.1.nar"
parallelism: 1
configs:
database.hostname: "risen-cdh01"
database.port: "3306"
database.user: "cdh"
database.password: "123456"
database.server.id: "184054"
database.server.name: "dbserver1"
database.whitelist: "test"
database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
database.history.pulsar.topic: "history-topic"
database.history.pulsar.service.url: "pulsar://risen-cdh01:6650"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
pulsar.service.url: "pulsar://risen-cdh01:6650"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
offset.storage.topic: "offset-topic"
bin/pulsar-admin source create --source-config-file debeziumConf/mysql.yaml
bin/pulsar-admin persistent list public/default
"persistent://public/default/dbserver1.test.test01"
"persistent://public/default/dbserver1"
"persistent://public/default/dbserver1.test.mysql_func2"
生成了对应的topic
bin/pulsar-client consume -s "sub-products" public/default/dbserver1.test.mysql_func2 -n 0
arDatabaseHistory"
database.history.pulsar.topic: “history-topic”
database.history.pulsar.service.url: “pulsar://risen-cdh01:6650”
key.converter: “org.apache.kafka.connect.json.JsonConverter”
pulsar.service.url: “pulsar://risen-cdh01:6650”
value.converter: “org.apache.kafka.connect.json.JsonConverter”
offset.storage.topic: “offset-topic”
### 8.4、创建连接器
bin/pulsar-admin source create --source-config-file debeziumConf/mysql.yaml
### 8.5、验证是否成功
bin/pulsar-admin persistent list public/default
“persistent://public/default/dbserver1.test.test01”
“persistent://public/default/dbserver1”
“persistent://public/default/dbserver1.test.mysql_func2”
生成了对应的topic
### 8.6、模拟消费者进行订阅
bin/pulsar-client consume -s “sub-products” public/default/dbserver1.test.mysql_func2 -n 0
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EvZG76x7-1644385852303)(C:\Users\ADMINI~1\AppData\Local\Temp\1640938418974.png)]