diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index f2b1fd3..d599819 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -86,7 +86,7 @@ class JsonConnection( return since } - override fun cancel() { + override fun close() { Log.d(TAG, "[$url] Cancelling connection") if (this::job.isInitialized) job?.cancel() if (this::call.isInitialized) call?.cancel() diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index 5d0943b..55b83c0 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -58,7 +58,7 @@ import java.util.concurrent.ConcurrentHashMap interface Connection { fun start() - fun cancel() + fun close() fun since(): Long fun matches(otherSubscriptionIds: Collection): Boolean } @@ -133,7 +133,7 @@ class SubscriberService : Service() { Log.d(TAG, "Stopping the foreground service") // Cancelling all remaining jobs and open HTTP calls - connections.values.forEach { connection -> connection.cancel() } + connections.values.forEach { connection -> connection.close() } connections.clear() // Releasing wake-lock and stopping ourselves @@ -179,7 +179,7 @@ class SubscriberService : Service() { if (connection != null && !connection.matches(subscriptions.values)) { since = connection.since() connections.remove(baseUrl) - connection.cancel() + connection.close() } if (!connections.containsKey(baseUrl)) { val serviceActive = { -> isServiceStarted } @@ -199,7 +199,7 @@ class SubscriberService : Service() { connections.keys().toList().forEach { baseUrl -> if (!baseUrls.contains(baseUrl)) { val connection = connections.remove(baseUrl) - connection?.cancel() + connection?.close() } } diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index 0f34503..c58412c 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -30,12 +30,13 @@ class WsConnection( private val parser = NotificationParser() private val client = OkHttpClient.Builder() .readTimeout(0, TimeUnit.MILLISECONDS) - .pingInterval(1, TimeUnit.MINUTES) + .pingInterval(1, TimeUnit.MINUTES) // The server pings us too, so this doesn't matter much .connectTimeout(10, TimeUnit.SECONDS) .build() private var errorCount = 0 private var webSocket: WebSocket? = null private var state: State? = null + private var closed = false private var since: Long = sinceTime private val subscriptionIds = topicsToSubscriptionIds.values @@ -44,10 +45,12 @@ class WsConnection( @Synchronized override fun start() { - if (state == State.Connecting || state == State.Connected) { + if (closed || state == State.Connecting || state == State.Connected) { return } - cancel() + if (webSocket != null) { + webSocket!!.close(1000, "") + } state = State.Connecting val nextId = ID.incrementAndGet() val sinceVal = if (since == 0L) "all" else since.toString() @@ -58,7 +61,8 @@ class WsConnection( } @Synchronized - override fun cancel() { + override fun close() { + closed = true if (webSocket == null) { return } @@ -79,7 +83,7 @@ class WsConnection( @Synchronized fun scheduleReconnect(seconds: Int) { - if (state == State.Connecting || state == State.Connected) { + if (closed || state == State.Connecting || state == State.Connected) { return } state = State.Scheduled @@ -134,6 +138,10 @@ class WsConnection( override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { Log.e(TAG, "[$url] WebSocket($id): failure ${response?.code}: ${response?.message}", t) syncExec { + if (closed) { + Log.d(TAG, "WebSocket($id): Connection marked as closed. Not retrying.") + return@syncExec + } stateChangeListener(subscriptionIds, ConnectionState.CONNECTING) state = State.Disconnected errorCount++ @@ -142,10 +150,10 @@ class WsConnection( } } - private fun syncExec(runnable: Runnable) { + private fun syncExec(fn: () -> Unit) { synchronized(this) { if (ID.get() == id) { - runnable.run() + fn() } } }