diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index 54c48ea..2692c84 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -108,6 +108,7 @@ class ApiService { fun subscribe( baseUrl: String, topics: String, + unifiedPushTopics: String, since: String?, user: User?, notify: (topic: String, Notification) -> Unit, @@ -116,7 +117,7 @@ class ApiService { val sinceVal = since ?: "all" val url = topicUrlJson(baseUrl, topics, sinceVal) Log.d(TAG, "Opening subscription connection to $url") - val request = requestBuilder(url, user).build() + val request = requestBuilder(url, user, unifiedPushTopics).build() val call = subscriberClient.newCall(request) call.enqueue(object : Callback { override fun onResponse(call: Call, response: Response) { @@ -178,13 +179,16 @@ class ApiService { const val EVENT_KEEPALIVE = "keepalive" const val EVENT_POLL_REQUEST = "poll_request" - fun requestBuilder(url: String, user: User?): Request.Builder { + fun requestBuilder(url: String, user: User?, unifiedPushTopics: String? = null): Request.Builder { val builder = Request.Builder() .url(url) .addHeader("User-Agent", USER_AGENT) if (user != null) { builder.addHeader("Authorization", Credentials.basic(user.username, user.password, UTF_8)) } + if (unifiedPushTopics != null) { + builder.addHeader("Rate-Topics", unifiedPushTopics) + } return builder } } diff --git a/app/src/main/java/io/heckel/ntfy/service/Connection.kt b/app/src/main/java/io/heckel/ntfy/service/Connection.kt index 71a98be..dc151e5 100644 --- a/app/src/main/java/io/heckel/ntfy/service/Connection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/Connection.kt @@ -8,5 +8,6 @@ interface Connection { data class ConnectionId( val baseUrl: String, - val topicsToSubscriptionIds: Map + val topicsToSubscriptionIds: Map, + val topicIsUnifiedPush: Map ) diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index 8bca688..39fa008 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -21,8 +21,10 @@ class JsonConnection( ) : Connection { private val baseUrl = connectionId.baseUrl private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds + private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") + private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",") private val url = topicUrl(baseUrl, topicsStr) private var since: String? = sinceId @@ -56,7 +58,7 @@ class JsonConnection( // Call /json subscribe endpoint and loop until the call fails, is canceled, // or the job or service are cancelled/stopped try { - call = api.subscribe(baseUrl, topicsStr, since, user, notify, fail) + call = api.subscribe(baseUrl, topicsStr, unifiedPushTopicsStr, since, user, notify, fail) while (!failed.get() && !call.isCanceled() && isActive && serviceActive()) { stateChangeListener(subscriptionIds, ConnectionState.CONNECTED) Log.d(TAG,"[$url] Connection is active (failed=$failed, callCanceled=${call.isCanceled()}, jobActive=$isActive, serviceStarted=${serviceActive()}") diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index 2f59aa6..192cfc9 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -172,8 +172,8 @@ class SubscriberService : Service() { .filter { s -> s.instant } val activeConnectionIds = connections.keys().toList().toSet() val desiredConnectionIds = instantSubscriptions // Set - .groupBy { s -> ConnectionId(s.baseUrl, emptyMap()) } - .map { entry -> entry.key.copy(topicsToSubscriptionIds = entry.value.associate { s -> s.topic to s.id }) } + .groupBy { s -> ConnectionId(s.baseUrl, emptyMap(), emptyMap()) } + .map { entry -> entry.key.copy(topicsToSubscriptionIds = entry.value.associate { s -> s.topic to s.id }, topicIsUnifiedPush = entry.value.associate { s -> s.topic to (s.upConnectorToken != null) }) } .toSet() val newConnectionIds = desiredConnectionIds.subtract(activeConnectionIds) val obsoleteConnectionIds = activeConnectionIds.subtract(desiredConnectionIds) diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index 1f168d3..080e848 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -56,8 +56,10 @@ class WsConnection( private val since = AtomicReference(sinceId) private val baseUrl = connectionId.baseUrl private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds + private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush private val subscriptionIds = topicsToSubscriptionIds.values private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",") + private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",") private val shortUrl = topicShortUrl(baseUrl, topicsStr) init { @@ -78,7 +80,7 @@ class WsConnection( val sinceId = since.get() val sinceVal = sinceId ?: "all" val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal) - val request = requestBuilder(urlWithSince, user).build() + val request = requestBuilder(urlWithSince, user, unifiedPushTopicsStr).build() Log.d(TAG, "$shortUrl (gid=$globalId): Opening $urlWithSince with listener ID $nextListenerId ...") webSocket = client.newWebSocket(request, Listener(nextListenerId)) }