diff --git a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt index 2692c84..708ca71 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/ApiService.kt @@ -111,7 +111,7 @@ class ApiService { unifiedPushTopics: String, since: String?, user: User?, - notify: (topic: String, Notification) -> Unit, + notify: (Message) -> Unit, fail: (Exception) -> Unit ): Call { val sinceVal = since ?: "all" @@ -128,10 +128,8 @@ class ApiService { val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty") while (!source.exhausted()) { val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") - val notification = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream - if (notification != null) { - notify(notification.topic, notification.notification) - } + val message = parser.parseMessage(line) + if (message != null) notify(message) } } catch (e: Exception) { Log.e(TAG, "Connection to $url failed (1): ${e.message}", e) @@ -175,6 +173,8 @@ class ApiService { // These constants have corresponding values in the server codebase! const val CONTROL_TOPIC = "~control" + const val EVENT_OPEN_PARAM_NEW_TOPIC = "new_topic" + const val EVENT_OPEN = "open" const val EVENT_MESSAGE = "message" const val EVENT_KEEPALIVE = "keepalive" const val EVENT_POLL_REQUEST = "poll_request" diff --git a/app/src/main/java/io/heckel/ntfy/msg/Message.kt b/app/src/main/java/io/heckel/ntfy/msg/Message.kt index 8de0ab8..4573517 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/Message.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/Message.kt @@ -16,7 +16,7 @@ data class Message( val icon: String?, val actions: List?, val title: String?, - val message: String, + val message: String?, val encoding: String?, val attachment: MessageAttachment?, ) diff --git a/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt b/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt index 81a60db..be7d07a 100644 --- a/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt +++ b/app/src/main/java/io/heckel/ntfy/msg/NotificationParser.kt @@ -6,6 +6,7 @@ import io.heckel.ntfy.db.Action import io.heckel.ntfy.db.Attachment import io.heckel.ntfy.db.Icon import io.heckel.ntfy.db.Notification +import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.joinTags import io.heckel.ntfy.util.toPriority import java.lang.reflect.Type @@ -13,13 +14,20 @@ import java.lang.reflect.Type class NotificationParser { private val gson = Gson() + fun parseMessage(s: String) : Message? { + + val message= gson.fromJson(s, Message::class.java) + Log.d("HITAGME", message.toString()) + return message + } + fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? { - val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notificationId = notificationId) + val message = parseMessage(s) ?: return null + val notificationWithTopic = parseWithTopic(message, subscriptionId = subscriptionId, notificationId = notificationId) return notificationWithTopic?.notification } - fun parseWithTopic(s: String, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? { - val message = gson.fromJson(s, Message::class.java) + fun parseWithTopic(message: Message, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? { if (message.event != ApiService.EVENT_MESSAGE) { return null } @@ -56,7 +64,7 @@ class NotificationParser { subscriptionId = subscriptionId, timestamp = message.time, title = message.title ?: "", - message = message.message, + message = message.message?: "", encoding = message.encoding ?: "", priority = toPriority(message.priority), tags = joinTags(message.tags), diff --git a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt index 39fa008..fcb9dc1 100644 --- a/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/JsonConnection.kt @@ -3,10 +3,13 @@ package io.heckel.ntfy.service import io.heckel.ntfy.db.* import io.heckel.ntfy.util.Log import io.heckel.ntfy.msg.ApiService +import io.heckel.ntfy.msg.Message +import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.topicUrl import kotlinx.coroutines.* import okhttp3.Call import java.util.concurrent.atomic.AtomicBoolean +import kotlin.random.Random class JsonConnection( private val connectionId: ConnectionId, @@ -15,11 +18,13 @@ class JsonConnection( private val api: ApiService, private val user: User?, private val sinceId: String?, + private val connectionOpenListener: (ConnectionId, String?) -> Unit, private val stateChangeListener: (Collection, ConnectionState) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val serviceActive: () -> Boolean ) : Connection { private val baseUrl = connectionId.baseUrl + private val parser = NotificationParser() private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush private val subscriptionIds = topicsToSubscriptionIds.values @@ -40,12 +45,24 @@ class JsonConnection( while (isActive && serviceActive()) { Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") val startTime = System.currentTimeMillis() - val notify = notify@ { topic: String, notification: Notification -> - since = notification.id - val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify - val subscription = repository.getSubscription(subscriptionId) ?: return@notify - val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) - notificationListener(subscription, notificationWithSubscriptionId) + val notify = notify@ { message : Message -> + if (message.event == ApiService.EVENT_OPEN) { + connectionOpenListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message.message) + return@notify + } + val (topic, notification) = parser.parseWithTopic( + message, + notificationId = Random.nextInt(), + subscriptionId = 0 + ) ?: return@notify // subscriptionId to be set downstream + since = notification.id + val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify + val subscription = + repository.getSubscription(subscriptionId) ?: return@notify + val notificationWithSubscriptionId = + notification.copy(subscriptionId = subscription.id) + notificationListener(subscription, notificationWithSubscriptionId) + } val failed = AtomicBoolean(false) val fail = { _: Exception -> diff --git a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt index 192cfc9..f194d5f 100644 --- a/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt +++ b/app/src/main/java/io/heckel/ntfy/service/SubscriberService.kt @@ -204,9 +204,9 @@ class SubscriberService : Service() { val user = repository.getUser(connectionId.baseUrl) val connection = if (repository.getConnectionProtocol() == Repository.CONNECTION_PROTOCOL_WS) { val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager - WsConnection(connectionId, repository, user, since, ::onStateChanged, ::onNotificationReceived, alarmManager) + WsConnection(connectionId, repository, user, since, ::onConnectionOpen, ::onStateChanged, ::onNotificationReceived, alarmManager) } else { - JsonConnection(connectionId, scope, repository, api, user, since, ::onStateChanged, ::onNotificationReceived, serviceActive) + JsonConnection(connectionId, scope, repository, api, user, since, ::onConnectionOpen, ::onStateChanged, ::onNotificationReceived, serviceActive) } connections[connectionId] = connection connection.start() @@ -247,6 +247,22 @@ class SubscriberService : Service() { } } + private fun onConnectionOpen(connectionId: ConnectionId, message: String?) { + Log.d(TAG, "Received open from connection to ${connectionId.baseUrl} with message: $message") + // TODO extract constant + if (message?.contains(ApiService.EVENT_OPEN_PARAM_NEW_TOPIC) == true) { + val connectionId = connectionId.copy() // TODO does this deep copy + GlobalScope.launch(Dispatchers.IO) { + for (topic in connectionId.topicsToSubscriptionIds.keys) + if (connectionId.topicIsUnifiedPush[topic] == true){ + io.heckel.ntfy.up.BroadcastReceiver.sendRegistration(baseContext, connectionId.baseUrl, topic) + Log.d(TAG, "Attempting to re-register ${connectionId.baseUrl}/$topic") + } + // TODO is that the right context + // looks like it works??? + } + } + } private fun onStateChanged(subscriptionIds: Collection, state: ConnectionState) { repository.updateState(subscriptionIds, state) } diff --git a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt index 080e848..d71d182 100644 --- a/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt +++ b/app/src/main/java/io/heckel/ntfy/service/WsConnection.kt @@ -5,6 +5,7 @@ import android.os.Build import android.os.Handler import android.os.Looper import io.heckel.ntfy.db.* +import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.util.Log @@ -35,6 +36,7 @@ class WsConnection( private val repository: Repository, private val user: User?, private val sinceId: String?, + private val connectionOpenListener: (ConnectionId, String?) -> Unit, private val stateChangeListener: (Collection, ConnectionState) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit, private val alarmManager: AlarmManager @@ -137,7 +139,12 @@ class WsConnection( override fun onMessage(webSocket: WebSocket, text: String) { synchronize("onMessage") { Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text") - val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt()) + val message = parser.parseMessage(text) ?: return@synchronize + if (message.event == ApiService.EVENT_OPEN){ + connectionOpenListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message.message) + return@synchronize + } + val notificationWithTopic = parser.parseWithTopic(message, subscriptionId = 0, notificationId = Random.nextInt()) if (notificationWithTopic == null) { Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.") return@synchronize diff --git a/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt b/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt index 46a0bcb..359b757 100644 --- a/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt +++ b/app/src/main/java/io/heckel/ntfy/up/BroadcastReceiver.kt @@ -93,7 +93,8 @@ class BroadcastReceiver : android.content.BroadcastReceiver() { try { // Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185 repository.addSubscription(subscription) - distributor.sendEndpoint(appId, connectorToken, endpoint) + //distributor.sendEndpoint(appId, connectorToken, endpoint) +//lets wait to subscribe, THEN, send // Refresh (and maybe start) foreground service SubscriberServiceManager.refresh(app) @@ -143,5 +144,25 @@ class BroadcastReceiver : android.content.BroadcastReceiver() { private const val TOPIC_RANDOM_ID_LENGTH = 12 val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230 + public fun sendRegistration(context: Context, baseUrl : String, topic : String) : Boolean { + val app = context.applicationContext as Application + val repository = app.repository + val distributor = Distributor(app) + GlobalScope.launch(Dispatchers.IO) { + // We're doing all of this inside a critical section, because of possible races. + // See https://github.com/binwiederhier/ntfy/issues/230 for details. + + mutex.withLock { + val existingSubscription = repository.getSubscription(baseUrl, topic) ?: return@launch + val appId = existingSubscription.upAppId ?: return@launch + val connectorToken = existingSubscription.upConnectorToken ?: return@launch + val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic) + Log.d(TAG, "Sending endpoint $endpoint to ${existingSubscription.upAppId}") + distributor.sendEndpoint(appId, connectorToken, endpoint) + } + } + return false // todo return result async somehow + } + } }