forked from gotify/android
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebSocketConnection.kt
More file actions
244 lines (214 loc) · 7.64 KB
/
WebSocketConnection.kt
File metadata and controls
244 lines (214 loc) · 7.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package com.github.gotify.service
import android.app.AlarmManager
import android.app.AlarmManager.OnAlarmListener
import android.os.Build
import android.os.Handler
import android.os.Looper
import com.github.gotify.SSLSettings
import com.github.gotify.Utils
import com.github.gotify.api.CertUtils
import com.github.gotify.client.model.Message
import java.util.Calendar
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.pow
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import org.tinylog.kotlin.Logger
/**
 * Maintains a WebSocket connection to the `stream` endpoint below [baseUrl] and reports
 * connection events and incoming messages through registered callbacks.
 *
 * After a connection failure a reconnect is scheduled automatically: through [AlarmManager]
 * on API level N and above, and through a main-looper [Handler] on older releases.
 *
 * State-changing members are `@Synchronized`. Listener callbacks and scheduled reconnects
 * carry the value the shared id counter had when they were created and are ignored once the
 * counter has moved on. The counter lives in the companion object, so it is shared by every
 * instance of this class: starting or closing one instance invalidates the pending callbacks
 * of all others.
 *
 * The callbacks set through [onMessage], [onClose], [onOpen], [onFailure] and [onReconnected]
 * are `lateinit` and are invoked without an initialization check, so register all of them
 * before calling [start].
 *
 * @param baseUrl base URL of the server; `stream` is appended as a path segment.
 * @param settings SSL settings handed to [CertUtils.applySslSettings] when building the client.
 * @param token value sent as the `token` query parameter.
 * @param alarmManager used to schedule reconnects on API level N and above.
 * @param reconnectDelay base delay before reconnecting after a failure.
 * @param exponentialBackoff when `true`, the delay doubles with every consecutive failure.
 */
internal class WebSocketConnection(
    private val baseUrl: String,
    settings: SSLSettings,
    private val token: String?,
    private val alarmManager: AlarmManager,
    private val reconnectDelay: Duration,
    private val exponentialBackoff: Boolean
) {
    companion object {
        // Shared by all instances; identifies the most recent connection attempt.
        private val ID = AtomicLong(0)
    }

    private var alarmManagerCallback: OnAlarmListener? = null
    private var handlerCallback: Runnable? = null
    private val client: OkHttpClient
    private val reconnectHandler = Handler(Looper.getMainLooper())

    // Number of failures since the last successful open; drives the backoff exponent.
    private var errorCount = 0
    private var webSocket: WebSocket? = null
    private lateinit var onMessageCallback: (Message) -> Unit
    private lateinit var onClose: Runnable
    private lateinit var onOpen: Runnable
    private lateinit var onFailure: OnNetworkFailureRunnable
    private lateinit var onReconnected: Runnable
    private var state: State? = null

    init {
        val builder = OkHttpClient.Builder()
            // OkHttp treats a timeout of 0 as "no timeout"; pings are sent every minute instead.
            .readTimeout(0, TimeUnit.MILLISECONDS)
            .pingInterval(1, TimeUnit.MINUTES)
            .connectTimeout(10, TimeUnit.SECONDS)
        CertUtils.applySslSettings(builder, settings)
        client = builder.build()
    }

    /** Registers the callback that receives every parsed [Message]. Returns this instance. */
    @Synchronized
    fun onMessage(onMessage: (Message) -> Unit): WebSocketConnection {
        this.onMessageCallback = onMessage
        return this
    }

    /** Registers the callback run when a connected socket is reported closed. Returns this instance. */
    @Synchronized
    fun onClose(onClose: Runnable): WebSocketConnection {
        this.onClose = onClose
        return this
    }

    /** Registers the callback run whenever the socket has been opened. Returns this instance. */
    @Synchronized
    fun onOpen(onOpen: Runnable): WebSocketConnection {
        this.onOpen = onOpen
        return this
    }

    /** Registers the callback run on a connection failure. Returns this instance. */
    @Synchronized
    fun onFailure(onFailure: OnNetworkFailureRunnable): WebSocketConnection {
        this.onFailure = onFailure
        return this
    }

    /**
     * Registers the callback run when the socket opens after at least one failure.
     * Returns this instance.
     */
    @Synchronized
    fun onReconnected(onReconnected: Runnable): WebSocketConnection {
        this.onReconnected = onReconnected
        return this
    }

    /** Builds the GET request for `<baseUrl>/stream?token=<token>`. */
    private fun request(): Request {
        // NOTE(review): `!!` raises a NullPointerException when baseUrl cannot be parsed as an
        // HTTP(S) URL; presumably the URL is validated before this class is constructed -
        // confirm against the caller.
        val url = baseUrl.toHttpUrlOrNull()!!
            .newBuilder()
            .addPathSegment("stream")
            .addQueryParameter("token", token)
            .build()
        return Request.Builder().url(url).get().build()
    }

    /**
     * Opens a new connection unless one is already connecting or connected.
     *
     * Any previous socket and any pending reconnect are discarded via [close] first.
     *
     * @return this instance, for chaining.
     */
    @Synchronized
    fun start(): WebSocketConnection {
        if (state == State.Connecting || state == State.Connected) {
            return this
        }
        close()
        state = State.Connecting
        val nextId = ID.incrementAndGet()
        Logger.info("WebSocket($nextId): starting...")
        webSocket = client.newWebSocket(request(), Listener(nextId))
        return this
    }

    /**
     * Cancels any pending reconnect and closes the current socket, if there is one, with
     * close code 1000 and an empty reason.
     *
     * The shared id is advanced before anything else so that callbacks still in flight for
     * the socket being closed (or for an already dispatched reconnect) no longer match in
     * [syncExec]. Without this, a failure reported while the socket is closing would run the
     * failure callback and schedule a new reconnect after an explicit close.
     */
    @Synchronized
    fun close() {
        // getAndIncrement returns the id of the connection being closed, used for logging.
        val closedId = ID.getAndIncrement()
        cancelScheduledReconnect()
        webSocket?.let { socket ->
            socket.close(1000, "")
            closed()
            Logger.info("WebSocket($closedId): closing existing connection.")
        }
    }

    /** Cancels the pending reconnect callback for the current API level and forgets it. */
    private fun cancelScheduledReconnect() {
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            alarmManagerCallback?.let { alarmManager.cancel(it) }
            alarmManagerCallback = null
        } else {
            handlerCallback?.let { reconnectHandler.removeCallbacks(it) }
            handlerCallback = null
        }
    }

    /** Drops the socket reference and marks the connection as disconnected. */
    @Synchronized
    private fun closed() {
        webSocket = null
        state = State.Disconnected
    }

    /** Schedules a reconnect in [scheduleIn], bound to the current value of the shared id. */
    fun scheduleReconnectNow(scheduleIn: Duration) = scheduleReconnect(ID.get(), scheduleIn)

    /**
     * Schedules a call to [start] after [scheduleIn], replacing any reconnect that is
     * already pending. Does nothing while connecting or connected.
     *
     * @param id id the reconnect is bound to; the reconnect is skipped if the shared id has
     *   changed by the time it fires.
     * @param scheduleIn delay until the reconnect. On the alarm-manager path only whole
     *   seconds are used.
     */
    @Synchronized
    fun scheduleReconnect(id: Long, scheduleIn: Duration) {
        if (state == State.Connecting || state == State.Connected) {
            return
        }
        state = State.Scheduled
        if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
            Logger.info("WebSocket: scheduling a restart in $scheduleIn (via alarm manager)")
            val future = Calendar.getInstance()
            future.add(Calendar.SECOND, scheduleIn.inWholeSeconds.toInt())
            cancelScheduledReconnect()
            val cb = OnAlarmListener { syncExec(id) { start() } }
            alarmManagerCallback = cb
            alarmManager.setExact(
                AlarmManager.RTC_WAKEUP,
                future.timeInMillis,
                "reconnect-tag",
                cb,
                null
            )
        } else {
            Logger.info("WebSocket: scheduling a restart in $scheduleIn")
            cancelScheduledReconnect()
            val cb = Runnable { syncExec(id) { start() } }
            handlerCallback = cb
            reconnectHandler.postDelayed(cb, scheduleIn.inWholeMilliseconds)
        }
    }

    /**
     * OkHttp listener for a single connection attempt.
     *
     * Every callback body runs through [syncExec] with this listener's [id], so events of a
     * superseded connection are ignored.
     */
    private inner class Listener(private val id: Long) : WebSocketListener() {
        override fun onOpen(webSocket: WebSocket, response: Response) {
            syncExec(id) {
                state = State.Connected
                Logger.info("WebSocket($id): opened")
                onOpen.run()
                // A non-zero error count means this open follows at least one failure.
                if (errorCount > 0) {
                    onReconnected.run()
                    errorCount = 0
                }
            }
            super.onOpen(webSocket, response)
        }

        override fun onMessage(webSocket: WebSocket, text: String) {
            syncExec(id) {
                Logger.info("WebSocket($id): received message $text")
                val message = Utils.JSON.fromJson(text, Message::class.java)
                onMessageCallback(message)
            }
            super.onMessage(webSocket, text)
        }

        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            syncExec(id) {
                // Only report a close that happened while connected.
                if (state == State.Connected) {
                    Logger.warn("WebSocket($id): closed")
                    onClose.run()
                }
                closed()
            }
            super.onClosed(webSocket, code, reason)
        }

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            val code = response?.let { "StatusCode: ${it.code}" } ?: ""
            val message = response?.message ?: ""
            Logger.error(t) { "WebSocket($id): failure $code Message: $message" }
            syncExec(id) {
                closed()
                errorCount++
                var scheduleIn = reconnectDelay
                if (exponentialBackoff) {
                    // Doubles the base delay for every consecutive failure after the first.
                    scheduleIn *= 2.0.pow(errorCount - 1)
                }
                // Keep the delay between 5 seconds and 20 minutes.
                scheduleIn = scheduleIn.coerceIn(5.seconds..20.minutes)
                onFailure.execute(response?.message ?: "unreachable", scheduleIn)
                scheduleReconnect(id, scheduleIn)
            }
            super.onFailure(webSocket, t, response)
        }
    }

    /** Runs [runnable] under this instance's lock, but only if [id] is still the shared id. */
    @Synchronized
    private fun syncExec(id: Long, runnable: () -> Unit) {
        if (ID.get() == id) {
            runnable()
        }
    }

    /** Callback invoked on a connection failure. */
    internal fun interface OnNetworkFailureRunnable {
        /**
         * @param status the HTTP response message, or `"unreachable"` when there was no response.
         * @param reconnectIn delay after which a reconnect has been scheduled.
         */
        fun execute(status: String, reconnectIn: Duration)
    }

    /** Connection life-cycle states. */
    internal enum class State {
        /** A reconnect has been scheduled. */
        Scheduled,

        /** A socket has been requested but has not opened yet. */
        Connecting,

        /** The socket is open. */
        Connected,

        /** The socket was closed or has failed. */
        Disconnected
    }
}