Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。 Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
Pulsar 的关键特性如下:
Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
极低的发布延迟和端到端延迟。
可无缝扩展到超过一百万个 topic。
简单的客户端 API,支持 Java、Go、Python 和 C++。
支持多种 topic 订阅模式(独占订阅、共享订阅、故障转移订阅)。
通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。
# The max size of a message (in bytes).
maxMessageSize=5242880
# The max size of the netty frame (in bytes). Any messages received larger than this value are rejected. The default value is 5 MB.
nettyMaxFrameSizeBytes=5253120
访问模式:
压缩
当批量处理启用时,producer 会在单个请求中积累并发送一批消息。 批量处理的量大小由最大消息数和最大发布延迟定义。 因此,积压数量是分批处理的总数,而不是信息总数。
在 Pulsar 中,批次被跟踪并存储为单个单元,而不是单个消息。 Consumer 将批量处理的消息拆分成单个消息。 但即使启用了批量处理,也始终将计划中的消息(通过 deliverAt 或者 deliverAfter 进行配置) 作为单个消息发送。
一般来说,当 consumer 确认了一个批的所有消息,该批才会被认定为确认。 这意味着当发生不可预料的失败、否定的确认(negative acknowledgements)或确认超时,都可能导致批中的所有消息都被重新发送,即使其中一些消息已经被确认了。
维护批量索引的确认状态并跟踪每批索引的确认状态,以避免向 consumer 发送已确认的消息。 当某一批消息的所有索引都被确认时,该批消息将被删除。
启用批量索引确认将会导致更多内存开销。
Batching and chunking cannot be enabled simultaneously. 如果想要启用分块(chunking) ,您必须提前禁用批量处理。
Chunking is only supported for persisted topics.分块只支持持久消息。
Chunking is only supported for the exclusive and failover subscription modes.分块只支持独占和灾备订阅模式。
处理一个 producer 和一个订阅 consumer 的分块消息
如下图所示,当生产者向主题发送一批大的分块消息和普通的非分块消息时。 假设生产者发送的消息为 M1,M1 有三个分块 M1-C1,M1-C2 和 M1-C3。 这个 broker 在其管理的ledger里面保存所有的三个块消息,然后以相同的顺序分发给消费者(独占/灾备模式)。 消费者将在内存缓存所有的块消息,直到收到所有的消息块。将这些消息合并成为原始的消息M1,发送给处理进程。
当多个生产者发布块消息到单个主题,这个 Broker 在同一个 Ledger 里面保存来自不同生产者的所有块消息。 如下所示,生产者1发布的消息 M1,M1 由 M1-C1, M1-C2 和 M1-C3 三个块组成。 生产者2发布的消息 M2,M2 由 M2-C1, M2-C2 和 M2-C3 三个块组成。 这些特定消息的所有分块是顺序排列的,但是其在 ledger 里面可能不是连续的。 这种方式会给消费者带来一定的内存负担。因为消费者会为每个大消息在内存开辟一块缓冲区,以便将所有的块消息合并为原始的大消息。
Consumer 向 broker 发送消息流获取申请(flow permit request)以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 你能够通过receiverQueueSize参数配置队列的长度 (队列的默认长度是1000) 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。
3.消息确认
当消费者成功的消费了一条消息,这个消费者会发送一个确认信息给broker。 这个消息时是永久保存的,只有在收到订阅者消费成功的消息确认后才会被删除。 如果希望消息被 Consumer 确认后仍然保留下来,可配置 消息保留策略实现。
消息有两种确认模式:单条确认或者累计确认。 累积确认时,消费者只需要确认最后一条他收到的消息。 所有之前(包含此条)的消息,都不会被再次重发给那个消费者。
4.取消确认
当消费者在某个时间没有成功的消费某条消息,消费者想重新消费到这条消息,这个消费者可以发送一条取消确认消息到 broker,broker 会将这条消息重新发给消费者。
消息取消确认也有单条取消模式和累积取消模式 ,这依赖于消费者使用的订阅模式。
在独占消费模式和灾备订阅模式中,消费者仅仅只能对收到的最后一条消息进行取消确认。
5.确认超时
如果消息没有被成功消费,你想去让 broker 自动重新交付这个消息, 你可以采用未确认消息自动重新交付机制。 客户端会跟踪 超时 时间范围内所有未确认的消息。 并且在指定超时时间后会发送一个 重发未确认的消息 请求到 broker。
6.死信主题
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("your-topic-name")
.build())
.subscribe();
7.重试消息
很多在线的业务系统,由于业务逻辑处理出现异常,消息一般需要被重新消费。 若需要允许延时重新消费失败的消息,你可以配置生产者同时发送消息到业务主题和重试主题,并允许消费者自动重试消费。 配置了允许消费者自动重试。如果消息没有被消费成功,它将被保存到重试主题当中。并在指定延时时间后,自动重新消费重试主题里面的消费失败消息。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Topic的名称为符合良好结构的URL:
{persistent|non-persistent}://tenant/namespace/topic
用来标识 topic 的类型。 Pulsar 支持两种主题类型:持久化和非持久化。 主题默认是持久化类型,如果不特殊指定主题类型,那主题就是持久化的。 对于持久化的主题,所有的消息都会被持久化的保存到磁盘当中(如果 broker 不是单机模式,消息会被持久化到多块磁盘),而非持久化的主题的数据不会被保存到磁盘里面。
2.租户
The topic tenant within the instance. Tenants are essential to multi-tenancy in Pulsar, and spread across clusters.
3.命名空间
将相关联的 topic 作为一个组来管理,是管理 Topic 的基本单元。 大多数对 topic 的管理都是对命名空间的一项配置。 每个租户里面可以有一个或者多个命名空间。
4.topic
The final part of the name. Topic names have no special meaning in a Pulsar instance.
订阅是命名好的配置规则,指导消息如何投递给消费者。 Pulsar 中有四种订阅模式: 独占,共享,灾备和key共享 。
When a subscription has no consumers, its subscription mode is undefined. A subscription’s mode is defined when a consumer connects to the subscription, and the mode can be changed by restarting all consumers with a different configuration.(如果一个订阅没有消费者,订阅模式是不确定的。订阅模式在一个消费者连接到订阅的时候被定义,订阅模式可以呗更改当重启所有的消费者(不同的配置))。
在独占模式中只有一个消费者允许订阅,如果多个消费者订阅相同的主题,会报错。
对于分区主题来说,Broker 将按照消费者的优先级和消费者名称的词汇表顺序对消费者进行排序。 然后试图将主题均匀的分配给优先级最高的消费者。
对于非分区主题来说,broker 会根据消费者订阅非分区主题的顺序选择消费者。
当consumer订阅pulsar的主题时,它默认指定订阅了一个主题,例如:persistent://public/default/my-topic。 从Pulsar的1.23.0-incubating的版本开始,Pulsar消费者可以同时订阅多个topic。 你可以用以下两种方式定义topic的列表:
On the basis of a regular expression (regex), for example persistent://public/default/finance-.*
通过明确指定的topic列表
当订阅多个主题的时候,Pulsar 客户端将自动调用 Pulsar API 找到符合匹配规则的主题列表,然后订阅这些主题。 如果此时有暂不存在的主题,那么一旦这些主题被创建,消费者会自动订阅这些主题。
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(allTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(someTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();
普通的主题仅仅被保存在单个 broker中,这限制了主题的最大吞吐量。 Partitioned topics are a special type of topic that are handled by multiple brokers, thus allowing for higher throughput.
分区主题实际是通过在底层拥有 N 个内部主题来实现的,这个 N 的数量就是等于分区的数量。 当向分区的topic发送消息,每条消息被路由到其中一个broker。 Pulsar自动处理跨broker的分区分布。
路由模式确定每条消息该发往哪个分区而订阅模式确定消息传递给哪个消费者。
分区topic和普通topic,对于订阅模式如何工作,没有任何不同。分区只是决定了从生产者生产消息到消费者处理及确认消息过程中发生的事情。
顺序保证
当使用 SinglePartition或者RoundRobinPartition模式时,如果消息有key,消息将会被路由到匹配的分区,这是基于ProducerBuilder 中HashingScheme 指定的散列shema。
散列scheme
HashingScheme 是代表一组标准散列函数的枚举。为一个指定消息选择分区时使用。
有两种可用的散列函数: JavaStringHash 和Murmur3_32Hash. The default hashing function for producer is JavaStringHash. 请注意,当producer可能来自于不同语言客户端时,JavaStringHash是不起作用的。建议使用Murmur3_32Hash。
持久性主题上的消息数据可以在 broker 重启和订阅者故障转移之后继续存在。
Pulsar也提供了非持久topic。非持久topic的消息不会被保存在硬盘上,只存活于内存中。当使用非持久topic分发时,杀掉Pulsar的broker或者关闭订阅者,此topic( non-persistent))上所有的瞬时消息都会丢失,意味着客户端可能会遇到消息缺失。
non-persistent://tenant/namespace/topic
默认非持久topic在broker上是开启的。
性能
非持久topic让producer有更低的发布延迟
客户端API
非持久topic consumer.
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
String npTopic = "non-persistent://public/default/my-topic";
String subscriptionName = "my-subscription-name";
Consumer<byte[]> consumer = client.newConsumer()
.topic(npTopic)
.subscriptionName(subscriptionName)
.subscribe();
非持久 producer
Producer<byte[]> producer = client.newProducer()
.topic(npTopic)
.create();
Pulsar broker默认如下:
immediately delete all messages that have been acknowledged by a consumer。被消费者确认后立即删除。
以消息backlog的形式,持久保存所有的未被确认消息。
存留规则会被用于某namespace下所有的topic,指明哪些消息会被持久存储,即使已经被确认过。 没有被留存规则覆盖的消息将会被删除。 Without a retention policy, all of the acknowledged messages would be deleted.
图中下面的是消息过期,有些消息即使还没有被确认,也被删除掉了。因为根据设置在namespace上的TTL,他们已经过期了。(例如,TTL为5分钟,过了十分钟消息还没被确认)
消息去重保证了一条消息只能在 Pulsar 服务端被持久化一次。 消息去重是一个 Pulsar 可选的特性,它能够阻止不必要的消息重复,它保证了即使消息被消费了多次,也只会被保存一次。
在 Pulsar 中,消息去重是在 broker上处理的,用户不需要去修改客户端的代码。 相反,你只需要通过修改配置就可以实现。
消息去重,使 Pulsar 成为了流处理引擎(SPE)或者其他寻求 “仅仅一次” 语义的连接系统所需的理想消息系统。 如果消息系统没有提供自动去重能力,那么 SPE (流处理引擎) 或者其他连接系统就必须自己实现去重语义,这意味着需要应用去承担这部分的去重工作。 使用Pulsar,严格的顺序保证不会带来任何应用层面的代价。
延时消息功能允许你能够过一段时间才能消费到这条消息,而不是消息发布后,就马上可以消费到。
Broker 保存消息是不经过任何检查的。 当消费者消费一条消息时,如果这条消息是延时消息,那么这条消息会被加入到DelayedDeliveryTracker当中。 订阅检查机制会从DelayedDeliveryTracker获取到超时的消息,并交付给消费者。
# Whether to enable the delayed delivery for messages.
# If disabled, messages are immediately delivered and there is no tracking overhead.
delayedDeliveryEnabled=true
# Control the ticking time for the retry of delayed message delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000
producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello Pulsar!").send();
http://pulsar.apache.org/docs/zh-CN/concepts-architecture-overview/
http://pulsar.apache.org/docs/zh-CN/concepts-multi-tenancy/
Multi Tenancy
成为一个多租户系统是 Pulsar 最初设计理念的一部分。 并且,Pulsar 提出了租户的概念。 租户可以跨集群分布,每个租户都可以有单独的认证和授权机制。 租户也是存储配额、消息 TTL 和隔离策略的管理单元。
Pulsar 的多租户性质主要体现在 topic 的 URL 中,其结构如下:
persistent://tenant/namespace/topic
可以看到,租户是 topic 的最基本单位(比命名空间和 topic 名称更为基本)。
某些情况下,consumer并不需要完整的topic日志。 他们可能只需要几个值来构造一个更 “浅” 的日志图像, 也许仅仅只是最新的值。
Topic压缩的工作原理
当通过命令行触发topic压缩,Pulsar将会从头到尾迭代整个topic。 对于它碰到的每个key,压缩程序将会只保留这个key最近的事件。
之后,broker将会创建一个新的BookKeeper ledger 然后开始对topic的每条消息进行第二次迭代。 对于每条消息,如果key匹配到它的最新事件,key的数据内容,消息ID,元数据将会被写入最新创建的ledger。 如果key并没有匹配到最新的消息,消息将被跳过。 如果给定的消息,负载是空的,它将被跳过并且视为删除(类似key-value数据库中的 tombstones概念); At the end of this second iteration through the topic, the newly created BookKeeper ledger is closed and two things are written to the topic’s metadata: the ID of the BookKeeper ledger and the message ID of the last compacted message (this is known as the compaction horizon of the topic). 写入元数据后,压缩就完成了。
初始化压缩操作完成后,将来任何对压缩层位及压缩backlog的修改,都会通知给拥有该topic的Pulsar broker。 当下列更改发生时:
启用读取压缩功能的客户端(consumer和reader),将会尝试从topic中读取消息,或者:
像从正常的主题那样读取(如果消息的ID大于等于压缩层位),或
从压缩层位的开始读取(如果消息ID小于压缩层位)
代理服务器是一个中转服务,它通过网络转发多个客户端的请求到不同的后端服务器上。 代理服务器在正向代理和反向代理的场景中起到了类似“流量检查”的作用。它能给你的系统带来如下好处,如负载均衡,性能,安全,自动扩缩容等等。
代理服务起到了反向代理的作用,它在所有的 broker 前面创建了一个网关。 Pulsar 不支持Apache Traffic Server (ATS), HAProxy, Nginx和Envoy 等开源代理组件。 代理服务器支持SNI 路由, SNI 路由用于在不断开 SSL 连接的情况下将流量转发到目标地址。 四层路由转发提供了更大的透明度,因为转发的出口连接是根据TCP包里面携带的目标地址信息来确定的。
默认情况下,records.config文件在本地的/usr/local/etc/trafficserver/目录下。 该文件包含了 ATS 相关的可配置参数。
要修改配置文件records.config,有如下步骤。
通过参数http.server_ports配置 TLS 监听的端口,通过修改参数: ssl.client.cert.path和ssl.client.cert.filename,修改所使用的代理证书,以建立安全的 TLS 隧道。
通过参数http.connect_port配置 Broker 的服务端口,用来建立隧道。 假设 Pulsar broker 监听的是443和6651端口,那么就需要修改配置http.connect_ports为对应的值。
Pulsar 客户端使用多个 Advertised 监听器需要以下两步。
advertisedListeners={listenerName}:pulsar://xxxx:6650, {listenerName}:pulsar+ssl://xxxx:6651
为客户端指定监听器名称。
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://xxxx:6650")
.listenerName("external")
.build();
https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/
无schema得情况下创建producer,则 producer 只能 produce byte[]
类型的消息。 如果有 POJO 类,则需要在发送消息前将 POJO 序列化为字节。
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.create();
User user = new User("Tom", 28);
byte[] message = … // serialize the `user` by yourself;
producer.send(message);
有schema。如果通过指定 schema 来创建 producer,则可以直接将类发送到 topic,而无需考虑如何将 POJO 序列化为字节。
Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
.topic(topic)
.create();
User user = new User("Tom", 28);
producer.send(user);
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9DQPoHWw-1627607746077)(/Users/pengwang/Library/Application Support/typora-user-images/image-20210719111602674.png)]
使用 Pulsar Functions SDK for Java 实现一个 Java 函数,你可以实现如下函数:
import java.util.function.Function;
public class JavaNativeExclamationFunction implements Function<String, String> {
@Override
public String apply(String input) {
return String.format("%s!", input);
}
}
package org.example.functions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import java.util.Arrays;
public class WordCountFunction implements Function<String, Void> {
// This function is invoked every time a message is published to the input topic
@Override
public Void process(String input, Context context) throws Exception {
Arrays.asList(input.split(" ")).forEach(word -> {
String counterKey = word.toLowerCase();
context.incrCounter(counterKey, 1);
});
return null;
}
}
部署:将上面的代码编译成可部署的 JAR 文件,可以使用如下命令行将 JAR 包部署到 Pulsar 集群中
$ bin/pulsar-admin functions create \
--jar target/my-jar-with-dependencies.jar \
--classname org.example.functions.WordCountFunction \
--tenant public \
--namespace default \
--name word-count \
--inputs persistent://public/default/sentences \
--output persistent://public/default/count
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b0R29FYO-1627607746079)(/Users/pengwang/Library/Application Support/typora-user-images/image-20210719153532618.png)]
启动命令:bin/pulsar standalone
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rD0nZtdY-1627607746081)(/Users/pengwang/Library/Application Support/typora-user-images/image-20210719154214979.png)]
硬件条件
zookeeper可以使用功能较弱的机器或虚拟机。脉冲星仅在周期协调和配置相关的任务中使用ZooKeeper,不用于基本操作。
运行 bookie 和 Pulsar broker 的机器,必须使用高规格的机器。 例如,对于AWS部署,i3.4xlarge实例可能是合适的。 你可以参考如下方法选择机器:
安装
一. pulsar
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz
tar -xvzf apache-pulsar-2.8.0-bin.tar.gz
cd apache-pulsar-2.8.0
二.可选组件
Connector,分层存储。
三.zookeeper集群
部署 Pulsar 集群,你必须首先部署 Zookeeper (在所有其他组件之前)。 建议部署3个节点的 Zookeeper 集群。
将所有zookeeper信息添加到conf/zookeeper.conf。如:
server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888
server.3=zk3.us-west.example.com:2888:3888
在每台zookeeper机器设置myid。
mkdir -p data/zookeeper
echo 1 > data/zookeeper/myid
启动zookeeper。
$ bin/pulsar-daemon start zookeeper
初始化集群元数据。
$ bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-1 \
--zookeeper zk1.us-west.example.com:2181 \
--configuration-store zk1.us-west.example.com:2181 \
--web-service-url http://pulsar.us-west.example.com:8080 \
--web-service-url-tls https://pulsar.us-west.example.com:8443 \
--broker-service-url pulsar://pulsar.us-west.example.com:6650 \
--broker-service-url-tls pulsar+ssl://pulsar.us-west.example.com:6651
部署bookeeper集群
在conf/bookkeerper.conf 配置bookeeper bookies。设置zkServers参数。
zkServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
在每台bookeeper上启动bookie。
后台运行:
$ bin/pulsar-daemon start bookie
前台运行:
$ bin/pulsar bookie
验证bookie是否正常工作:
$ bin/bookkeeper shell bookiesanity
这个命令会在本地的 bookie 创建一个临时的 BookKeeper ledger,往里面写一些条目,然后读取它,最后删除这个 ledger。
启动了所有的 bookie 后,你能够在任意一台bookie上,使用BookKeeper shell的simpletest
命令,去校验集群内所有的 bookie 是否都已经启动。
$ bin/bookkeeper shell simpletest --ensemble <num-bookies> --writeQuorum <num-bookies> --ackQuorum <num-bookies> --numEntries <num-entries>
部署pulsar broker
Broker 配置中有一些非常重要的参数,这些参数可以确保每个Broker 连接到已部署的 ZooKeeper 集群。 需要确认 zookeeperServers
和 configurationStoreServers
配置项的值是正确的。 在当前情况下,由于只有一个集群,没有单独用来存储配置的 Zookeeper 集群,那么配置项configurationStoreServers
和zookeeperServers
是一样的值。
zookeeperServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
configurationStoreServers=zk1.us-west.example.com:2181,zk2.us-west.example.com:2181,zk3.us-west.example.com:2181
你必须配置集群的名字( 初始化集群元数据 提供的集群名字必须和这个配置项匹配):
clusterName=pulsar-cluster-1
此外,初始化集群的时候提供的 broker 和 web 服务的端口也必须和下面的配置匹配(特别是当你定义了和默认值不同的端口的时候)
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
webServicePortTls=8443
启用pulsar function(可选)
将conf/broker.conf
文件的配置项functionsWorkerEnabled
设置为true
,启用函数 worker,
functionsWorkerEnabled=true
修改conf/functions_worker.yml
的配置项pulsarFunctionsCluster
为集群的名称,该名称和初始化集群元数据使用的集群名字是同一个。
pulsarFunctionsCluster: pulsar-cluster-1
启动broker。
后台:
$ bin/pulsar-daemon start broker
前台:
$ bin/pulsar broker
一旦启动了你打算使用的所有 broker, 你的 Pulsar 集群应该就准备就绪了!
ZooKeeper 负责各种与配置和协调相关的任务。
BookKeeper 负责消息数据的 持久化存储。
ZooKeeper 和 BookKeeper 都是 Apache 开源项目。
Pulsar manager
docker 安装:
docker pull apachepulsar/pulsar-manager:v0.2.0
docker run -it \
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
apachepulsar/pulsar-manager:v0.2.0
设置管理员账号密码:
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H 'X-XSRF-TOKEN: $CSRF_TOKEN' \
-H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' \
-H "Content-Type: application/json" \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'