推送服务 - 服务端
服务器端提供了比较多的关于推送的 API,包括广播,多播和单播方式的推送,还有超时,心跳,推送事件等设置。
Timeout 字段
该字段用于设置推送空闲超时。默认值为 120 秒,即 2 分钟。
当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 nil
,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。
Heartbeat 字段
该字段用来设置推送的心跳检测间隔时间。该字段默认值为 3 秒钟。
当服务器端推送数据给客户端后,如果客户端在 Heartbeat
时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。
Timeout
和 Heartbeat
字段在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 Timeout
+ Heartbeat
的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 Heartbeat
时间可以检测到客户端以掉线。
Timeout
和 Heartbeat
设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:
Timeout
时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 Timeout
设置的过短,否则会严重增加服务器的负担。
因此,Timeout
的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。
对于推送频繁的服务器来说,Heartbeat
时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 Heartbeat
的时间设置的过长。
如果 Heartbeat
的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。
因此,Heartbeat
的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。
推送事件
服务器端有两个事件是跟推送有关的,它们是:
- type subscribeEvent interface {
- OnSubscribe(topic string, id string, service Service)
- }
- type unsubscribeEvent interface {
- OnUnsubscribe(topic string, id string, service Service)
- }
当编号为 id
的客户端订阅主题 topic
时,触发 OnSubscribe
事件。
当编号为 id
的客户端退订主题 topic
时,触发 OnUnsubscribe
事件。
这两个事件同样也是通过 Event
字段来设置。在后面的例子中,我们可以看到这两个事件如何设置。
Publish 方法
- Publish(topic string, timeout time.Duration, heartbeat time.Duration) Service
该方法用于发布一个推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。
topic
为主题名。
这里 timeout
和 heartbeat
参数在前面的字段介绍里已经说明过了,这里不再重复。如果这两个参数设置 <= 0
,表示使用服务器端对应字段的默认设置。
Publish
方法仅仅是告诉客户端,现在有一个叫做 topic
的推送主题可以订阅。
而要真正推送数据给客户端,则需要使用以下几个方法。
- Push(topic string, result interface{}, id ...string)
- Broadcast(topic string, result interface{}, callback func([]string))
- Multicast(topic string, ids []string, result interface{}, callback func([]string))
- Unicast(topic string, id string, result interface{}, callback func(bool))
广播
可以使用 Push
或者 Broadcast
方法实现广播功能。它们的区别在于,Broadcast
有一个回调函数,而 Push
没有。
Broadcast
的回调函数的参数是成功推送的客户端 id
列表。
一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:
time_push_server.go
- package main
- import (
- "fmt"
- "time"
- "github.com/hprose/hprose-golang/rpc"
- )
- type event struct{}
- func (event) OnSubscribe(topic string, id string, service rpc.Service) {
- fmt.Println("client " + id + " subscribe topic: " + topic)
- }
- func (event) OnUnsubscribe(topic string, id string, service rpc.Service) {
- fmt.Println("client " + id + " unsubscribe topic: " + topic)
- }
- func main() {
- server := rpc.NewTCPServer("tcp4://0.0.0.0:2016/")
- server.Publish("time", 0, 0)
- server.Event = event{}
- var timer *time.Timer
- timer = time.AfterFunc(1*time.Second, func() {
- server.Broadcast("time", time.Now().String(), func(sended []string) {
- if len(sended) > 0 {
- fmt.Println(sended)
- }
- })
- timer.Reset(1 * time.Second)
- })
- server.Start()
- }
time_push_client.go
- package main
- import (
- "fmt"
- "github.com/hprose/hprose-golang/rpc"
- )
- type event struct {}
- func (e *event) OnError(name string, err error) {
- fmt.Printf("name: %s, err: %s\n", name, err.Error())
- }
- func main() {
- client := rpc.NewTCPClient("tcp4://127.0.0.1:2016/")
- client.SetEvent(&event{})
- count := 0
- done := make(chan struct{})
- client.Subscribe("time", "", nil, func(data string) {
- count++
- if count > 10 {
- client.Unsubscribe("time")
- done <- struct{}{}
- }
- fmt.Println(data)
- })
- <-done
- }
运行上面两个程序,我们会看到如下结果:
服务器端输出
client 34c0d969-6445-468f-a733-8828455b3455 subscribe topic: time
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
client 34c0d969-6445-468f-a733-8828455b3455 unsubscribe topic: time
客户端输出
2016-10-29 13:47:37.403264657 +0800 CST
2016-10-29 13:47:38.406085093 +0800 CST
2016-10-29 13:47:39.410960974 +0800 CST
2016-10-29 13:47:40.412490658 +0800 CST
2016-10-29 13:47:41.413672273 +0800 CST
2016-10-29 13:47:42.414227648 +0800 CST
2016-10-29 13:47:43.415540827 +0800 CST
2016-10-29 13:47:44.415833625 +0800 CST
2016-10-29 13:47:45.419348921 +0800 CST
2016-10-29 13:47:46.420147822 +0800 CST
2016-10-29 13:47:47.421158194 +0800 CST
如果你同时运行两个或更多个客户端,会看到每个客户端都能收到推送信息。服务器端也会看到相应的输出。
有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server
对象。那这时还能进行推送吗?
答案是:可以的。我们前面说过,在服务方法中我们可以得到一个 context
参数,这个 context
参数中就包含有一个 Clients
对象,这个对象上包含了所有跟推送有关的方法,这些方法跟 server
对象上的推送方法是完全一样的。
例如:
push_server.go
- package main
- import "github.com/hprose/hprose-golang/rpc"
- func hello(name string, context *rpc.SocketContext) string {
- context.Clients().Push("ip", context.Conn.RemoteAddr().String())
- return "Hello " + name + "!"
- }
- func main() {
- server := rpc.NewTCPServer("tcp4://0.0.0.0:4321/")
- server.AddFunction("hello", hello)
- server.Publish("ip", 0, 0)
- server.Start()
- }
push_client.go
- package main
- import (
- "fmt"
- "github.com/hprose/hprose-golang/rpc"
- )
- type HelloService struct {
- Hello func(string) (string, error)
- }
- func main() {
- client := rpc.NewClient("tcp://127.0.0.1:4321/")
- client.Subscribe("ip", "", nil, func(ip string) {
- fmt.Println(ip)
- })
- var helloService *HelloService
- client.UseService(&helloService)
- for i := 0; i < 10; i++ {
- fmt.Println(helloService.Hello("world"))
- }
- }
然后分别运行服务器和客户端,会看到客户端有如下输出:
Hello world! <nil>
Hello world! <nil>
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
因为订阅是异步的,所以客户端可能要在几个同步调用之后,才能正式与服务器建立推送连接。所以我们看到客户端收到的 IP 地址是在几个同步调用之后,才会显示。
_注意:虽然上面的例子都是使用的 TCP 服务器和客户端,但是并不是说只有 Hprose 的 TCP 实现才支持推送服务,实际上 Hprose 的 HTTP 和 WebSocket 实现也支持推送。
多播
可以使用 Push
或者 Multicast
方法实现多播功能。它们的区别除了参数顺序不同以外,Multicast
方法可以接收回调函数,而 Push
没有回调。Multicast
方法的回调跟 Broadcast
方法的回调意义相同。
单播
可以使用 Push
或者 Unicast
方法实现单播功能。Unicast
方法可以接收回调函数,Push
没有回调。Unicast
方法的回调参数是一个bool
值,推送成功该值为 true
,失败为 false
。
IDList 方法
- IDList(topic string) []string
该方法用于获取当前在线的订阅了主题 topic
的所有客户端的 id
列表。
Exist 方法
- Exist(topic string, id string) bool
该方法用于快速判断 id
是否在当前在线的订阅了主题 topic
的客户端列表中。
注意,客户端在线状态是针对主题的,同一个客户端可能针对一个主题处于在线状态,但是针对另一个主题却处于离线状态,这种情况是正常的。