当前位置: 首页 > 工具软件 > php-nsq > 使用案例 >

nsq消息队列php,NSQ消息队列

梁华清
2023-12-01

[TOC]

> NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异

## 安装NSQ服务端

### 下载地址

> https://nsq.io/deployment/installing.html 下载linux最新稳定版

### 启动nsqlookupd

> 主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态

~~~

./nsqlookupd

~~~

### 启动nsqd

> 负责接收消息,存储队列和将消息发送给客户端

~~~

./nsqd --lookupd-tcp-address=127.0.0.1:4160

~~~

### 启动nqsadmin

> nsqadmin是一个web管理界面,访问地址 http://localhost:4171/

~~~

./nsqadmin --lookupd-http-address=127.0.0.1:4161

~~~

## 生产者 (nsq-send.go)

~~~

package main

import (

"github.com/nsqio/go-nsq"

"fmt"

"strconv"

)

var (

//nsqd的地址,使用了tcp监听的端口

tcpNsqdAddrr = "127.0.0.1:4150"

)

func main() {

//初始化配置

config := nsq.NewConfig()

for i := 0; i < 100; i++ {

//创建100个生产者

tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)

if err != nil {

fmt.Println(err)

}

//主题

topic := "topic_demo"

//主题内容

tCommand := "new data!" + strconv.Itoa(i)

//发布消息

err = tPro.Publish(topic, []byte(tCommand))

if err != nil {

fmt.Println(err)

}

}

}

~~~

## 消费者 (nsq-receive.go)

> 测试服务器上可执行`go run nsq-receive.go`进行测试

> 正式服务器上,需先进行打包`go build nsq-receive.go`后,再使用`nohup ./nsq-receive &`或者`supervisor`进行运行

~~~

package main

import (

"fmt"

"time"

"github.com/nsqio/go-nsq"

)

// 消费者

type ConsumerT struct{}

// 主函数

func main() {

InitConsumer("topic_demo", "test-channel", "127.0.0.1:4161")

for {

time.Sleep(time.Second * 10)

}

}

//处理消息

func (*ConsumerT) HandleMessage(msg *nsq.Message) error {

fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))

return nil

}

//初始化消费者

func InitConsumer(topic string, channel string, address string) {

cfg := nsq.NewConfig()

cfg.LookupdPollInterval = time.Second //设置重连时间

c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者

if err != nil {

panic(err)

}

c.SetLogger(nil, 0) //屏蔽系统日志

c.AddHandler(&ConsumerT{}) // 添加消费者接口

//建立NSQLookupd连接

if err := c.ConnectToNSQLookupd(address); err != nil {

panic(err)

}

}

~~~

 类似资料: