ntfy-android/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt

197 lines
8.5 KiB
Kotlin
Raw Normal View History

2022-01-16 07:31:34 +13:00
package io.heckel.ntfy.service
import android.app.AlarmManager
import android.os.Build
2022-01-16 12:40:38 +13:00
import android.os.Handler
import android.os.Looper
2022-01-28 13:57:43 +13:00
import io.heckel.ntfy.db.*
2022-01-17 18:19:05 +13:00
import io.heckel.ntfy.log.Log
2022-02-03 06:11:04 +13:00
import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder
2022-01-16 12:40:38 +13:00
import io.heckel.ntfy.msg.NotificationParser
2022-02-03 06:30:34 +13:00
import io.heckel.ntfy.util.topicShortUrl
2022-01-16 07:31:34 +13:00
import io.heckel.ntfy.util.topicUrlWs
import okhttp3.*
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import kotlin.random.Random
2022-01-21 15:43:28 +13:00
/**
* Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with
* one or more topics. When the topics are changed, the connection is recreated by the service.
*
* The connection re-connects on failure, indefinitely. It reports limited status via the stateChangeListener,
* and forwards incoming messages via the notificationListener.
*
* The original class is taken from the fantastic Gotify project (MIT). Thank you:
* https://github.com/gotify/android/blob/master/app/src/main/java/com/github/gotify/service/WebSocketConnection.java
*/
2022-01-16 07:31:34 +13:00
class WsConnection(
2022-01-28 13:57:43 +13:00
private val connectionId: ConnectionId,
2022-01-16 07:31:34 +13:00
private val repository: Repository,
2022-01-28 13:57:43 +13:00
private val user: User?,
2022-01-16 07:31:34 +13:00
private val sinceTime: Long,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager
) : Connection {
2022-01-16 12:40:38 +13:00
private val parser = NotificationParser()
private val client = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
2022-01-16 13:20:30 +13:00
.pingInterval(1, TimeUnit.MINUTES) // The server pings us too, so this doesn't matter much
2022-01-16 12:40:38 +13:00
.connectTimeout(10, TimeUnit.SECONDS)
.build()
2022-01-16 07:31:34 +13:00
private var errorCount = 0
private var webSocket: WebSocket? = null
private var state: State? = null
2022-01-16 13:20:30 +13:00
private var closed = false
2022-01-16 07:31:34 +13:00
2022-02-05 13:52:34 +13:00
private val globalId = GLOBAL_ID.incrementAndGet()
private val listenerId = AtomicLong(0)
private val since = AtomicLong(sinceTime)
2022-01-28 13:57:43 +13:00
private val baseUrl = connectionId.baseUrl
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
2022-01-16 07:31:34 +13:00
private val subscriptionIds = topicsToSubscriptionIds.values
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
2022-02-03 06:30:34 +13:00
private val shortUrl = topicShortUrl(baseUrl, topicsStr)
2022-01-16 07:31:34 +13:00
2022-02-05 13:52:34 +13:00
init {
Log.d(TAG, "$shortUrl (gid=$globalId): New connection with global ID $globalId")
}
2022-01-16 07:31:34 +13:00
@Synchronized
override fun start() {
2022-01-16 13:20:30 +13:00
if (closed || state == State.Connecting || state == State.Connected) {
2022-02-05 13:52:34 +13:00
Log.d(TAG,"$shortUrl (gid=$globalId): Not (re-)starting, because connection is marked closed/connecting/connected")
2022-01-16 07:31:34 +13:00
return
}
2022-01-16 13:20:30 +13:00
if (webSocket != null) {
webSocket!!.close(WS_CLOSE_NORMAL, "")
2022-01-16 13:20:30 +13:00
}
2022-01-16 07:31:34 +13:00
state = State.Connecting
2022-02-05 13:52:34 +13:00
val nextListenerId = listenerId.incrementAndGet()
val sinceVal = if (since.get() == 0L) "all" else since.get().toString()
2022-01-16 12:40:38 +13:00
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
2022-02-03 06:11:04 +13:00
val request = requestBuilder(urlWithSince, user).build()
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...")
webSocket = client.newWebSocket(request, Listener(nextListenerId))
2022-01-16 07:31:34 +13:00
}
@Synchronized
2022-01-16 13:20:30 +13:00
override fun close() {
closed = true
2022-01-16 12:40:38 +13:00
if (webSocket == null) {
2022-02-05 13:52:34 +13:00
Log.d(TAG,"$shortUrl (gid=$globalId): Not closing existing connection, because there is no active web socket")
2022-01-16 12:40:38 +13:00
return
2022-01-16 07:31:34 +13:00
}
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId): Closing connection")
2022-01-16 12:40:38 +13:00
state = State.Disconnected
webSocket!!.close(WS_CLOSE_NORMAL, "")
2022-01-16 12:40:38 +13:00
webSocket = null
2022-01-16 07:31:34 +13:00
}
2022-01-16 12:40:38 +13:00
@Synchronized
2022-01-16 07:31:34 +13:00
override fun since(): Long {
2022-02-05 13:52:34 +13:00
return since.get()
2022-01-16 07:31:34 +13:00
}
@Synchronized
2022-01-16 12:40:38 +13:00
fun scheduleReconnect(seconds: Int) {
2022-01-16 13:20:30 +13:00
if (closed || state == State.Connecting || state == State.Connected) {
2022-02-05 13:52:34 +13:00
Log.d(TAG,"$shortUrl (gid=$globalId): Not rescheduling connection, because connection is marked closed/connecting/connected")
2022-01-16 07:31:34 +13:00
return
}
state = State.Scheduled
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
2022-02-05 13:52:34 +13:00
Log.d(TAG,"$shortUrl (gid=$globalId): Scheduling a restart in $seconds seconds (via alarm manager)")
2022-01-16 12:40:38 +13:00
val reconnectTime = Calendar.getInstance()
reconnectTime.add(Calendar.SECOND, seconds)
alarmManager.setExact(AlarmManager.RTC_WAKEUP, reconnectTime.timeInMillis, RECONNECT_TAG, { start() }, null)
2022-01-16 07:31:34 +13:00
} else {
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId): Scheduling a restart in $seconds seconds (via handler)")
2022-01-16 12:40:38 +13:00
val handler = Handler(Looper.getMainLooper())
handler.postDelayed({ start() }, TimeUnit.SECONDS.toMillis(seconds.toLong()))
2022-01-16 07:31:34 +13:00
}
}
private inner class Listener(private val id: Long) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
2022-02-03 06:30:34 +13:00
synchronize("onOpen") {
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Opened connection")
2022-01-16 07:31:34 +13:00
state = State.Connected
if (errorCount > 0) {
errorCount = 0
}
stateChangeListener(subscriptionIds, ConnectionState.CONNECTED)
}
}
override fun onMessage(webSocket: WebSocket, text: String) {
2022-02-03 06:30:34 +13:00
synchronize("onMessage") {
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text")
2022-01-16 12:40:38 +13:00
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt())
if (notificationWithTopic == null) {
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
2022-02-03 06:30:34 +13:00
return@synchronize
2022-01-16 07:31:34 +13:00
}
2022-01-16 12:40:38 +13:00
val topic = notificationWithTopic.topic
val notification = notificationWithTopic.notification
2022-02-03 06:30:34 +13:00
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@synchronize
val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize
2022-01-16 12:40:38 +13:00
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
2022-02-05 13:52:34 +13:00
since.set(notification.timestamp)
2022-01-16 07:31:34 +13:00
}
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
2022-02-03 06:30:34 +13:00
synchronize("onClosed") {
2022-02-05 13:52:34 +13:00
Log.w(TAG, "$shortUrl (gid=$globalId, lid=$id): Closed connection")
2022-01-16 07:31:34 +13:00
state = State.Disconnected
}
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
2022-02-03 06:30:34 +13:00
synchronize("onFailure") {
2022-02-03 06:11:04 +13:00
if (response == null) {
2022-02-05 13:52:34 +13:00
Log.e(TAG, "$shortUrl (gid=$globalId, lid=$id): Connection failed (response is null): ${t.message}", t)
2022-02-03 06:11:04 +13:00
} else {
2022-02-05 13:52:34 +13:00
Log.e(TAG, "$shortUrl (gid=$globalId, lid=$id): Connection failed (response code ${response.code}, message: ${response.message}): ${t.message}", t)
2022-02-03 06:11:04 +13:00
}
2022-01-16 13:20:30 +13:00
if (closed) {
2022-02-05 13:52:34 +13:00
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Connection marked as closed. Not retrying.")
2022-02-03 06:30:34 +13:00
return@synchronize
2022-01-16 13:20:30 +13:00
}
2022-01-16 07:31:34 +13:00
stateChangeListener(subscriptionIds, ConnectionState.CONNECTING)
state = State.Disconnected
errorCount++
2022-01-16 12:40:38 +13:00
val retrySeconds = RETRY_SECONDS.getOrNull(errorCount) ?: RETRY_SECONDS.last()
scheduleReconnect(retrySeconds)
2022-01-16 07:31:34 +13:00
}
}
2022-02-03 06:30:34 +13:00
private fun synchronize(tag: String, fn: () -> Unit) {
2022-01-16 07:31:34 +13:00
synchronized(this) {
2022-02-03 06:30:34 +13:00
if (listenerId.get() == id) {
2022-01-16 13:20:30 +13:00
fn()
2022-02-03 06:11:04 +13:00
} else {
2022-02-05 13:52:34 +13:00
Log.w(TAG, "$shortUrl (gid=$globalId, lid=$id): Skipping synchronized block '$tag', because listener ID does not match ${listenerId.get()}")
2022-01-16 07:31:34 +13:00
}
}
}
}
internal enum class State {
Scheduled, Connecting, Connected, Disconnected
}
companion object {
private const val TAG = "NtfyWsConnection"
2022-01-16 12:40:38 +13:00
private const val RECONNECT_TAG = "WsReconnect"
private const val WS_CLOSE_NORMAL = 1000
2022-01-16 12:40:38 +13:00
private val RETRY_SECONDS = listOf(5, 10, 15, 20, 30, 45, 60, 120)
2022-02-05 13:52:34 +13:00
private val GLOBAL_ID = AtomicLong(0)
2022-01-16 07:31:34 +13:00
}
}