Make a setting for WS

This commit is contained in:
Philipp Heckel 2022-01-15 18:40:38 -05:00
parent 26b408c828
commit 5175b1b0cb
10 changed files with 209 additions and 188 deletions

View file

@ -189,6 +189,22 @@ class Repository(private val sharedPrefs: SharedPreferences, private val subscri
.apply()
}
fun setConnectionProtocol(connectionProtocol: String) {
if (connectionProtocol == CONNECTION_PROTOCOL_JSONHTTP) {
sharedPrefs.edit()
.remove(SHARED_PREFS_CONNECTION_PROTOCOL)
.apply()
} else {
sharedPrefs.edit()
.putString(SHARED_PREFS_CONNECTION_PROTOCOL, connectionProtocol)
.apply()
}
}
fun getConnectionProtocol(): String {
return sharedPrefs.getString(SHARED_PREFS_CONNECTION_PROTOCOL, null) ?: CONNECTION_PROTOCOL_JSONHTTP
}
fun getBroadcastEnabled(): Boolean {
return sharedPrefs.getBoolean(SHARED_PREFS_BROADCAST_ENABLED, true) // Enabled by default
}
@ -321,6 +337,7 @@ class Repository(private val sharedPrefs: SharedPreferences, private val subscri
const val SHARED_PREFS_MIN_PRIORITY = "MinPriority"
const val SHARED_PREFS_AUTO_DOWNLOAD_MAX_SIZE = "AutoDownload"
const val SHARED_PREFS_WAKELOCK_ENABLED = "WakelockEnabled"
const val SHARED_PREFS_CONNECTION_PROTOCOL = "ConnectionProtocol"
const val SHARED_PREFS_BROADCAST_ENABLED = "BroadcastEnabled"
const val SHARED_PREFS_UNIFIED_PUSH_ENABLED = "UnifiedPushEnabled"
const val SHARED_PREFS_UNIFIED_PUSH_BASE_URL = "UnifiedPushBaseURL"
@ -329,6 +346,9 @@ class Repository(private val sharedPrefs: SharedPreferences, private val subscri
const val AUTO_DOWNLOAD_ALWAYS = 1L
const val AUTO_DOWNLOAD_DEFAULT = 1024 * 1024L // Must match a value in values.xml
const val CONNECTION_PROTOCOL_JSONHTTP = "jsonhttp"
const val CONNECTION_PROTOCOL_WS = "ws"
private const val TAG = "NtfyRepository"
private var instance: Repository? = null

View file

@ -2,12 +2,8 @@ package io.heckel.ntfy.msg
import android.os.Build
import android.util.Log
import androidx.annotation.Keep
import com.google.gson.Gson
import io.heckel.ntfy.BuildConfig
import io.heckel.ntfy.data.Attachment
import io.heckel.ntfy.data.Notification
import io.heckel.ntfy.data.PROGRESS_NONE
import io.heckel.ntfy.util.*
import okhttp3.*
import okhttp3.RequestBody.Companion.toRequestBody
@ -16,7 +12,6 @@ import java.util.concurrent.TimeUnit
import kotlin.random.Random
class ApiService {
private val gson = Gson()
private val client = OkHttpClient.Builder()
.callTimeout(15, TimeUnit.SECONDS) // Total timeout for entire request
.connectTimeout(15, TimeUnit.SECONDS)
@ -26,6 +21,7 @@ class ApiService {
private val subscriberClient = OkHttpClient.Builder()
.readTimeout(5, TimeUnit.MINUTES) // Assuming that keepalive messages are more frequent than this
.build()
private val parser = NotificationParser()
fun publish(baseUrl: String, topic: String, message: String, title: String, priority: Int, tags: List<String>, delay: String) {
val url = topicUrl(baseUrl, topic)
@ -70,9 +66,10 @@ class ApiService {
}
val body = response.body?.string()?.trim()
if (body == null || body.isEmpty()) return emptyList()
val notifications = body.lines().map { line ->
fromString(subscriptionId, line)
val notifications = body.lines().mapNotNull { line ->
parser.parse(line, subscriptionId = subscriptionId, notificationId = 0) // No notification when we poll
}
Log.d(TAG, "Notifications: $notifications")
return notifications
}
@ -103,32 +100,9 @@ class ApiService {
val source = response.body?.source() ?: throw Exception("Unexpected response for $url: body is empty")
while (!source.exhausted()) {
val line = source.readUtf8Line() ?: throw Exception("Unexpected response for $url: line is null")
val message = gson.fromJson(line, Message::class.java)
if (message.event == EVENT_MESSAGE) {
val topic = message.topic
val attachment = if (message.attachment?.url != null) {
Attachment(
name = message.attachment.name,
type = message.attachment.type,
size = message.attachment.size,
expires = message.attachment.expires,
url = message.attachment.url,
)
} else null
val notification = Notification(
id = message.id,
subscriptionId = 0, // TO BE SET downstream
timestamp = message.time,
title = message.title ?: "",
message = message.message,
priority = toPriority(message.priority),
tags = joinTags(message.tags),
click = message.click ?: "",
attachment = attachment,
notificationId = Random.nextInt(),
deleted = false
)
notify(topic, notification)
val notification = parser.parseWithTopic(line, notificationId = Random.nextInt(), subscriptionId = 0) // subscriptionId to be set downstream
if (notification != null) {
notify(notification.topic, notification.notification)
}
}
} catch (e: Exception) {
@ -144,57 +118,6 @@ class ApiService {
return call
}
private fun fromString(subscriptionId: Long, s: String): Notification {
val message = gson.fromJson(s, Message::class.java)
val attachment = if (message.attachment?.url != null) {
Attachment(
name = message.attachment.name,
type = message.attachment.type,
size = message.attachment.size,
expires = message.attachment.expires,
url = message.attachment.url,
)
} else null
return Notification(
id = message.id,
subscriptionId = subscriptionId,
timestamp = message.time,
title = message.title ?: "",
message = message.message,
priority = toPriority(message.priority),
tags = joinTags(message.tags),
click = message.click ?: "",
attachment = attachment,
notificationId = 0, // zero: when we poll, we do not want a notificationId!
deleted = false
)
}
/* This annotation ensures that proguard still works in production builds,
* see https://stackoverflow.com/a/62753300/1440785 */
@Keep
data class Message(
val id: String,
val time: Long,
val event: String,
val topic: String,
val priority: Int?,
val tags: List<String>?,
val click: String?,
val title: String?,
val message: String,
val attachment: MessageAttachment?,
)
@Keep
data class MessageAttachment(
val name: String,
val type: String?,
val size: Long?,
val expires: Long?,
val url: String,
)
companion object {
val USER_AGENT = "ntfy/${BuildConfig.VERSION_NAME} (${BuildConfig.FLAVOR}; Android ${Build.VERSION.RELEASE}; SDK ${Build.VERSION.SDK_INT})"
private const val TAG = "NtfyApiService"

View file

@ -0,0 +1,28 @@
package io.heckel.ntfy.msg
import androidx.annotation.Keep
/* This annotation ensures that proguard still works in production builds,
* see https://stackoverflow.com/a/62753300/1440785 */
@Keep
data class Message(
val id: String,
val time: Long,
val event: String,
val topic: String,
val priority: Int?,
val tags: List<String>?,
val click: String?,
val title: String?,
val message: String,
val attachment: MessageAttachment?,
)
@Keep
data class MessageAttachment(
val name: String,
val type: String?,
val size: Long?,
val expires: Long?,
val url: String,
)

View file

@ -0,0 +1,48 @@
package io.heckel.ntfy.msg
import com.google.gson.Gson
import io.heckel.ntfy.data.Attachment
import io.heckel.ntfy.data.Notification
import io.heckel.ntfy.util.joinTags
import io.heckel.ntfy.util.toPriority
class NotificationParser {
private val gson = Gson()
fun parse(s: String, subscriptionId: Long = 0, notificationId: Int = 0): Notification? {
val notificationWithTopic = parseWithTopic(s, subscriptionId = subscriptionId, notificationId = notificationId)
return notificationWithTopic?.notification
}
fun parseWithTopic(s: String, subscriptionId: Long = 0, notificationId: Int = 0): NotificationWithTopic? {
val message = gson.fromJson(s, Message::class.java)
if (message.event != ApiService.EVENT_MESSAGE) {
return null
}
val attachment = if (message.attachment?.url != null) {
Attachment(
name = message.attachment.name,
type = message.attachment.type,
size = message.attachment.size,
expires = message.attachment.expires,
url = message.attachment.url,
)
} else null
val notification = Notification(
id = message.id,
subscriptionId = subscriptionId,
timestamp = message.time,
title = message.title ?: "",
message = message.message,
priority = toPriority(message.priority),
tags = joinTags(message.tags),
click = message.click ?: "",
attachment = attachment,
notificationId = notificationId,
deleted = false
)
return NotificationWithTopic(message.topic, notification)
}
data class NotificationWithTopic(val topic: String, val notification: Notification)
}

View file

@ -15,6 +15,7 @@ import io.heckel.ntfy.BuildConfig
import io.heckel.ntfy.R
import io.heckel.ntfy.app.Application
import io.heckel.ntfy.data.ConnectionState
import io.heckel.ntfy.data.Repository
import io.heckel.ntfy.data.Subscription
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.msg.NotificationDispatcher
@ -182,7 +183,7 @@ class SubscriberService : Service() {
}
if (!connections.containsKey(baseUrl)) {
val serviceActive = { -> isServiceStarted }
val connection = if (true) {
val connection = if (repository.getConnectionProtocol() == Repository.CONNECTION_PROTOCOL_WS) {
val alarmManager = getSystemService(ALARM_SERVICE) as AlarmManager
WsConnection(repository, baseUrl, since, subscriptions, ::onStateChanged, ::onNotificationReceived, alarmManager)
} else {

View file

@ -2,12 +2,15 @@ package io.heckel.ntfy.service
import android.app.AlarmManager
import android.os.Build
import android.os.Handler
import android.os.Looper
import android.util.Log
import com.google.gson.Gson
import io.heckel.ntfy.data.*
import io.heckel.ntfy.msg.ApiService
import io.heckel.ntfy.util.joinTags
import io.heckel.ntfy.util.toPriority
import io.heckel.ntfy.data.ConnectionState
import io.heckel.ntfy.data.Notification
import io.heckel.ntfy.data.Repository
import io.heckel.ntfy.data.Subscription
import io.heckel.ntfy.msg.NotificationParser
import io.heckel.ntfy.util.topicUrl
import io.heckel.ntfy.util.topicUrlWs
import okhttp3.*
import java.util.*
@ -24,34 +27,20 @@ class WsConnection(
private val notificationListener: (Subscription, Notification) -> Unit,
private val alarmManager: AlarmManager
) : Connection {
private val client: OkHttpClient
//private val reconnectHandler = Handler()
//private val reconnectCallback = Runnable { start() }
private val parser = NotificationParser()
private val client = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
.pingInterval(1, TimeUnit.MINUTES)
.connectTimeout(10, TimeUnit.SECONDS)
.build()
private var errorCount = 0
private var webSocket: WebSocket? = null
private var state: State? = null
private val gson = Gson()
private var since: Long = sinceTime
private val subscriptionIds = topicsToSubscriptionIds.values
private val topicsStr = topicsToSubscriptionIds.keys.joinToString(separator = ",")
private val sinceVal = if (sinceTime == 0L) "all" else sinceTime.toString()
private val wsurl = topicUrlWs(baseUrl, topicsStr, sinceVal)
init {
val builder = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
//.pingInterval(1, TimeUnit.MINUTES)
.pingInterval(30, TimeUnit.SECONDS)
.connectTimeout(10, TimeUnit.SECONDS)
client = builder.build()
}
private fun request(): Request {
return Request.Builder()
.url(wsurl)
.get()
.build()
}
private val url = topicUrl(baseUrl, topicsStr)
@Synchronized
override fun start() {
@ -61,22 +50,27 @@ class WsConnection(
cancel()
state = State.Connecting
val nextId = ID.incrementAndGet()
Log.d(TAG, "WebSocket($nextId): starting...")
webSocket = client.newWebSocket(request(), Listener(nextId))
val sinceVal = if (since == 0L) "all" else since.toString()
val urlWithSince = topicUrlWs(baseUrl, topicsStr, sinceVal)
val request = Request.Builder().url(urlWithSince).get().build()
Log.d(TAG, "[$url] WebSocket($nextId): opening $urlWithSince ...")
webSocket = client.newWebSocket(request, Listener(nextId))
}
@Synchronized
override fun cancel() {
if (webSocket != null) {
Log.d(TAG, "WebSocket(" + ID.get() + "): closing existing connection.")
state = State.Disconnected
webSocket!!.close(1000, "")
webSocket = null
if (webSocket == null) {
return
}
Log.d(TAG, "[$url] WebSocket(${ID.get()}): closing existing connection")
state = State.Disconnected
webSocket!!.close(1000, "")
webSocket = null
}
@Synchronized
override fun since(): Long {
return 0L
return since
}
override fun matches(otherSubscriptionIds: Collection<Long>): Boolean {
@ -84,39 +78,29 @@ class WsConnection(
}
@Synchronized
fun scheduleReconnect(seconds: Long) {
fun scheduleReconnect(seconds: Int) {
if (state == State.Connecting || state == State.Connected) {
return
}
state = State.Scheduled
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.N) {
Log.d(TAG,
"WebSocket: scheduling a restart in "
+ seconds
+ " second(s) (via alarm manager)"
)
val future = Calendar.getInstance()
future.add(Calendar.SECOND, seconds.toInt())
alarmManager.setExact(
AlarmManager.RTC_WAKEUP,
future.timeInMillis,
"reconnect-tag", { start() },
null
)
Log.d(TAG,"[$url] WebSocket: Scheduling a restart in $seconds seconds (via alarm manager)")
val reconnectTime = Calendar.getInstance()
reconnectTime.add(Calendar.SECOND, seconds)
alarmManager.setExact(AlarmManager.RTC_WAKEUP, reconnectTime.timeInMillis, RECONNECT_TAG, { start() }, null)
} else {
Log.d(TAG, "WebSocket: scheduling a restart in $seconds second(s)")
//reconnectHandler.removeCallbacks(reconnectCallback)
//reconnectHandler.postDelayed(reconnectCallback, TimeUnit.SECONDS.toMillis(seconds))
Log.d(TAG, "[$url] WebSocket: Scheduling a restart in $seconds seconds (via handler)")
val handler = Handler(Looper.getMainLooper())
handler.postDelayed({ start() }, TimeUnit.SECONDS.toMillis(seconds.toLong()))
}
}
private inner class Listener(private val id: Long) : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
syncExec {
Log.d(TAG, "[$url] WebSocket($id): opened")
state = State.Connected
Log.d(TAG, "WebSocket(" + id + "): opened")
if (errorCount > 0) {
Log.d(TAG, "reconnected")
errorCount = 0
}
stateChangeListener(subscriptionIds, ConnectionState.CONNECTED)
@ -125,65 +109,36 @@ class WsConnection(
override fun onMessage(webSocket: WebSocket, text: String) {
syncExec {
Log.d(TAG, "WebSocket(" + id + "): received message " + text)
val message = gson.fromJson(text, ApiService.Message::class.java)
if (message.event == ApiService.EVENT_MESSAGE) {
val topic = message.topic
val attachment = if (message.attachment?.url != null) {
Attachment(
name = message.attachment.name,
type = message.attachment.type,
size = message.attachment.size,
expires = message.attachment.expires,
url = message.attachment.url,
)
} else null
val notification = Notification(
id = message.id,
subscriptionId = 0, // TO BE SET downstream
timestamp = message.time,
title = message.title ?: "",
message = message.message,
priority = toPriority(message.priority),
tags = joinTags(message.tags),
click = message.click ?: "",
attachment = attachment,
notificationId = Random.nextInt(),
deleted = false
)
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@syncExec
val subscription = repository.getSubscription(subscriptionId) ?: return@syncExec
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
Log.d(TAG, "[$url] WebSocket($id): received message: $text")
val notificationWithTopic = parser.parseWithTopic(text, subscriptionId = 0, notificationId = Random.nextInt())
if (notificationWithTopic == null) {
return@syncExec
}
val topic = notificationWithTopic.topic
val notification = notificationWithTopic.notification
val subscriptionId = topicsToSubscriptionIds[topic] ?: return@syncExec
val subscription = repository.getSubscription(subscriptionId) ?: return@syncExec
val notificationWithSubscriptionId = notification.copy(subscriptionId = subscription.id)
notificationListener(subscription, notificationWithSubscriptionId)
since = notification.timestamp
}
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
syncExec {
if (state == State.Connected) {
Log.w(TAG, "WebSocket(" + id + "): closed")
}
Log.w(TAG, "[$url] WebSocket($id): closed")
state = State.Disconnected
}
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
val code = if (response != null) "StatusCode: " + response.code else ""
val message = response?.message ?: ""
Log.e(TAG, "WebSocket($id): failure $code Message: $message", t)
Log.e(TAG, "[$url] WebSocket($id): failure ${response?.code}: ${response?.message}", t)
syncExec {
stateChangeListener(subscriptionIds, ConnectionState.CONNECTING)
state = State.Disconnected
if ((response != null) && (response.code >= 400) && (response.code <= 499)) {
Log.d(TAG, "bad request")
cancel()
return@syncExec
}
errorCount++
val minutes: Int = Math.min(errorCount * 2 - 1, 20)
//scheduleReconnect(TimeUnit.MINUTES.toSeconds(minutes.toLong()))
scheduleReconnect(30)
val retrySeconds = RETRY_SECONDS.getOrNull(errorCount) ?: RETRY_SECONDS.last()
scheduleReconnect(retrySeconds)
}
}
@ -202,6 +157,8 @@ class WsConnection(
companion object {
private const val TAG = "NtfyWsConnection"
private const val RECONNECT_TAG = "WsReconnect"
private val RETRY_SECONDS = listOf(5, 10, 15, 20, 30, 45, 60, 120)
private val ID = AtomicLong(0)
}
}

View file

@ -153,6 +153,27 @@ class SettingsActivity : AppCompatActivity() {
}
}
// Connection protocol
val connectionProtocolPrefId = context?.getString(R.string.settings_advanced_connection_protocol_key) ?: return
val connectionProtocol: ListPreference? = findPreference(connectionProtocolPrefId)
connectionProtocol?.value = repository.getConnectionProtocol()
connectionProtocol?.preferenceDataStore = object : PreferenceDataStore() {
override fun putString(key: String?, value: String?) {
val proto = value ?: repository.getConnectionProtocol()
repository.setConnectionProtocol(proto)
restartService()
}
override fun getString(key: String?, defValue: String?): String {
return repository.getConnectionProtocol()
}
}
connectionProtocol?.summaryProvider = Preference.SummaryProvider<ListPreference> { pref ->
when (pref.value) {
Repository.CONNECTION_PROTOCOL_WS -> getString(R.string.settings_advanced_connection_protocol_summary_ws)
else -> getString(R.string.settings_advanced_connection_protocol_summary_jsonhttp)
}
}
// Permanent wakelock enabled
val wakelockEnabledPrefId = context?.getString(R.string.settings_advanced_wakelock_key) ?: return
val wakelockEnabled: SwitchPreference? = findPreference(wakelockEnabledPrefId)
@ -160,11 +181,7 @@ class SettingsActivity : AppCompatActivity() {
wakelockEnabled?.preferenceDataStore = object : PreferenceDataStore() {
override fun putBoolean(key: String?, value: Boolean) {
repository.setWakelockEnabled(value)
val context = this@SettingsFragment.context
Intent(context, SubscriberService::class.java).also { intent ->
// Service will autorestart
context?.stopService(intent)
}
restartService()
}
override fun getBoolean(key: String?, defValue: Boolean): Boolean {
return repository.getWakelockEnabled()
@ -265,6 +282,13 @@ class SettingsActivity : AppCompatActivity() {
autoDownload?.value = autoDownloadSelectionCopy.toString()
repository.setAutoDownloadMaxSize(autoDownloadSelectionCopy)
}
private fun restartService() {
val context = this@SettingsFragment.context
Intent(context, SubscriberService::class.java).also { intent ->
context?.stopService(intent) // Service will auto-restart
}
}
}
override fun onRequestPermissionsResult(requestCode: Int, permissions: Array<String>, grantResults: IntArray) {

View file

@ -226,6 +226,12 @@
<string name="settings_unified_push_base_url_title">Server URL</string>
<string name="settings_unified_push_base_url_default_summary">%1$s (default)</string>
<string name="settings_advanced_header">Advanced</string>
<string name="settings_advanced_connection_protocol_key">ConnectionProtocol</string>
<string name="settings_advanced_connection_protocol_title">Connection protocol</string>
<string name="settings_advanced_connection_protocol_summary_jsonhttp">Use a JSON stream over HTTP to connect to the server. This is the tried and true method, though it may consume more battery.</string>
<string name="settings_advanced_connection_protocol_summary_ws">Use WebSockets to connect to the server. This option is experimental. Let us know if it consumes less battery or is unstable.</string>
<string name="settings_advanced_connection_protocol_entry_jsonhttp">JSON stream over HTTP</string>
<string name="settings_advanced_connection_protocol_entry_ws">WebSockets (experimental)</string>
<string name="settings_advanced_wakelock_key">WakelockEnabled</string>
<string name="settings_advanced_wakelock_title">Permanent wakelock</string>
<string name="settings_advanced_wakelock_summary_enabled">Prevents app from sleeping to ensure timely notification delivery. This consumes a lot of battery, but some devices require this.</string>

View file

@ -34,4 +34,12 @@
<item>10485760</item>
<item>52428800</item>
</string-array>
<string-array name="settings_advanced_connection_protocol_entries">
<item>@string/settings_advanced_connection_protocol_entry_jsonhttp</item>
<item>@string/settings_advanced_connection_protocol_entry_ws</item>
</string-array>
<string-array name="settings_advanced_connection_protocol_values">
<item>jsonhttp</item>
<item>ws</item>
</string-array>
</resources>

View file

@ -34,6 +34,12 @@
app:dependency="@string/settings_unified_push_enabled_key"/>
</PreferenceCategory>
<PreferenceCategory app:title="@string/settings_advanced_header">
<ListPreference
app:key="@string/settings_advanced_connection_protocol_key"
app:title="@string/settings_advanced_connection_protocol_title"
app:entries="@array/settings_advanced_connection_protocol_entries"
app:entryValues="@array/settings_advanced_connection_protocol_values"
app:defaultValue="jsonhttp"/>
<SwitchPreference
app:key="@string/settings_advanced_wakelock_key"
app:title="@string/settings_advanced_wakelock_title"