前言
前些日子了解到mqtt这样一个协议,可以在web上达到即时通讯的效果,但网上并不能很方便地找到一篇目前版本的在node下正确实现这个协议的博客。
自己捣鼓了一段时间,理解不深刻,但也算是基本能够达到使用目的。
本文尚未对离线消息的接收顺序进行处理。
代码
服务端: server.js
//服务端引入中间件mosca let mosca = require('mosca') let settings = { port: 5112 } let server = new mosca.Server(settings) server.on('ready', function(){ console.log('Mosca server is up and running at port 5112'); }) server.on('published', function(packet, client) { console.log('Published', packet.payload) }) server.on('clientDisconnected', function(client){ console.log('disconnected: ', client.id) })
推送端: pub.js
//客户端引入mqtt let mqtt = require('mqtt'); let client = mqtt.connect('mqtt://localhost', { port: 5112, clientId: 'cli_pub', }) let num = 0; setInterval(function (){ client.publish('test', 'Hello mqtt ' + (++num), {qos:1}, () => console.log(num)); }, 1000)
订阅端: sub.js
let mqtt = require('mqtt') let client = mqtt.connect('mqtt://localhost', { port: 5112, clientId: 'cli_sub', }) client.subscribe('test',{qos:1}) client.on('message', function (topic, message) { console.log('received message: ', message.toString()) })
server运行后,先启动pub,再启动sub,即可在sub中接收到推送过来的消息序列
至此实现了简单的即时推送
离线推送相关配置及简要介绍
离线配置-服务端:
要实现消息的离线推送,必然需要一个存储临时数据的部件
此处用到的是mongo,当然可以根据需要选择其他的存储工具
server.js中的settings需更改为:
let settings = { port: 5112, persistence:{ //增加了此项 factory: mosca.persistence.Mongo, url: "mongodb://localhost:27017/mosca" } }
factory: 引入mosca对特定存储工具的一些处理方法
url: 其中的 27017 为mongo所监听的端口号,mosca为存储相关数据的数据库
值得一提的是:配置好mongo的环境后,不需要提前在mongo中手动创建,若数据库不存在会自动生成,而且mosca会为你作好其他一切基本事项 (即:若只想临时体验下效果,甚至可以暂时把mongo放一边 )
在mongo中,可以看到自动新添了db: mosca及其下的collection(相当于关系型数据库中的表/关系)
离线配置-客户端:
pub.js和sub.js中的client中都可以改为:
let client = mqtt.connect('mqtt://localhost', { port: 5112, clientId: 'cli_**', clean: false//增加了此项 })
mqtt.connect()会返回一个mqttClient对象,包含了:reconnect(), subscribe(), publish()等一系列方法。
本文中发送端接收端被分为了pub.js和sub.js两个独立文件,仅仅为了方便在不同控制台中观察效果
一个client可以既为推送端,又为订阅端
至此,所有代码已完成
其他介绍:
client.subscribe():
为本客户端订阅一个话题,所有订阅此话题的用户都会收到在此话题下推送的信息
//client.subscribe(topic,opts) client.subscribe('test',{qos:1})
opts中的qos为通信机制,控制发送端与接收端的互锁程度
上文中的其中一个collection: subscriptions即记录各用户话题订阅情况
用户cli_sub及cli2_sub订阅了话题test:
(新增一个cli2_pub,下文有用)
注:
重复执行脚本sub.js实际上对topic进行了重复订阅
实际编码时,应避免topic的重复订阅,即使重复订阅并不影响实现效果
client.publish():
向指定topic发送数据
message为Buffer或String格式,可以通过序列化或转json实现对复杂数据对象的传送
//client.publish(topic, message, opts, callback) let num = 0; setInterval(function (){ client.publish('test', 'Hello mqtt ' + (++num), {qos:1}, () => console.log(num)); }, 1000)
参数不再赘述
此处用一个定时器定时在 topic: test 下发送'Hello mqtt 1,2,3..'
用回调函数实时打印一下发送的num:
当订阅者处于离线状态时,可以在collection: packets中查看到临时数据的存储情况:
mosca把每一条推送消息为所有订阅用户都生成了独立的记录,用同一个messageId进行关联
当其中一个用户(cli2_sub)上线时,获取到其对应的数据,
而后数据库中相应记录便会被删除
此时仅有cli_sub用户的数据
当cli2_sub上线接收消息后,packets中记录将被清空
client.on():
即在client上触发的事件,此处只列举消息接收事件
//client.on(event, callback) client.on('message', function (topic, message) { console.log('received message: ', message.toString()) })
处理为简单地打印到控制台
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
由于TCP是基于流的,客户端发送的请求数据是像水流一样流入到服务端,服务端探测到有数据到来后应该检查数据是否是完整的,因为可能只是一个请求的部分数据到达服务端,甚至可能是多个请求连在一起到达服务端。如何判断请求是否全部到达或者从多个连在一起的请求中分离请求,就需要规定一套通讯协议。 在WorkerMan中为什么要制定协议? 传统PHP开发都是基于Web的,基本上都是HTTP协议,HTTP协议的解析
最近华为被禁止使用谷歌服务,这让我对一件重要的事情产生了疑惑,那就是推送通知。 似乎苹果和谷歌都有自己的(我将称之为)“根级”推送通知服务器,它是将推送通知分发到手机的唯一点(至少从我所能知道的),这些服务器是APN和GCM(或者现在的FireBase) 虽然有推送通知服务,如Pushover、aimtell、OneSignal、AirNotifier等,但我的理解是他们仍然使用这两家巨头的主要“
由于TCP是基于流的,客户端发送的请求数据是像水流一样流入到服务端,服务端探测到有数据到来后应该检查数据是否是完整的,因为可能只是一个请求的部分数据到达服务端,甚至可能是多个请求连在一起到达服务端。如何判断请求是否全部到达或者从多个连在一起的请求中分离请求,就需要规定一套通讯协议。 在WorkerMan中为什么要制定协议? 传统PHP开发都是基于Web的,基本上都是HTTP协议,HTTP协议的解析
提示 视频 PPT 下载 背景介绍 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 在 1999 年发布。 MQTT 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低
application提供了千牛移动提供的原生能力,比如打开聊天窗口(openChat)、获取地址位置(location)和打开网址(openWebsite)等,点击查看api列表和具体用法 打开“千牛欢迎页” QN.application.invoke({ api: 'openPlugin', query: { appkey: 23093073 }, settings: