Switch to since=<id> for WsConnection/JsonConnection

This commit is contained in:
Philipp Heckel 2022-06-19 15:19:01 -04:00
parent 2324352a50
commit 6333a063a1
5 changed files with 22 additions and 23 deletions

View file

@ -108,12 +108,12 @@ class ApiService {
fun subscribe(
baseUrl: String,
topics: String,
since: Long,
since: String?,
user: User?,
notify: (topic: String, Notification) -> Unit,
fail: (Exception) -> Unit
): Call {
val sinceVal = if (since == 0L) "all" else since.toString()
val sinceVal = since ?: "all"
val url = topicUrlJson(baseUrl, topics, sinceVal)
Log.d(TAG, "Opening subscription connection to $url")
val request = requestBuilder(url, user).build()

View file

@ -3,7 +3,7 @@ package io.heckel.ntfy.service
interface Connection {
fun start()
fun close()
fun since(): Long
fun since(): String?
}
data class ConnectionId(

View file

@ -14,7 +14,7 @@ class JsonConnection(
private val repository: Repository,
private val api: ApiService,
private val user: User?,
private val sinceTime: Long,
private val sinceId: String?,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val serviceActive: () -> Boolean
@ -25,7 +25,7 @@ class JsonConnection(
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val url = topicUrl(baseUrl, topicsStr)
private var since: Long = sinceTime
private var since: String? = sinceId
private lateinit var call: Call
private lateinit var job: Job
@ -39,7 +39,7 @@ class JsonConnection(
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
val startTime = System.currentTimeMillis()
val notify = notify@ { topic: String, notification: Notification ->
since = notification.timestamp
since = notification.id
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify
val subscription = repository.getSubscription(subscriptionId) ?: return@notify
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
@ -81,7 +81,7 @@ class JsonConnection(
}
}
override fun since(): Long {
override fun since(): String? {
return since
}

View file

@ -178,14 +178,8 @@ class SubscriberService : Service() {
val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds)
val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds)
val match = activeConnectionIds == desiredConnectionIds
val newSinceByBaseUrl = connections
.map { e ->
// Get last message timestamp to determine new ?since= param; set to $last+1 if it
// is defined to avoid retrieving old messages. See comment below too.
val lastMessage = e.value.since()
val newSince = if (lastMessage > 0) lastMessage+1 else 0
e.key.baseUrl to newSince
}
val sinceByBaseUrl = connections
.map { e -> e.key.baseUrl to e.value.since() } // Use since=<id>, avoid retrieving old messages (see comment below)
.toMap()
Log.d(TAG, "Refreshing subscriptions")
@ -205,7 +199,7 @@ class SubscriberService : Service() {
// IMPORTANT: Do NOT request old messages for new connections; we call poll() in MainActivity to
// retrieve old messages. This is important, so we don't download attachments from old messages.
val since = newSinceByBaseUrl[connectionId.baseUrl] ?: (System.currentTimeMillis() / 1000)
val since = sinceByBaseUrl[connectionId.baseUrl] ?: "none"
val serviceActive = { -> isServiceStarted }
val user = repository.getUser(connectionId.baseUrl)
val connection = if (repository.getConnectionProtocol() == Repository.CONNECTION_PROTOCOL_WS) {

View file

@ -5,15 +5,19 @@ import android.os.Build
import android.os.Handler
import android.os.Looper
import io.heckel.ntfy.db.*
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.topicShortUrl
import io.heckel.ntfy.util.topicUrlWs
import okhttp3.*
import okhttp3.OkHttpClient
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.util.*
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import kotlin.random.Random
/**
@ -30,7 +34,7 @@ class WsConnection(
private val connectionId: ConnectionId,
private val repository: Repository,
private val user: User?,
private val sinceTime: Long,
private val sinceId: String?,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager
@ -49,7 +53,7 @@ class WsConnection(
private val globalId = GLOBAL_ID.incrementAndGet()
private val listenerId = AtomicLong(0)
private val since = AtomicLong(sinceTime)
private val since = AtomicReference<String?>(sinceId)
private val baseUrl = connectionId.baseUrl
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
private val subscriptionIds = topicsToSubscriptionIds.values
@ -71,7 +75,8 @@ class WsConnection(
}
state = State.Connecting
val nextListenerId = listenerId.incrementAndGet()
val sinceVal = if (since.get() == 0L) "all" else since.get().toString()
val sinceId = since.get()
val sinceVal = sinceId ?: "all"
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
val request = requestBuilder(urlWithSince, user).build()
Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...")
@ -92,7 +97,7 @@ class WsConnection(
}
@Synchronized
override fun since(): Long {
override fun since(): String? {
return since.get()
}
@ -141,7 +146,7 @@ class WsConnection(
val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
since.set(notification.timestamp)
since.set(notification.id)
}
}