Actually kill WS connection

This commit is contained in:
Philipp Heckel 2022-01-15 19:20:30 -05:00
parent 5175b1b0cb
commit 4dd09ac39d
3 changed files with 20 additions and 12 deletions

View file

@ -86,7 +86,7 @@ class JsonConnection(
return since return since
} }
override fun cancel() { override fun close() {
Log.d(TAG, "[$url] Cancelling connection") Log.d(TAG, "[$url] Cancelling connection")
if (this::job.isInitialized) job?.cancel() if (this::job.isInitialized) job?.cancel()
if (this::call.isInitialized) call?.cancel() if (this::call.isInitialized) call?.cancel()

View file

@ -58,7 +58,7 @@ import java.util.concurrent.ConcurrentHashMap
interface Connection { interface Connection {
fun start() fun start()
fun cancel() fun close()
fun since(): Long fun since(): Long
fun matches(otherSubscriptionIds: Collection<Long>): Boolean fun matches(otherSubscriptionIds: Collection<Long>): Boolean
} }
@ -133,7 +133,7 @@ class SubscriberService : Service() {
Log.d(TAG, "Stopping the foreground service") Log.d(TAG, "Stopping the foreground service")
// Cancelling all remaining jobs and open HTTP calls // Cancelling all remaining jobs and open HTTP calls
connections.values.forEach { connection -> connection.cancel() } connections.values.forEach { connection -> connection.close() }
connections.clear() connections.clear()
// Releasing wake-lock and stopping ourselves // Releasing wake-lock and stopping ourselves
@ -179,7 +179,7 @@ class SubscriberService : Service() {
if (connection != null && !connection.matches(subscriptions.values)) { if (connection != null && !connection.matches(subscriptions.values)) {
since = connection.since() since = connection.since()
connections.remove(baseUrl) connections.remove(baseUrl)
connection.cancel() connection.close()
} }
if (!connections.containsKey(baseUrl)) { if (!connections.containsKey(baseUrl)) {
val serviceActive = { -> isServiceStarted } val serviceActive = { -> isServiceStarted }
@ -199,7 +199,7 @@ class SubscriberService : Service() {
connections.keys().toList().forEach { baseUrl -> connections.keys().toList().forEach { baseUrl ->
if (!baseUrls.contains(baseUrl)) { if (!baseUrls.contains(baseUrl)) {
val connection = connections.remove(baseUrl) val connection = connections.remove(baseUrl)
connection?.cancel() connection?.close()
} }
} }

View file

@ -30,12 +30,13 @@ class WsConnection(
private val parser = NotificationParser() private val parser = NotificationParser()
private val client = OkHttpClient.Builder() private val client = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS) .readTimeout(0, TimeUnit.MILLISECONDS)
.pingInterval(1, TimeUnit.MINUTES) .pingInterval(1, TimeUnit.MINUTES) // The server pings us too, so this doesn't matter much
.connectTimeout(10, TimeUnit.SECONDS) .connectTimeout(10, TimeUnit.SECONDS)
.build() .build()
private var errorCount = 0 private var errorCount = 0
private var webSocket: WebSocket? = null private var webSocket: WebSocket? = null
private var state: State? = null private var state: State? = null
private var closed = false
private var since: Long = sinceTime private var since: Long = sinceTime
private val subscriptionIds = topicsToSubscriptionIds.values private val subscriptionIds = topicsToSubscriptionIds.values
@ -44,10 +45,12 @@ class WsConnection(
@Synchronized @Synchronized
override fun start() { override fun start() {
if (state == State.Connecting || state == State.Connected) { if (closed || state == State.Connecting || state == State.Connected) {
return return
} }
cancel() if (webSocket != null) {
webSocket!!.close(1000, "")
}
state = State.Connecting state = State.Connecting
val nextId = ID.incrementAndGet() val nextId = ID.incrementAndGet()
val sinceVal = if (since == 0L) "all" else since.toString() val sinceVal = if (since == 0L) "all" else since.toString()
@ -58,7 +61,8 @@ class WsConnection(
} }
@Synchronized @Synchronized
override fun cancel() { override fun close() {
closed = true
if (webSocket == null) { if (webSocket == null) {
return return
} }
@ -79,7 +83,7 @@ class WsConnection(
@Synchronized @Synchronized
fun scheduleReconnect(seconds: Int) { fun scheduleReconnect(seconds: Int) {
if (state == State.Connecting || state == State.Connected) { if (closed || state == State.Connecting || state == State.Connected) {
return return
} }
state = State.Scheduled state = State.Scheduled
@ -134,6 +138,10 @@ class WsConnection(
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
Log.e(TAG, "[$url] WebSocket($id): failure ${response?.code}: ${response?.message}", t) Log.e(TAG, "[$url] WebSocket($id): failure ${response?.code}: ${response?.message}", t)
syncExec { syncExec {
if (closed) {
Log.d(TAG, "WebSocket($id): Connection marked as closed. Not retrying.")
return@syncExec
}
stateChangeListener(subscriptionIds, ConnectionState.CONNECTING) stateChangeListener(subscriptionIds, ConnectionState.CONNECTING)
state = State.Disconnected state = State.Disconnected
errorCount++ errorCount++
@ -142,10 +150,10 @@ class WsConnection(
} }
} }
private fun syncExec(runnable: Runnable) { private fun syncExec(fn: () -> Unit) {
synchronized(this) { synchronized(this) {
if (ID.get() == id) { if (ID.get() == id) {
runnable.run() fn()
} }
} }
} }