[TOC]
> NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异
## 安装NSQ服务端
### 下载地址
> https://nsq.io/deployment/installing.html 下载linux最新稳定版
### 启动nsqlookupd
> 主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态
~~~
./nsqlookupd
~~~
### 启动nsqd
> 负责接收消息,存储队列和将消息发送给客户端
~~~
./nsqd --lookupd-tcp-address=127.0.0.1:4160
~~~
### 启动nqsadmin
> nsqadmin是一个web管理界面,访问地址 http://localhost:4171/
~~~
./nsqadmin --lookupd-http-address=127.0.0.1:4161
~~~
## 生产者 (nsq-send.go)
~~~
package main
import (
"github.com/nsqio/go-nsq"
"fmt"
"strconv"
)
var (
//nsqd的地址,使用了tcp监听的端口
tcpNsqdAddrr = "127.0.0.1:4150"
)
func main() {
//初始化配置
config := nsq.NewConfig()
for i := 0; i < 100; i++ {
//创建100个生产者
tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
if err != nil {
fmt.Println(err)
}
//主题
topic := "topic_demo"
//主题内容
tCommand := "new data!" + strconv.Itoa(i)
//发布消息
err = tPro.Publish(topic, []byte(tCommand))
if err != nil {
fmt.Println(err)
}
}
}
~~~
## 消费者 (nsq-receive.go)
> 测试服务器上可执行`go run nsq-receive.go`进行测试
> 正式服务器上,需先进行打包`go build nsq-receive.go`后,再使用`nohup ./nsq-receive &`或者`supervisor`进行运行
~~~
package main
import (
"fmt"
"time"
"github.com/nsqio/go-nsq"
)
// 消费者
type ConsumerT struct{}
// 主函数
func main() {
InitConsumer("topic_demo", "test-channel", "127.0.0.1:4161")
for {
time.Sleep(time.Second * 10)
}
}
//处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
//初始化消费者
func InitConsumer(topic string, channel string, address string) {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second //设置重连时间
c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
if err != nil {
panic(err)
}
c.SetLogger(nil, 0) //屏蔽系统日志
c.AddHandler(&ConsumerT{}) // 添加消费者接口
//建立NSQLookupd连接
if err := c.ConnectToNSQLookupd(address); err != nil {
panic(err)
}
}
~~~