推送服务 - 服务端

优质
小牛编辑
133浏览
2023-12-01

服务器端提供了比较多的关于推送的 API,包括广播,多播和单播方式的推送,还有超时,心跳,推送事件等设置。

Timeout 字段

该字段用于设置推送空闲超时。默认值为 120 秒,即 2 分钟。

当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 nil,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

Heartbeat 字段

该字段用来设置推送的心跳检测间隔时间。该字段默认值为 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 Heartbeat 时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

TimeoutHeartbeat 字段在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 Timeout + Heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 Heartbeat 时间可以检测到客户端以掉线。

TimeoutHeartbeat 设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:

Timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 Timeout 设置的过短,否则会严重增加服务器的负担。

因此,Timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,Heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 Heartbeat 的时间设置的过长。

如果 Heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,Heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

推送事件

服务器端有两个事件是跟推送有关的,它们是:

  1. type subscribeEvent interface {
  2. OnSubscribe(topic string, id string, service Service)
  3. }
  4.  
  5. type unsubscribeEvent interface {
  6. OnUnsubscribe(topic string, id string, service Service)
  7. }

当编号为 id 的客户端订阅主题 topic 时,触发 OnSubscribe 事件。

当编号为 id 的客户端退订主题 topic 时,触发 OnUnsubscribe 事件。

这两个事件同样也是通过 Event 字段来设置。在后面的例子中,我们可以看到这两个事件如何设置。

Publish 方法

  1. Publish(topic string, timeout time.Duration, heartbeat time.Duration) Service

该方法用于发布一个推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。

topic 为主题名。

这里 timeoutheartbeat 参数在前面的字段介绍里已经说明过了,这里不再重复。如果这两个参数设置 <= 0,表示使用服务器端对应字段的默认设置。

Publish 方法仅仅是告诉客户端,现在有一个叫做 topic 的推送主题可以订阅。

而要真正推送数据给客户端,则需要使用以下几个方法。

  1. Push(topic string, result interface{}, id ...string)
  2. Broadcast(topic string, result interface{}, callback func([]string))
  3. Multicast(topic string, ids []string, result interface{}, callback func([]string))
  4. Unicast(topic string, id string, result interface{}, callback func(bool))

广播

可以使用 Push 或者 Broadcast 方法实现广播功能。它们的区别在于,Broadcast 有一个回调函数,而 Push 没有。

Broadcast 的回调函数的参数是成功推送的客户端 id 列表。

一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:

time_push_server.go

  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "time"
  6.  
  7. "github.com/hprose/hprose-golang/rpc"
  8. )
  9.  
  10. type event struct{}
  11.  
  12. func (event) OnSubscribe(topic string, id string, service rpc.Service) {
  13. fmt.Println("client " + id + " subscribe topic: " + topic)
  14. }
  15.  
  16. func (event) OnUnsubscribe(topic string, id string, service rpc.Service) {
  17. fmt.Println("client " + id + " unsubscribe topic: " + topic)
  18. }
  19.  
  20. func main() {
  21. server := rpc.NewTCPServer("tcp4://0.0.0.0:2016/")
  22. server.Publish("time", 0, 0)
  23. server.Event = event{}
  24. var timer *time.Timer
  25. timer = time.AfterFunc(1*time.Second, func() {
  26. server.Broadcast("time", time.Now().String(), func(sended []string) {
  27. if len(sended) > 0 {
  28. fmt.Println(sended)
  29. }
  30. })
  31. timer.Reset(1 * time.Second)
  32. })
  33. server.Start()
  34. }

time_push_client.go

  1. package main
  2.  
  3. import (
  4. "fmt"
  5.  
  6. "github.com/hprose/hprose-golang/rpc"
  7. )
  8. type event struct {}
  9.  
  10. func (e *event) OnError(name string, err error) {
  11. fmt.Printf("name: %s, err: %s\n", name, err.Error())
  12. }
  13.  
  14. func main() {
  15. client := rpc.NewTCPClient("tcp4://127.0.0.1:2016/")
  16. client.SetEvent(&event{})
  17. count := 0
  18. done := make(chan struct{})
  19. client.Subscribe("time", "", nil, func(data string) {
  20. count++
  21. if count > 10 {
  22. client.Unsubscribe("time")
  23. done <- struct{}{}
  24. }
  25. fmt.Println(data)
  26. })
  27. <-done
  28. }

运行上面两个程序,我们会看到如下结果:

服务器端输出


  1. client 34c0d969-6445-468f-a733-8828455b3455 subscribe topic: time
  2. [34c0d969-6445-468f-a733-8828455b3455]
  3. [34c0d969-6445-468f-a733-8828455b3455]
  4. [34c0d969-6445-468f-a733-8828455b3455]
  5. [34c0d969-6445-468f-a733-8828455b3455]
  6. [34c0d969-6445-468f-a733-8828455b3455]
  7. [34c0d969-6445-468f-a733-8828455b3455]
  8. [34c0d969-6445-468f-a733-8828455b3455]
  9. [34c0d969-6445-468f-a733-8828455b3455]
  10. [34c0d969-6445-468f-a733-8828455b3455]
  11. [34c0d969-6445-468f-a733-8828455b3455]
  12. [34c0d969-6445-468f-a733-8828455b3455]
  13. client 34c0d969-6445-468f-a733-8828455b3455 unsubscribe topic: time

客户端输出


  1. 2016-10-29 13:47:37.403264657 +0800 CST
  2. 2016-10-29 13:47:38.406085093 +0800 CST
  3. 2016-10-29 13:47:39.410960974 +0800 CST
  4. 2016-10-29 13:47:40.412490658 +0800 CST
  5. 2016-10-29 13:47:41.413672273 +0800 CST
  6. 2016-10-29 13:47:42.414227648 +0800 CST
  7. 2016-10-29 13:47:43.415540827 +0800 CST
  8. 2016-10-29 13:47:44.415833625 +0800 CST
  9. 2016-10-29 13:47:45.419348921 +0800 CST
  10. 2016-10-29 13:47:46.420147822 +0800 CST
  11. 2016-10-29 13:47:47.421158194 +0800 CST

如果你同时运行两个或更多个客户端,会看到每个客户端都能收到推送信息。服务器端也会看到相应的输出。

有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server 对象。那这时还能进行推送吗?

答案是:可以的。我们前面说过,在服务方法中我们可以得到一个 context 参数,这个 context 参数中就包含有一个 Clients 对象,这个对象上包含了所有跟推送有关的方法,这些方法跟 server 对象上的推送方法是完全一样的。

例如:

push_server.go

  1. package main
  2.  
  3. import "github.com/hprose/hprose-golang/rpc"
  4.  
  5. func hello(name string, context *rpc.SocketContext) string {
  6. context.Clients().Push("ip", context.Conn.RemoteAddr().String())
  7. return "Hello " + name + "!"
  8. }
  9.  
  10. func main() {
  11. server := rpc.NewTCPServer("tcp4://0.0.0.0:4321/")
  12. server.AddFunction("hello", hello)
  13. server.Publish("ip", 0, 0)
  14. server.Start()
  15. }

push_client.go

  1. package main
  2.  
  3. import (
  4. "fmt"
  5.  
  6. "github.com/hprose/hprose-golang/rpc"
  7. )
  8.  
  9. type HelloService struct {
  10. Hello func(string) (string, error)
  11. }
  12.  
  13. func main() {
  14. client := rpc.NewClient("tcp://127.0.0.1:4321/")
  15. client.Subscribe("ip", "", nil, func(ip string) {
  16. fmt.Println(ip)
  17. })
  18. var helloService *HelloService
  19. client.UseService(&helloService)
  20. for i := 0; i < 10; i++ {
  21. fmt.Println(helloService.Hello("world"))
  22. }
  23. }

然后分别运行服务器和客户端,会看到客户端有如下输出:


  1. Hello world! <nil>
  2. Hello world! <nil>
  3. Hello world! <nil>
  4. 127.0.0.1:54998
  5. Hello world! <nil>
  6. 127.0.0.1:54998
  7. Hello world! <nil>
  8. 127.0.0.1:54998
  9. Hello world! <nil>
  10. 127.0.0.1:54998
  11. Hello world! <nil>
  12. 127.0.0.1:54998
  13. Hello world! <nil>
  14. 127.0.0.1:54998
  15. Hello world! <nil>
  16. 127.0.0.1:54998
  17. Hello world! <nil>

因为订阅是异步的,所以客户端可能要在几个同步调用之后,才能正式与服务器建立推送连接。所以我们看到客户端收到的 IP 地址是在几个同步调用之后,才会显示。


_注意:虽然上面的例子都是使用的 TCP 服务器和客户端,但是并不是说只有 Hprose 的 TCP 实现才支持推送服务,实际上 Hprose 的 HTTP 和 WebSocket 实现也支持推送。


多播

可以使用 Push 或者 Multicast 方法实现多播功能。它们的区别除了参数顺序不同以外,Multicast 方法可以接收回调函数,而 Push 没有回调。Multicast 方法的回调跟 Broadcast 方法的回调意义相同。

单播

可以使用 Push 或者 Unicast 方法实现单播功能。Unicast 方法可以接收回调函数,Push 没有回调。Unicast 方法的回调参数是一个bool 值,推送成功该值为 true,失败为 false

IDList 方法

  1. IDList(topic string) []string

该方法用于获取当前在线的订阅了主题 topic 的所有客户端的 id 列表。

Exist 方法

  1. Exist(topic string, id string) bool

该方法用于快速判断 id 是否在当前在线的订阅了主题 topic 的客户端列表中。

注意,客户端在线状态是针对主题的,同一个客户端可能针对一个主题处于在线状态,但是针对另一个主题却处于离线状态,这种情况是正常的。