re new_endpoint on new_topic from server

This commit is contained in:
Karmanyaah Malhotra 2023-12-16 15:01:32 -06:00
parent c15efff72c
commit a09374b2a5
7 changed files with 89 additions and 20 deletions

View file

@ -111,7 +111,7 @@ class ApiService {
unifiedPushTopics: String, unifiedPushTopics: String,
since: String?, since: String?,
user: User?, user: User?,
notify: (topic: String, Notification) -> Unit, notify: (Message) -> Unit,
fail: (Exception) -> Unit fail: (Exception) -> Unit
): Call { ): Call {
val sinceVal = since ?: "all" val sinceVal = since ?: "all"
@ -128,10 +128,8 @@ class ApiService {
val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty") val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty")
while (!source.exhausted()) { while (!source.exhausted()) {
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null") val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
val notification = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream val message = parser.parseMessage(line)
if (notification != null) { if (message != null) notify(message)
notify(notification.topic, notification.notification)
}
} }
} catch (e: Exception) { } catch (e: Exception) {
Log.e(TAG, "Connection to $url failed (1): ${e.message}", e) Log.e(TAG, "Connection to $url failed (1): ${e.message}", e)
@ -175,6 +173,8 @@ class ApiService {
// These constants have corresponding values in the server codebase! // These constants have corresponding values in the server codebase!
const val CONTROL_TOPIC = "~control" const val CONTROL_TOPIC = "~control"
const val EVENT_OPEN_PARAM_NEW_TOPIC = "new_topic"
const val EVENT_OPEN = "open"
const val EVENT_MESSAGE = "message" const val EVENT_MESSAGE = "message"
const val EVENT_KEEPALIVE = "keepalive" const val EVENT_KEEPALIVE = "keepalive"
const val EVENT_POLL_REQUEST = "poll_request" const val EVENT_POLL_REQUEST = "poll_request"

View file

@ -16,7 +16,7 @@ data class Message(
val icon: String?, val icon: String?,
val actions: List<MessageAction>?, val actions: List<MessageAction>?,
val title: String?, val title: String?,
val message: String, val message: String?,
val encoding: String?, val encoding: String?,
val attachment: MessageAttachment?, val attachment: MessageAttachment?,
) )

View file

@ -6,6 +6,7 @@ import io.heckel.ntfy.db.Action
import io.heckel.ntfy.db.Attachment import io.heckel.ntfy.db.Attachment
import io.heckel.ntfy.db.Icon import io.heckel.ntfy.db.Icon
import io.heckel.ntfy.db.Notification import io.heckel.ntfy.db.Notification
import io.heckel.ntfy.util.Log
import io.heckel.ntfy.util.joinTags import io.heckel.ntfy.util.joinTags
import io.heckel.ntfy.util.toPriority import io.heckel.ntfy.util.toPriority
import java.lang.reflect.Type import java.lang.reflect.Type
@ -13,13 +14,20 @@ import java.lang.reflect.Type
class NotificationParser { class NotificationParser {
private val gson = Gson() private val gson = Gson()
fun parseMessage(s: String) : Message? {
val message= gson.fromJson(s, Message::class.java)
Log.d("HITAGME", message.toString())
return message
}
fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? { fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? {
val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notificationId = notificationId) val message = parseMessage(s) ?: return null
val notificationWithTopic = parseWithTopic(message, subscriptionId = subscriptionId, notificationId = notificationId)
return notificationWithTopic?.notification return notificationWithTopic?.notification
} }
fun parseWithTopic(s: String, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? { fun parseWithTopic(message: Message, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? {
val message = gson.fromJson(s, Message::class.java)
if (message.event != ApiService.EVENT_MESSAGE) { if (message.event != ApiService.EVENT_MESSAGE) {
return null return null
} }
@ -56,7 +64,7 @@ class NotificationParser {
subscriptionId = subscriptionId, subscriptionId = subscriptionId,
timestamp = message.time, timestamp = message.time,
title = message.title ?: "", title = message.title ?: "",
message = message.message, message = message.message?: "",
encoding = message.encoding ?: "", encoding = message.encoding ?: "",
priority = toPriority(message.priority), priority = toPriority(message.priority),
tags = joinTags(message.tags), tags = joinTags(message.tags),

View file

@ -3,10 +3,13 @@ package io.heckel.ntfy.service
import io.heckel.ntfy.db.* import io.heckel.ntfy.db.*
import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.Log
import io.heckel.ntfy.msg.ApiService import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.Message
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.topicUrl import io.heckel.ntfy.util.topicUrl
import kotlinx.coroutines.* import kotlinx.coroutines.*
import okhttp3.Call import okhttp3.Call
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kotlin.random.Random
class JsonConnection( class JsonConnection(
private val connectionId: ConnectionId, private val connectionId: ConnectionId,
@ -15,11 +18,13 @@ class JsonConnection(
private val api: ApiService, private val api: ApiService,
private val user: User?, private val user: User?,
private val sinceId: String?, private val sinceId: String?,
private val connectionOpenListener: (ConnectionId, String?) -> Unit,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit, private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit,
private val serviceActive: () -> Boolean private val serviceActive: () -> Boolean
) : Connection { ) : Connection {
private val baseUrl = connectionId.baseUrl private val baseUrl = connectionId.baseUrl
private val parser = NotificationParser()
private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds private val topicsToSubscriptionIds = connectionId.topicsToSubscriptionIds
private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush
private val subscriptionIds = topicsToSubscriptionIds.values private val subscriptionIds = topicsToSubscriptionIds.values
@ -40,12 +45,24 @@ class JsonConnection(
while (isActive && serviceActive()) { while (isActive && serviceActive()) {
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds") Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
val startTime = System.currentTimeMillis() val startTime = System.currentTimeMillis()
val notify = notify@ { topic: String, notification: Notification -> val notify = notify@ { message : Message ->
since = notification.id if (message.event == ApiService.EVENT_OPEN) {
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify connectionOpenListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message.message)
val subscription = repository.getSubscription(subscriptionId) ?: return@notify return@notify
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id) }
notificationListener(subscription, notificationWithSubscriptionId) val (topic, notification) = parser.parseWithTopic(
message,
notificationId = Random.nextInt(),
subscriptionId = 0
) ?: return@notify // subscriptionId to be set downstream
since = notification.id
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify
val subscription =
repository.getSubscription(subscriptionId) ?: return@notify
val notificationWithSubscriptionId =
notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
} }
val failed = AtomicBoolean(false) val failed = AtomicBoolean(false)
val fail = { _: Exception -> val fail = { _: Exception ->

View file

@ -204,9 +204,9 @@ class SubscriberService : Service() {
val user = repository.getUser(connectionId.baseUrl) val user = repository.getUser(connectionId.baseUrl)
val connection = if (repository.getConnectionProtocol() == Repository.CONNECTION_PROTOCOL_WS) { val connection = if (repository.getConnectionProtocol() == Repository.CONNECTION_PROTOCOL_WS) {
val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager
WsConnection(connectionId, repository, user, since, ::onStateChanged, ::onNotificationReceived, alarmManager) WsConnection(connectionId, repository, user, since, ::onConnectionOpen, ::onStateChanged, ::onNotificationReceived, alarmManager)
} else { } else {
JsonConnection(connectionId, scope, repository, api, user, since, ::onStateChanged, ::onNotificationReceived, serviceActive) JsonConnection(connectionId, scope, repository, api, user, since, ::onConnectionOpen, ::onStateChanged, ::onNotificationReceived, serviceActive)
} }
connections[connectionId] = connection connections[connectionId] = connection
connection.start() connection.start()
@ -247,6 +247,22 @@ class SubscriberService : Service() {
} }
} }
private fun onConnectionOpen(connectionId: ConnectionId, message: String?) {
Log.d(TAG, "Received open from connection to ${connectionId.baseUrl} with message: $message")
// TODO extract constant
if (message?.contains(ApiService.EVENT_OPEN_PARAM_NEW_TOPIC) == true) {
val connectionId = connectionId.copy() // TODO does this deep copy
GlobalScope.launch(Dispatchers.IO) {
for (topic in connectionId.topicsToSubscriptionIds.keys)
if (connectionId.topicIsUnifiedPush[topic] == true){
io.heckel.ntfy.up.BroadcastReceiver.sendRegistration(baseContext, connectionId.baseUrl, topic)
Log.d(TAG, "Attempting to re-register ${connectionId.baseUrl}/$topic")
}
// TODO is that the right context
// looks like it works???
}
}
}
private fun onStateChanged(subscriptionIds: Collection<Long>, state: ConnectionState) { private fun onStateChanged(subscriptionIds: Collection<Long>, state: ConnectionState) {
repository.updateState(subscriptionIds, state) repository.updateState(subscriptionIds, state)
} }

View file

@ -5,6 +5,7 @@ import android.os.Build
import android.os.Handler import android.os.Handler
import android.os.Looper import android.os.Looper
import io.heckel.ntfy.db.* import io.heckel.ntfy.db.*
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder
import io.heckel.ntfy.msg.NotificationParser import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.Log import io.heckel.ntfy.util.Log
@ -35,6 +36,7 @@ class WsConnection(
private val repository: Repository, private val repository: Repository,
private val user: User?, private val user: User?,
private val sinceId: String?, private val sinceId: String?,
private val connectionOpenListener: (ConnectionId, String?) -> Unit,
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit, private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
private val notificationListener: (Subscription, Notification) -> Unit, private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager private val alarmManager: AlarmManager
@ -137,7 +139,12 @@ class WsConnection(
override fun onMessage(webSocket: WebSocket, text: String) { override fun onMessage(webSocket: WebSocket, text: String) {
synchronize("onMessage") { synchronize("onMessage") {
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text") Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text")
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt()) val message = parser.parseMessage(text) ?: return@synchronize
if (message.event == ApiService.EVENT_OPEN){
connectionOpenListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message.message)
return@synchronize
}
val notificationWithTopic = parser.parseWithTopic(message, subscriptionId = 0, notificationId = Random.nextInt())
if (notificationWithTopic == null) { if (notificationWithTopic == null) {
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.") Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
return@synchronize return@synchronize

View file

@ -93,7 +93,8 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
try { try {
// Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185 // Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185
repository.addSubscription(subscription) repository.addSubscription(subscription)
distributor.sendEndpoint(appId, connectorToken, endpoint) //distributor.sendEndpoint(appId, connectorToken, endpoint)
//lets wait to subscribe, THEN, send
// Refresh (and maybe start) foreground service // Refresh (and maybe start) foreground service
SubscriberServiceManager.refresh(app) SubscriberServiceManager.refresh(app)
@ -143,5 +144,25 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
private const val TOPIC_RANDOM_ID_LENGTH = 12 private const val TOPIC_RANDOM_ID_LENGTH = 12
val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230 val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230
public fun sendRegistration(context: Context, baseUrl : String, topic : String) : Boolean {
val app = context.applicationContext as Application
val repository = app.repository
val distributor = Distributor(app)
GlobalScope.launch(Dispatchers.IO) {
// We're doing all of this inside a critical section, because of possible races.
// See https://github.com/binwiederhier/ntfy/issues/230 for details.
mutex.withLock {
val existingSubscription = repository.getSubscription(baseUrl, topic) ?: return@launch
val appId = existingSubscription.upAppId ?: return@launch
val connectorToken = existingSubscription.upConnectorToken ?: return@launch
val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic)
Log.d(TAG, "Sending endpoint $endpoint to ${existingSubscription.upAppId}")
distributor.sendEndpoint(appId, connectorToken, endpoint)
}
}
return false // todo return result async somehow
}
} }
} }