mirror of
https://github.com/binwiederhier/ntfy-android.git
synced 2024-05-17 11:02:36 +12:00
Merge 61f57dd997
into c15efff72c
This commit is contained in:
commit
fcf28f389c
|
@ -11,7 +11,6 @@ import java.io.IOException
|
|||
import java.net.URLEncoder
|
||||
import java.nio.charset.StandardCharsets.UTF_8
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.random.Random
|
||||
|
||||
class ApiService {
|
||||
private val client = OkHttpClient.Builder()
|
||||
|
@ -111,7 +110,7 @@ class ApiService {
|
|||
unifiedPushTopics: String,
|
||||
since: String?,
|
||||
user: User?,
|
||||
notify: (topic: String, Notification) -> Unit,
|
||||
notify: (Message) -> Unit,
|
||||
fail: (Exception) -> Unit
|
||||
): Call {
|
||||
val sinceVal = since ?: "all"
|
||||
|
@ -128,10 +127,8 @@ class ApiService {
|
|||
val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty")
|
||||
while (!source.exhausted()) {
|
||||
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
|
||||
val notification = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream
|
||||
if (notification != null) {
|
||||
notify(notification.topic, notification.notification)
|
||||
}
|
||||
val message = parser.parseMessage(line)
|
||||
if (message != null) notify(message)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
Log.e(TAG, "Connection to $url failed (1): ${e.message}", e)
|
||||
|
@ -175,6 +172,8 @@ class ApiService {
|
|||
|
||||
// These constants have corresponding values in the server codebase!
|
||||
const val CONTROL_TOPIC = "~control"
|
||||
const val EVENT_OPEN_PARAM_NEW_TOPIC = "new_topic"
|
||||
const val EVENT_OPEN = "open"
|
||||
const val EVENT_MESSAGE = "message"
|
||||
const val EVENT_KEEPALIVE = "keepalive"
|
||||
const val EVENT_POLL_REQUEST = "poll_request"
|
||||
|
|
|
@ -16,7 +16,7 @@ data class Message(
|
|||
val icon: String?,
|
||||
val actions: List<MessageAction>?,
|
||||
val title: String?,
|
||||
val message: String,
|
||||
val message: String?,
|
||||
val encoding: String?,
|
||||
val attachment: MessageAttachment?,
|
||||
)
|
||||
|
|
|
@ -13,13 +13,17 @@ import java.lang.reflect.Type
|
|||
class NotificationParser {
|
||||
private val gson = Gson()
|
||||
|
||||
fun parseMessage(s: String) : Message? {
|
||||
return gson.fromJson(s, Message::class.java)
|
||||
}
|
||||
|
||||
fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? {
|
||||
val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notificationId = notificationId)
|
||||
val message = parseMessage(s) ?: return null
|
||||
val notificationWithTopic = parseNotificationWithTopic(message, subscriptionId = subscriptionId, notificationId = notificationId)
|
||||
return notificationWithTopic?.notification
|
||||
}
|
||||
|
||||
fun parseWithTopic(s: String, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? {
|
||||
val message = gson.fromJson(s, Message::class.java)
|
||||
fun parseNotificationWithTopic(message: Message, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? {
|
||||
if (message.event != ApiService.EVENT_MESSAGE) {
|
||||
return null
|
||||
}
|
||||
|
@ -56,7 +60,7 @@ class NotificationParser {
|
|||
subscriptionId = subscriptionId,
|
||||
timestamp = message.time,
|
||||
title = message.title ?: "",
|
||||
message = message.message,
|
||||
message = message.message?: "",
|
||||
encoding = message.encoding ?: "",
|
||||
priority = toPriority(message.priority),
|
||||
tags = joinTags(message.tags),
|
||||
|
|
|
@ -3,6 +3,7 @@ package io.heckel.ntfy.service
|
|||
import io.heckel.ntfy.db.*
|
||||
import io.heckel.ntfy.util.Log
|
||||
import io.heckel.ntfy.msg.ApiService
|
||||
import io.heckel.ntfy.msg.Message
|
||||
import io.heckel.ntfy.util.topicUrl
|
||||
import kotlinx.coroutines.*
|
||||
import okhttp3.Call
|
||||
|
@ -16,7 +17,7 @@ class JsonConnection(
|
|||
private val user: User?,
|
||||
private val sinceId: String?,
|
||||
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
|
||||
private val notificationListener: (Subscription, Notification) -> Unit,
|
||||
private val notificationListener: (ConnectionId, Message) -> String?,
|
||||
private val serviceActive: () -> Boolean
|
||||
) : Connection {
|
||||
private val baseUrl = connectionId.baseUrl
|
||||
|
@ -40,12 +41,8 @@ class JsonConnection(
|
|||
while (isActive && serviceActive()) {
|
||||
Log.d(TAG, "[$url] (Re-)starting connection for subscriptions: $topicsToSubscriptionIds")
|
||||
val startTime = System.currentTimeMillis()
|
||||
val notify = notify@ { topic: String, notification: Notification ->
|
||||
since = notification.id
|
||||
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@notify
|
||||
val subscription = repository.getSubscription(subscriptionId) ?: return@notify
|
||||
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
|
||||
notificationListener(subscription, notificationWithSubscriptionId)
|
||||
val notify = { message : Message ->
|
||||
since = notificationListener(ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush), message)?: since
|
||||
}
|
||||
val failed = AtomicBoolean(false)
|
||||
val fail = { _: Exception ->
|
||||
|
|
|
@ -15,9 +15,10 @@ import io.heckel.ntfy.R
|
|||
import io.heckel.ntfy.app.Application
|
||||
import io.heckel.ntfy.db.ConnectionState
|
||||
import io.heckel.ntfy.db.Repository
|
||||
import io.heckel.ntfy.db.Subscription
|
||||
import io.heckel.ntfy.msg.ApiService
|
||||
import io.heckel.ntfy.msg.Message
|
||||
import io.heckel.ntfy.msg.NotificationDispatcher
|
||||
import io.heckel.ntfy.msg.NotificationParser
|
||||
import io.heckel.ntfy.ui.Colors
|
||||
import io.heckel.ntfy.ui.MainActivity
|
||||
import io.heckel.ntfy.util.Log
|
||||
|
@ -28,6 +29,7 @@ import kotlinx.coroutines.GlobalScope
|
|||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import kotlin.random.Random
|
||||
|
||||
/**
|
||||
* The subscriber service manages the foreground service for instant delivery.
|
||||
|
@ -67,6 +69,7 @@ class SubscriberService : Service() {
|
|||
private var notificationManager: NotificationManager? = null
|
||||
private var serviceNotification: Notification? = null
|
||||
private val refreshMutex = Mutex() // Ensure refreshConnections() is only run one at a time
|
||||
private val parser = NotificationParser()
|
||||
|
||||
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
|
||||
Log.d(TAG, "onStartCommand executed with startId: $startId")
|
||||
|
@ -247,11 +250,39 @@ class SubscriberService : Service() {
|
|||
}
|
||||
}
|
||||
|
||||
private fun onConnectionOpen(connectionId: ConnectionId, message: String?) {
|
||||
Log.d(TAG, "Received open from connection ${connectionId.baseUrl} with message: $message")
|
||||
// this check is sufficient for now, and the message can be upgraded to include other parameters in the future
|
||||
if (message?.contains(ApiService.EVENT_OPEN_PARAM_NEW_TOPIC) == true) {
|
||||
GlobalScope.launch(Dispatchers.IO) {
|
||||
for (topic in connectionId.topicsToSubscriptionIds.keys) {
|
||||
if (connectionId.topicIsUnifiedPush[topic] == true) {
|
||||
Log.d(TAG, "Attempting to re-register ${connectionId.baseUrl}/$topic")
|
||||
io.heckel.ntfy.up.BroadcastReceiver.sendRegistration(baseContext, connectionId.baseUrl, topic)
|
||||
// TODO is that the right context - looks like it works???
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
private fun onStateChanged(subscriptionIds: Collection<Long>, state: ConnectionState) {
|
||||
repository.updateState(subscriptionIds, state)
|
||||
}
|
||||
|
||||
private fun onNotificationReceived(subscription: Subscription, notification: io.heckel.ntfy.db.Notification) {
|
||||
// Process messages received from the server, and dispatch a notification if required.
|
||||
// Return the ID of the notification if successfully processed, else null.
|
||||
private fun onNotificationReceived(connectionId: ConnectionId, message: Message) : String? {
|
||||
if (message.event == ApiService.EVENT_OPEN) {
|
||||
onConnectionOpen(connectionId, message.message)
|
||||
return null
|
||||
}
|
||||
|
||||
val (topic, notificationWithoutId) = parser.parseNotificationWithTopic(message, notificationId = Random.nextInt(), subscriptionId = 0)
|
||||
?: return null // subscriptionId to be set downstream
|
||||
val subscriptionId = connectionId.topicsToSubscriptionIds[topic] ?: return null
|
||||
val subscription = repository.getSubscription(subscriptionId) ?: return null
|
||||
val notification = notificationWithoutId.copy(subscriptionId = subscription.id)
|
||||
|
||||
// Wakelock while notifications are being dispatched
|
||||
// Wakelocks are reference counted by default so that should work neatly here
|
||||
wakeLock?.acquire(NOTIFICATION_RECEIVED_WAKELOCK_TIMEOUT_MILLIS)
|
||||
|
@ -269,6 +300,7 @@ class SubscriberService : Service() {
|
|||
}
|
||||
}
|
||||
}
|
||||
return notification.id
|
||||
}
|
||||
|
||||
private fun createNotificationChannel(): NotificationManager? {
|
||||
|
|
|
@ -6,6 +6,7 @@ import android.os.Handler
|
|||
import android.os.Looper
|
||||
import io.heckel.ntfy.db.*
|
||||
import io.heckel.ntfy.msg.ApiService.Companion.requestBuilder
|
||||
import io.heckel.ntfy.msg.Message
|
||||
import io.heckel.ntfy.msg.NotificationParser
|
||||
import io.heckel.ntfy.util.Log
|
||||
import io.heckel.ntfy.util.topicShortUrl
|
||||
|
@ -18,7 +19,6 @@ import java.util.*
|
|||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import kotlin.random.Random
|
||||
|
||||
/**
|
||||
* Connect to ntfy server via WebSockets. This connection represents a single connection to a server, with
|
||||
|
@ -36,7 +36,7 @@ class WsConnection(
|
|||
private val user: User?,
|
||||
private val sinceId: String?,
|
||||
private val stateChangeListener: (Collection<Long>, ConnectionState) -> Unit,
|
||||
private val notificationListener: (Subscription, Notification) -> Unit,
|
||||
private val notificationListener: (ConnectionId, Message) -> String?,
|
||||
private val alarmManager: AlarmManager
|
||||
) : Connection {
|
||||
private val parser = NotificationParser()
|
||||
|
@ -59,7 +59,8 @@ class WsConnection(
|
|||
private val topicIsUnifiedPush = connectionId.topicIsUnifiedPush
|
||||
private val subscriptionIds = topicsToSubscriptionIds.values
|
||||
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
|
||||
private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value }.keys.joinToString(separator = ",")
|
||||
private val unifiedPushTopicsStr = topicIsUnifiedPush.filter { entry -> entry.value
|
||||
}.keys.joinToString(separator = ",")
|
||||
private val shortUrl = topicShortUrl(baseUrl, topicsStr)
|
||||
|
||||
init {
|
||||
|
@ -137,18 +138,16 @@ class WsConnection(
|
|||
override fun onMessage(webSocket: WebSocket, text: String) {
|
||||
synchronize("onMessage") {
|
||||
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Received message: $text")
|
||||
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt())
|
||||
if (notificationWithTopic == null) {
|
||||
Log.d(TAG, "$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
|
||||
return@synchronize
|
||||
val message = parser.parseMessage(text) ?: return@synchronize
|
||||
val id = notificationListener(
|
||||
ConnectionId(baseUrl, topicsToSubscriptionIds, topicIsUnifiedPush),
|
||||
message
|
||||
)
|
||||
if (id != null) {
|
||||
since.set(id)
|
||||
} else {
|
||||
Log.d(WsConnection.TAG,"$shortUrl (gid=$globalId, lid=$id): Irrelevant or unknown message. Discarding.")
|
||||
}
|
||||
val topic = notificationWithTopic.topic
|
||||
val notification = notificationWithTopic.notification
|
||||
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@synchronize
|
||||
val subscription = repository.getSubscription(subscriptionId) ?: return@synchronize
|
||||
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
|
||||
notificationListener(subscription, notificationWithSubscriptionId)
|
||||
since.set(notification.id)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -94,6 +94,13 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
|
|||
// Note, this may fail due to a SQL constraint exception, see https://github.com/binwiederhier/ntfy/issues/185
|
||||
repository.addSubscription(subscription)
|
||||
distributor.sendEndpoint(appId, connectorToken, endpoint)
|
||||
/* We need to stop sending the endpoint here once everyone has the new server,
|
||||
the foreground service will do that after registering with the server.
|
||||
That will avoid a race condition where the application server
|
||||
is rejected before ntfy even establishes that this topic exists.
|
||||
This is fine from an application perspective, because other distributors can't even register
|
||||
without a connection to the push server.
|
||||
Unless the app registers twice. Then it'll get the endpoint anyway.*/
|
||||
|
||||
// Refresh (and maybe start) foreground service
|
||||
SubscriberServiceManager.refresh(app)
|
||||
|
@ -143,5 +150,26 @@ class BroadcastReceiver : android.content.BroadcastReceiver() {
|
|||
private const val TOPIC_RANDOM_ID_LENGTH = 12
|
||||
|
||||
val mutex = Mutex() // https://github.com/binwiederhier/ntfy/issues/230
|
||||
|
||||
// TODO Where's the best place to put this function? This seems to be the only place
|
||||
// with the access to the locks, but also globally accessible
|
||||
// but also, broadcast receiver is for *receiving Android broadcasts*
|
||||
public fun sendRegistration(context: Context, baseUrl : String, topic : String) {
|
||||
val app = context.applicationContext as Application
|
||||
val repository = app.repository
|
||||
val distributor = Distributor(app)
|
||||
GlobalScope.launch(Dispatchers.IO) {
|
||||
// We're doing all of this inside a critical section, because of possible races.
|
||||
// See https://github.com/binwiederhier/ntfy/issues/230 for details.
|
||||
|
||||
mutex.withLock {
|
||||
val existingSubscription = repository.getSubscription(baseUrl, topic) ?: return@launch
|
||||
val appId = existingSubscription.upAppId ?: return@launch
|
||||
val connectorToken = existingSubscription.upConnectorToken ?: return@launch
|
||||
val endpoint = topicUrlUp(existingSubscription.baseUrl, existingSubscription.topic)
|
||||
distributor.sendEndpoint(appId, connectorToken, endpoint)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue