Android利用websocket协议与服务器通信——Java-WebSocket框架的实现

游皓
2023-12-01

一、什么是WebSocket

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

二、如何使用WebSocket

android本身没有websocket的库,一般使用第三方框架实现,如Java-WebSocket、okhttp等。

三、Java-WebSocket框架的使用

GitHub地址:https://github.com/TooTallNate/Java-WebSocket

1. build.gradle中加入

implementation "org.java-websocket:Java-WebSocket:1.4.0"

2. 加入网络请求权限

<uses-permission android:name="android.permission.INTERNET" />

3. 新建客户端类

新建一个客户端类并继承WebSocketClient,需要实现它的四个抽象方法和构造函数,如下:

open class JWebSocketClient(serverUri: URI?) : WebSocketClient(serverUri, Draft_6455()) {
    override fun onOpen(handshakedata: ServerHandshake?) {
        Log.e("JWebSocketClient", "==连接成功==")
    }

    override fun onMessage(message: String) {
		Log.i("JWebSocketClient", "onMessage:$message")
    }
    override fun onClose(code: Int, reason: String?, remote: Boolean) {
        Log.e("JWebSocketClient", "==WebSocket关闭=== code:$code===== reason: $reason==== wasClean: $remote")
    }

    override fun onError(ex: Exception?) {
        ex?.printStackTrace()
        Log.e("JWebSocketClient", "==连接失败==")
    }
}

其中
onOpen()方法在websocket连接开启时调用,
onMessage()方法在接收到消息时调用,
onClose()方法在连接断开时调用,
onError()方法在连接出错时调用。
构造方法中的new Draft_6455()代表使用的协议版本,这里可以不写或者写成这样即可。

4. 建立websocket连接

建立连接只需要初始化此客户端再调用连接方法,需要注意的是WebSocketClient对象是不能重复使用的,所以不能重复初始化,其他地方只能调用当前这个Client。

//websocket协议地址大致是这样的: ws:// ip地址 : 端口号
val uri: URI = URI.create("ws://*******")
client = object : JWebSocketClient(uri) {
	override fun onMessage(message: String) {
    	super.onMessage(message)
		//message就是接收到的消息
        Log.e("JWebSClientService", message)
    }
}


try {
    client?.connectBlocking()
} catch (e: InterruptedException) {
    e.printStackTrace()
}

连接时可以使用connect()方法或connectBlocking()方法,建议使用connectBlocking()方法,connectBlocking多出一个等待操作,会先连接再发送。

5. 发送消息

发送消息只需要调用send()方法,如下

if (client?.isOpen == true) {
    client?.send("你好")
}

6. 重连

client.reconnectBlocking()

1、重连机制:

(1)监听网络连接, 当从断开到连上就可以调用重连
(2)在client回调的onError和onClose中调用重连

2、心跳机制:

写一个定时线程, 隔20秒就发送一个心跳:client.sendPing(), 这时候onWebsocketPong()会回调一次,这就是一次心跳

3、后端传回来的消息在onMessage中返回来, 我们可以跟后台定格式,哪些业务需要处理什么格式的数据,然后根据type来分发给业务。

7. 关闭socket连接

关闭连接调用close()方法,最后为了避免重复实例化WebSocketClient对象,关闭时一定要将对象置空。

/**
 * 断开连接
 */
fun closeConnect() {
    if (null != client) {
     	client?.close()
        client = null
    }
}

四、websocket断开原因

错误状态码:
WebSocket断开时,会触发CloseEvent, CloseEvent会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。CloseEvent的code字段表示了WebSocket断开的原因。可以从该字段中分析断开的原因。

CloseEvent有三个字段需要注意, 通过分析这三个字段,一般就可以找到断开原因

  • CloseEvent.code: code是错误码,是整数类型
  • CloseEvent.reason: reason是断开原因,是字符串
  • CloseEvent.wasClean: wasClean表示是否正常断开,是布尔值。一般异常断开时,该值为false

状态码| 名称 |描述
–|–|–|–
0–999| |保留段, 未使用.
1000| CLOSE_NORMAL| 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
1001| CLOSE_GOING_AWAY| 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
1002| CLOSE_PROTOCOL_ERROR| 由于协议错误而中断连接.
1003| CLOSE_UNSUPPORTED| 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).
1004| | 保留. 其意义可能会在未来定义.
1005| CLOSE_NO_STATUS| 保留. 表示没有收到预期的状态码.
1006| CLOSE_ABNORMAL| 保留. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
1007| Unsupported Data| 由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
1008| Policy Violation| 由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.
1009| CLOSE_TOO_LARGE| 由于收到过大的数据帧而断开连接.
1010| Missing Extension| 客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
1011| Internal Error| 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
1012| Service Restart| 服务器由于重启而断开连接.
1013| Try Again Later| 服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接.
1014| | 由 WebSocket标准保留以便未来使用.
1015| TLS Handshake | 保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
1016–1999| | 由 WebSocket标准保留以便未来使用.
2000–2999| | 由 WebSocket拓展保留使用.

五、代码示例

JWebSocketClient.kt

open class JWebSocketClient(serverUri: URI?) : WebSocketClient(serverUri, Draft_6455()) {
    override fun onOpen(handshakedata: ServerHandshake?) {
        Log.e("JWebSocketClient", "==连接成功==")
    }

    override fun onMessage(message: String) {
		Log.i("JWebSocketClient", "onMessage:$message")
    }
    override fun onClose(code: Int, reason: String?, remote: Boolean) {
        Log.e("JWebSocketClient", "==WebSocket关闭=== code:$code===== reason: $reason==== wasClean: $remote")
    }

    override fun onError(ex: Exception?) {
        ex?.printStackTrace()
        Log.e("JWebSocketClient", "==连接失败==")
    }
}

JWebSocketClientService.kt

/**
 * WebSocket服务类
 */
class JWebSocketClientService : Service() {

    companion object {
        private var heartbeatTime: Long = 15 //心跳间隔时间
        private var mScheduledExecutorService: ScheduledExecutorService? = null

        /**
         * 开始发送心跳线程
         */
        fun startHeartbeat(initialDelay: Long = 0) {
            if (mScheduledExecutorService == null) {
                mScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
                mScheduledExecutorService?.scheduleWithFixedDelay(heartbeatRunnable, initialDelay, heartbeatTime, TimeUnit.SECONDS)
            }
        }


        /**
         * 取消心跳
         */
        fun cancelHeartbeat() {
            mScheduledExecutorService?.shutdownNow()
            mScheduledExecutorService = null
        }

        /**
         * 心跳线程
         */
        private var heartbeatRunnable = Runnable {
            run {
                Log.d("JWebSocketClientService", "心跳包检测websocket连接状态");
                if (WebSocketTool.mWebSocketClient != null) {
                    if (WebSocketTool.mWebSocketClient?.isClosed == true) {
                        WebSocketTool.reconnectWs()
                    } else {
                    	WebSocketTool.mWebSocketClient?.sendPing()
                    }
                } else {
                    //如果client已为空,重新初始化连接
                    WebSocketTool.initSocketClient()
                }
            }
        }

    }
    override fun onCreate() {
        super.onCreate()
        notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
    }

    override fun onBind(p0: Intent?): IBinder? {
        return null
    }

    private var rv: RemoteViews? = null
    private var notification: Notification? = null
    private var notificationManager: NotificationManager? = null

    /**
     * 通知栏显示
     */
    private fun showNotification() {
        var builder: NotificationCompat.Builder?
        //如果当前Android的版本相比Android O,一样或者版本更高,就建通知渠道(Notification Channels )
        val id = "channel_0"
        if (notificationManager == null) {
            notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
        }
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {//判断API
            //1.0 建渠道
            val mChannel = NotificationChannel(id, "ip", NotificationManager.IMPORTANCE_LOW)
            //2.0 把通知渠道通过createNotificationChannel( )方法给-
            //      -状态栏通知的管理类 NotificationManager
            notificationManager!!.createNotificationChannel(mChannel)
            //3.0 Notification这时候可以正常工作了
            builder = NotificationCompat.Builder(this, id)
            //            notification = builder.build();
        } else {
            //如果当前Android的版本比Android O(API 26)版本要低
            //直接开始上面的3.0步骤——Notification这时候就可以正常工作了
            builder = NotificationCompat.Builder(this, id)
        }

        if (rv == null) {
            rv = RemoteViews(packageName, R.layout.layout_ip_notification)
        }

        builder.setContent(rv)
                .setOngoing(true)
                .setSmallIcon(R.mipmap.logo)
        notification = builder.build()

        // 点击该通知后要跳转的Activity
        val notificationIntent = Intent(this, MainActivity::class.java)
        val contentIntent = PendingIntent.getActivity(this, 0, notificationIntent, 0)
        rv!!.setOnClickPendingIntent(R.id.notice, contentIntent)
        startForeground(73, notification)
        notificationManager!!.notify(73, notification)
    }

    override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int {
        //初始化websocket
        WebSocketTool.initSocketClient()
        showNotification()
        return START_REDELIVER_INTENT
    }

    /**
     * 关闭webSocket服务器时关闭连接
     */
    override fun onDestroy() {
        super.onDestroy()
        Log.e("==onDestroy==", "webSocket服务器关闭")
        WebSocketTool.closeWebSocketClient() // 关闭webSocket通道
        stopForeground(true)
        if (rv != null) {
            stopForeground(true)
            notificationManager?.cancel(73)
        }
        mScheduledExecutorService?.shutdownNow()
        mScheduledExecutorService = null
    }

}

WebSocketTool.kt

object WebSocketTool {
    private var context: Context = IPBroadcastApplication.getContext()

    var mWebSocketClient: WebSocketClient? = null


    /**
     * 发送消息
     */
    fun requestBody(str: String, actionCode: String) {
        if (mWebSocketClient?.isOpen == true) {
            mWebSocketClient?.send(str)
        } else {
            if (mWebSocketClient != null) {
                reconnectWs()
            } else {
                //如果client已为空,重新初始化连接
                initSocketClient()
            }
        }
    }

    /**
     * 开始WebSocket心跳
     */
    fun startWebSocketHeartbeat(initialDelay: Long = 0) {
        JWebSocketClientService.cancelHeartbeat()
        JWebSocketClientService.startHeartbeat(initialDelay)
    }

    /**
     * 开始WebSocket服务
     */
    fun startWebSocketService() {
        val intent = Intent(context, JWebSocketClientService::class.java)
        context.startService(intent)
    }

    /**
     * 关闭webSocket服务
     */
    fun closeWebSocketService() {
        context.stopService(Intent(context, JWebSocketClientService::class.java)) // 关闭webSocket服务
    }

    /**
     * 关闭webSocket通道
     */
    fun closeWebSocketClient() {
        mWebSocketClient?.close() // 主动关闭通道
        mWebSocketClient = null
    }


    /**
     * 初始化websocket连接
     */
    fun initSocketClient() {
        JWebSocketClientService.cancelHeartbeat()
        val uri: URI = URI.create("ws://*******")
        mWebSocketClient = object : JWebSocketClient(uri) {
            override fun onOpen(handshakedata: ServerHandshake?) {
                super.onOpen(handshakedata)
                startWebSocketHeartbeat(15)
            }

            override fun onClose(code: Int, reason: String?, remote: Boolean) {
                super.onClose(code, reason, remote)
                reconnectWs()
            }

            override fun onMessage(message: String) {
                super.onMessage(message)
                // 处理数据
            }
        }
        connectWs()
    }

    /**
     * 连接websocket
     */
    private fun connectWs() {
        object : Thread() {
            override fun run() {
                try {
                    //connectBlocking多出一个等待操作,会先连接再发送,否则未连接发送会报错
                    mWebSocketClient?.connectBlocking()
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                }
            }
        }.start()
    }


    /**
     * 开启重连
     */
    fun reconnectWs() {
        JWebSocketClientService.cancelHeartbeat()
        object : Thread() {
            override fun run() {
                try {
                    Log.e("JWebSocketClientService", "开启重连")
                    mWebSocketClient?.reconnectBlocking()
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                }
            }
        }.start()
    }
}

 类似资料: