From 6333a063a13d7a01797ea40ccd1031bcd3025045 Mon Sep 17 00:00:00 2001 From: Philipp Heckel Date: Sun, 19 Jun 2022 15:19:01 -0400 Subject: [PATCH] Switch to since= for WsConnection/JsonConnection --- .../java/io/heckel/ntfy/msg/ApiService.kt | 4 ++-- .../java/io/heckel/ntfy/service/Connection.kt | 2 +- .../io/heckel/ntfy/service/JsonConnection.kt | 8 ++++---- .../heckel/ntfy/service/SubscriberService.kt | 12 +++--------- .../io/heckel/ntfy/service/WsConnection.kt | 19 ++++++++++++------- 5 files changed, 22 insertions(+), 23 deletions(-) diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index c541eba..64baa45 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -108,12 +108,12 @@ class ApiService { fun subscribe( baseUrl: String, topics: String, - since: Long, + since: String?, user: User?, notify: (topic: String, Notification) -> Unit, fail: (Exception) -> Unit ): Call { - val sinceVal = if (since == 0L) "all" else since.toString() + val sinceVal = since ?: "all" val url = topicUrlJson(baseUrl, topics, sinceVal) Log.d(TAG, "Opening subscription connection to $url") val request = requestBuilder(url, user).build() diff --git a/app/src/main/java/io/heckel/ntfy/service/Connection.kt b/app/src/main/java/io/heckel/ntfy/service/Connection.kt index a8dfed1..71a98be 100644 --- a/app/src/main/java/io/heckel/ntfy/service/Connection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/Connection.kt @@ -3,7 +3,7 @@ package io.heckel.ntfy.service interface Connection { fun start() fun close() - fun since(): Long + fun since(): String? } data class ConnectionId( diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index 9ccb93e..8bca688 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -14,7 +14,7 @@ class JsonConnection( private val repository: Repository, private val api: ApiService, private val user: User?, - private val sinceTime: Long, + private val sinceId: String?, private val stateChangeListener: (Collection, ConnectionState) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val serviceActive: () -> Boolean @@ -25,7 +25,7 @@ class JsonConnection( private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) - private var since: Long = sinceTime + private var since: String? = sinceId private lateinit var call: Call private lateinit var job: Job @@ -39,7 +39,7 @@ class JsonConnection( Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") val startTime = System.currentTimeMillis() val notify = notify@ { topic: String, notification: Notification -> - since = notification.timestamp + since = notification.id val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify val subscription = repository.getSubscription(subscriptionId) ?: return@notify val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) @@ -81,7 +81,7 @@ class JsonConnection( } } - override fun since(): Long { + override fun since(): String? { return since } diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index b453e4d..634dfe3 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -178,14 +178,8 @@ class SubscriberService : Service() { val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds) val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds) val match = activeConnectionIds == desiredConnectionIds - val newSinceByBaseUrl = connections - .map { e -> - // Get last message timestamp to determine new ?since= param; set to $last+1 if it - // is defined to avoid retrieving old messages. See comment below too. - val lastMessage = e.value.since() - val newSince = if (lastMessage > 0) lastMessage+1 else 0 - e.key.baseUrl to newSince - } + val sinceByBaseUrl = connections + .map { e -> e.key.baseUrl to e.value.since() } // Use since=, avoid retrieving old messages (see comment below) .toMap() Log.d(TAG, "Refreshing subscriptions") @@ -205,7 +199,7 @@ class SubscriberService : Service() { // IMPORTANT: Do NOT request old messages for new connections; we call poll() in MainActivity to // retrieve old messages. This is important, so we don't download attachments from old messages. - val since = newSinceByBaseUrl[connectionId.baseUrl] ?: (System.currentTimeMillis() / 1000) + val since = sinceByBaseUrl[connectionId.baseUrl] ?: "none" val serviceActive = { -> isServiceStarted } val user = repository.getUser(connectionId.baseUrl) val connection = if (repository.getConnectionProtocol() == Repository.CONNECTION_PROTOCOL_WS) { diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index a79ce07..1f168d3 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -5,15 +5,19 @@ import android.os.Build import android.os.Handler import android.os.Looper import io.heckel.ntfy.db.* -import io.heckel.ntfy.util.Log import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder import io.heckel.ntfy.msg.NotificationParser +import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.topicShortUrl import io.heckel.ntfy.util.topicUrlWs -import okhttp3.* +import okhttp3.OkHttpClient +import okhttp3.Response +import okhttp3.WebSocket +import okhttp3.WebSocketListener import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference import kotlin.random.Random /** @@ -30,7 +34,7 @@ class WsConnection( private val connectionId: ConnectionId, private val repository: Repository, private val user: User?, - private val sinceTime: Long, + private val sinceId: String?, private val stateChangeListener: (Collection, ConnectionState) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val alarmManager: AlarmManager @@ -49,7 +53,7 @@ class WsConnection( private val globalId = GLOBAL_ID.incrementAndGet() private val listenerId = AtomicLong(0) - private val since = AtomicLong(sinceTime) + private val since = AtomicReference(sinceId) private val baseUrl = connectionId.baseUrl private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val subscriptionIds = topicsToSubscriptionIds.values @@ -71,7 +75,8 @@ class WsConnection( } state = State.Connecting val nextListenerId = listenerId.incrementAndGet() - val sinceVal = if (since.get() == 0L) "all" else since.get().toString() + val sinceId = since.get() + val sinceVal = sinceId ?: "all" val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal) val request = requestBuilder(urlWithSince, user).build() Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...") @@ -92,7 +97,7 @@ class WsConnection( } @Synchronized - override fun since(): Long { + override fun since(): String? { return since.get() } @@ -141,7 +146,7 @@ class WsConnection( val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) notificationListener(subscription, notificationWithSubscriptionId) - since.set(notification.timestamp) + since.set(notification.id) } }