pulsar是和kafak同类型的消息处理平台,这两年开始走入大众视野。相比于老牌kafka,新秀pulsar带来了一些让我喜欢的新功能:
详细的说明就不赘述了,我们来快速使用体验下。
在安装前需要有java环境,本次使用的系统是Ubuntu 20.04, java版本是17.02。
本次使用的pulsar版本是2.10.1。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
$ tar xvfz apache-pulsar-2.10.1-bin.tar.gz
$ cd apache-pulsar-2.10.1
$ ./bin/pulsar standalone
看到如下信息就是启动成功了:
2022-08-13T21:34:46,652+0800 [worker-scheduler-0] INFO org.apache.pulsar.functions.worker.SchedulerManager - Schedule summary - execution time: 0.033926031 sec | total unassigned: 0 | stats: {"Added": 0, "Updated": 0, "removed": 0}
{
"c-standalone-fw-localhost-8080" : {
"originalNumAssignments" : 0,
"finalNumAssignments" : 0,
"instancesAdded" : 0,
"instancesRemoved" : 0,
"instancesUpdated" : 0,
"alive" : true
}
}
如果想后台启动可以执行如下命令:
$ ./bin/pulsar-daemon start standalone
$ ./bin/pulsar-admin brokers healthcheck
ok
$ ./bin/pulsar-client produce k-topic --messages "hello test"
如果正常发送的话可以看到如下提示:
2022-08-13T21:43:50,559+0800 [main] INFO org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
$ ./bin/pulsar-client consume j-topic -s "first-subscription"
如果正常发送的话可以看到刚才发的那个消息:
----- got message -----
key:[null], properties:[], content:hello test
可以看到整个过程我们不需要预先做其他的设置,不需要手动去处理zookeeper,启动pulsar后就可以直接使用了。
pulsar官方客户端除了有java之外,还有Python、go、c++、nodejs、C#,还支持websocket、以及REST api
来发送消息,可以说是很方便了。
我们来用go试试。
创建一个测试目录,并初始化go mod:
$ mkdir test_dir && cd test_dir
$ go mod init test_dir
$ go mod tidy
安装pulsar客户端:
$ go get -u "github.com/apache/pulsar-client-go/pulsar"
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"time"
)
func main() {
// 创建客户端
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
// 创建生产者
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "go-topic",
})
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 每隔3秒发一次消息
num := 0
for {
msg := fmt.Sprintf("hello %d", num)
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte(msg),
})
num += 1
time.Sleep(time.Duration(3) * time.Second)
}
}
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"time"
)
func main() {
// 创建客户端
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
// 创建消费者
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "go-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// 消费消息
for {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg)
}
}
$ go build -o consumer consumer.go
$ go build -o produce produce.go
$ chmod +x consumer produce
启动一个生产者
$ ./produce
INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established] local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready] local_addr="127.0.0.1:33210" remote_addr="pulsar://localhost:6650"
INFO[0000] [Created producer] cnx="127.0.0.1:33210 -> 127.0.0.1:6650" producerID=1 producer_name=standalone-2-9 topic="persistent://public/default/go-topic"
再启动一个消费者,获取消息
$ ./consumer
INFO[0000] [Connecting to broker] remote_addr="pulsar://localhost:6650"
INFO[0000] [TCP connection established] local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connection is ready] local_addr="127.0.0.1:33212" remote_addr="pulsar://localhost:6650"
INFO[0000] [Connected consumer] consumerID=1 name=xgqua subscription=my-sub topic="persistent://public/default/go-topic"
INFO[0000] [Created consumer] consumerID=1 name=xgqua subscription=my-sub topic="persistent://public/default/go-topic"
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:5, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af1958c2114, ext:2499737025, loc:(*time.Location)(0x1024400)}} -- content: 'hello 4'
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:6, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af255a034ea, ext:5501052821, loc:(*time.Location)(0x1024400)}} -- content: 'hello 5'
Received message msgId: pulsar.trackingMessageID{messageID:pulsar.messageID{ledgerID:242, entryID:7, batchIdx:0, partitionIdx:0}, tracker:(*pulsar.ackTracker)(nil), consumer:(*pulsar.partitionConsumer)(0xc000274ea0), receivedTime:time.Time{wall:0xc0b60af315c8fc87, ext:8503725362, loc:(*time.Location)(0x1024400)}} -- content: 'hello 6'
今天的介绍就到这里了,感谢阅读。