WebSocket是一种在单个TCP连接上进行全双工通信的协议。它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
android本身没有websocket的库,一般使用第三方框架实现,如Java-WebSocket、okhttp等。
GitHub地址:https://github.com/TooTallNate/Java-WebSocket
implementation "org.java-websocket:Java-WebSocket:1.4.0"
<uses-permission android:name="android.permission.INTERNET" />
新建一个客户端类并继承WebSocketClient
,需要实现它的四个抽象方法和构造函数,如下:
open class JWebSocketClient(serverUri: URI?) : WebSocketClient(serverUri, Draft_6455()) {
override fun onOpen(handshakedata: ServerHandshake?) {
Log.e("JWebSocketClient", "==连接成功==")
}
override fun onMessage(message: String) {
Log.i("JWebSocketClient", "onMessage:$message")
}
override fun onClose(code: Int, reason: String?, remote: Boolean) {
Log.e("JWebSocketClient", "==WebSocket关闭=== code:$code===== reason: $reason==== wasClean: $remote")
}
override fun onError(ex: Exception?) {
ex?.printStackTrace()
Log.e("JWebSocketClient", "==连接失败==")
}
}
其中
onOpen()
方法在websocket连接开启时调用,
onMessage()
方法在接收到消息时调用,
onClose()
方法在连接断开时调用,
onError()
方法在连接出错时调用。
构造方法中的new Draft_6455()代表使用的协议版本,这里可以不写或者写成这样即可。
建立连接只需要初始化此客户端再调用连接方法,需要注意的是WebSocketClient对象是不能重复使用的,所以不能重复初始化,其他地方只能调用当前这个Client。
//websocket协议地址大致是这样的: ws:// ip地址 : 端口号
val uri: URI = URI.create("ws://*******")
client = object : JWebSocketClient(uri) {
override fun onMessage(message: String) {
super.onMessage(message)
//message就是接收到的消息
Log.e("JWebSClientService", message)
}
}
try {
client?.connectBlocking()
} catch (e: InterruptedException) {
e.printStackTrace()
}
连接时可以使用connect()
方法或connectBlocking()
方法,建议使用connectBlocking()方法,connectBlocking多出一个等待操作,会先连接再发送。
发送消息只需要调用send()方法,如下
if (client?.isOpen == true) {
client?.send("你好")
}
client.reconnectBlocking()
1、重连机制:
(1)监听网络连接, 当从断开到连上就可以调用重连
(2)在client回调的onError和onClose中调用重连
2、心跳机制:
写一个定时线程, 隔20秒就发送一个心跳:client.sendPing()
, 这时候onWebsocketPong()
会回调一次,这就是一次心跳
3、后端传回来的消息在onMessage中返回来, 我们可以跟后台定格式,哪些业务需要处理什么格式的数据,然后根据type来分发给业务。
关闭连接调用close()方法,最后为了避免重复实例化WebSocketClient对象,关闭时一定要将对象置空。
/**
* 断开连接
*/
fun closeConnect() {
if (null != client) {
client?.close()
client = null
}
}
错误状态码:
WebSocket断开时,会触发CloseEvent, CloseEvent会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。CloseEvent的code字段表示了WebSocket断开的原因。可以从该字段中分析断开的原因。
CloseEvent有三个字段需要注意, 通过分析这三个字段,一般就可以找到断开原因
CloseEvent.code
: code是错误码,是整数类型CloseEvent.reason
: reason是断开原因,是字符串CloseEvent.wasClean
: wasClean表示是否正常断开,是布尔值。一般异常断开时,该值为false状态码| 名称 |描述
–|–|–|–
0–999| |保留段, 未使用.
1000| CLOSE_NORMAL| 正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
1001| CLOSE_GOING_AWAY| 终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
1002| CLOSE_PROTOCOL_ERROR| 由于协议错误而中断连接.
1003| CLOSE_UNSUPPORTED| 由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).
1004| | 保留. 其意义可能会在未来定义.
1005| CLOSE_NO_STATUS| 保留. 表示没有收到预期的状态码.
1006| CLOSE_ABNORMAL| 保留. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
1007| Unsupported Data| 由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
1008| Policy Violation| 由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.
1009| CLOSE_TOO_LARGE| 由于收到过大的数据帧而断开连接.
1010| Missing Extension| 客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
1011| Internal Error| 客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
1012| Service Restart| 服务器由于重启而断开连接.
1013| Try Again Later| 服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接.
1014| | 由 WebSocket标准保留以便未来使用.
1015| TLS Handshake | 保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
1016–1999| | 由 WebSocket标准保留以便未来使用.
2000–2999| | 由 WebSocket拓展保留使用.
JWebSocketClient.kt
open class JWebSocketClient(serverUri: URI?) : WebSocketClient(serverUri, Draft_6455()) {
override fun onOpen(handshakedata: ServerHandshake?) {
Log.e("JWebSocketClient", "==连接成功==")
}
override fun onMessage(message: String) {
Log.i("JWebSocketClient", "onMessage:$message")
}
override fun onClose(code: Int, reason: String?, remote: Boolean) {
Log.e("JWebSocketClient", "==WebSocket关闭=== code:$code===== reason: $reason==== wasClean: $remote")
}
override fun onError(ex: Exception?) {
ex?.printStackTrace()
Log.e("JWebSocketClient", "==连接失败==")
}
}
JWebSocketClientService.kt
/**
* WebSocket服务类
*/
class JWebSocketClientService : Service() {
companion object {
private var heartbeatTime: Long = 15 //心跳间隔时间
private var mScheduledExecutorService: ScheduledExecutorService? = null
/**
* 开始发送心跳线程
*/
fun startHeartbeat(initialDelay: Long = 0) {
if (mScheduledExecutorService == null) {
mScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
mScheduledExecutorService?.scheduleWithFixedDelay(heartbeatRunnable, initialDelay, heartbeatTime, TimeUnit.SECONDS)
}
}
/**
* 取消心跳
*/
fun cancelHeartbeat() {
mScheduledExecutorService?.shutdownNow()
mScheduledExecutorService = null
}
/**
* 心跳线程
*/
private var heartbeatRunnable = Runnable {
run {
Log.d("JWebSocketClientService", "心跳包检测websocket连接状态");
if (WebSocketTool.mWebSocketClient != null) {
if (WebSocketTool.mWebSocketClient?.isClosed == true) {
WebSocketTool.reconnectWs()
} else {
WebSocketTool.mWebSocketClient?.sendPing()
}
} else {
//如果client已为空,重新初始化连接
WebSocketTool.initSocketClient()
}
}
}
}
override fun onCreate() {
super.onCreate()
notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
}
override fun onBind(p0: Intent?): IBinder? {
return null
}
private var rv: RemoteViews? = null
private var notification: Notification? = null
private var notificationManager: NotificationManager? = null
/**
* 通知栏显示
*/
private fun showNotification() {
var builder: NotificationCompat.Builder?
//如果当前Android的版本相比Android O,一样或者版本更高,就建通知渠道(Notification Channels )
val id = "channel_0"
if (notificationManager == null) {
notificationManager = getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager
}
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {//判断API
//1.0 建渠道
val mChannel = NotificationChannel(id, "ip", NotificationManager.IMPORTANCE_LOW)
//2.0 把通知渠道通过createNotificationChannel( )方法给-
// -状态栏通知的管理类 NotificationManager
notificationManager!!.createNotificationChannel(mChannel)
//3.0 Notification这时候可以正常工作了
builder = NotificationCompat.Builder(this, id)
// notification = builder.build();
} else {
//如果当前Android的版本比Android O(API 26)版本要低
//直接开始上面的3.0步骤——Notification这时候就可以正常工作了
builder = NotificationCompat.Builder(this, id)
}
if (rv == null) {
rv = RemoteViews(packageName, R.layout.layout_ip_notification)
}
builder.setContent(rv)
.setOngoing(true)
.setSmallIcon(R.mipmap.logo)
notification = builder.build()
// 点击该通知后要跳转的Activity
val notificationIntent = Intent(this, MainActivity::class.java)
val contentIntent = PendingIntent.getActivity(this, 0, notificationIntent, 0)
rv!!.setOnClickPendingIntent(R.id.notice, contentIntent)
startForeground(73, notification)
notificationManager!!.notify(73, notification)
}
override fun onStartCommand(intent: Intent, flags: Int, startId: Int): Int {
//初始化websocket
WebSocketTool.initSocketClient()
showNotification()
return START_REDELIVER_INTENT
}
/**
* 关闭webSocket服务器时关闭连接
*/
override fun onDestroy() {
super.onDestroy()
Log.e("==onDestroy==", "webSocket服务器关闭")
WebSocketTool.closeWebSocketClient() // 关闭webSocket通道
stopForeground(true)
if (rv != null) {
stopForeground(true)
notificationManager?.cancel(73)
}
mScheduledExecutorService?.shutdownNow()
mScheduledExecutorService = null
}
}
WebSocketTool.kt
object WebSocketTool {
private var context: Context = IPBroadcastApplication.getContext()
var mWebSocketClient: WebSocketClient? = null
/**
* 发送消息
*/
fun requestBody(str: String, actionCode: String) {
if (mWebSocketClient?.isOpen == true) {
mWebSocketClient?.send(str)
} else {
if (mWebSocketClient != null) {
reconnectWs()
} else {
//如果client已为空,重新初始化连接
initSocketClient()
}
}
}
/**
* 开始WebSocket心跳
*/
fun startWebSocketHeartbeat(initialDelay: Long = 0) {
JWebSocketClientService.cancelHeartbeat()
JWebSocketClientService.startHeartbeat(initialDelay)
}
/**
* 开始WebSocket服务
*/
fun startWebSocketService() {
val intent = Intent(context, JWebSocketClientService::class.java)
context.startService(intent)
}
/**
* 关闭webSocket服务
*/
fun closeWebSocketService() {
context.stopService(Intent(context, JWebSocketClientService::class.java)) // 关闭webSocket服务
}
/**
* 关闭webSocket通道
*/
fun closeWebSocketClient() {
mWebSocketClient?.close() // 主动关闭通道
mWebSocketClient = null
}
/**
* 初始化websocket连接
*/
fun initSocketClient() {
JWebSocketClientService.cancelHeartbeat()
val uri: URI = URI.create("ws://*******")
mWebSocketClient = object : JWebSocketClient(uri) {
override fun onOpen(handshakedata: ServerHandshake?) {
super.onOpen(handshakedata)
startWebSocketHeartbeat(15)
}
override fun onClose(code: Int, reason: String?, remote: Boolean) {
super.onClose(code, reason, remote)
reconnectWs()
}
override fun onMessage(message: String) {
super.onMessage(message)
// 处理数据
}
}
connectWs()
}
/**
* 连接websocket
*/
private fun connectWs() {
object : Thread() {
override fun run() {
try {
//connectBlocking多出一个等待操作,会先连接再发送,否则未连接发送会报错
mWebSocketClient?.connectBlocking()
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}.start()
}
/**
* 开启重连
*/
fun reconnectWs() {
JWebSocketClientService.cancelHeartbeat()
object : Thread() {
override fun run() {
try {
Log.e("JWebSocketClientService", "开启重连")
mWebSocketClient?.reconnectBlocking()
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}.start()
}
}