Docs in server.yml, schemaVersion table, refactoring

This commit is contained in:
binwiederhier 2023-06-08 21:45:52 -04:00
parent d3ac976d05
commit 9d38aeb863
7 changed files with 74 additions and 37 deletions

View file

@ -38,15 +38,6 @@
#
# firebase-key-file: <filename>
# Enable web push
#
# Run "ntfy webpush keys" to generate the keys
#
# web-push-public-key:
# web-push-private-key:
# web-push-subscriptions-file:
# web-push-email-address:
# If "cache-file" is set, messages are cached in a local SQLite database instead of only in-memory.
# This allows for service restarts without losing messages in support of the since= parameter.
#
@ -153,6 +144,25 @@
# smtp-server-domain:
# smtp-server-addr-prefix:
# Web Push support (background notifications for browsers)
#
# If enabled, allows ntfy to receive push notifications, even when the ntfy web app is closed. When enabled, the user
# can enable background notifications. Once enabled by the user, ntfy will forward published messages to the push
# endpoint, which will then forward it to the browser.
#
# You must configure all settings below to enable Web Push.
# Run "ntfy webpush keys" to generate the keys.
#
# - web-push-public-key is the generated VAPID public key, e.g. AA1234BBCCddvveekaabcdfqwertyuiopasdfghjklzxcvbnm1234567890
# - web-push-private-key is the generated VAPID private key, e.g. AA2BB1234567890abcdefzxcvbnm1234567890
# - web-push-subscriptions-file is a database file to keep track of browser subscription endpoints, e.g. `/var/cache/ntfy/webpush.db`
# - web-push-email-address is the admin email address send to the push provider, e.g. `sysadmin@example.com`
#
# web-push-public-key:
# web-push-private-key:
# web-push-subscriptions-file:
# web-push-email-address:
# If enabled, ntfy can perform voice calls via Twilio via the "X-Call" header.
#
# - twilio-account is the Twilio account SID, e.g. AC12345beefbeef67890beefbeef122586

View file

@ -15,9 +15,7 @@ func (s *Server) execManager() {
s.pruneTokens()
s.pruneAttachments()
s.pruneMessages()
if s.config.WebPushPublicKey != "" {
s.expireOrNotifyOldSubscriptions()
}
s.pruneOrNotifyWebPushSubscriptions()
// Message count per topic
var messagesCached int

View file

@ -78,28 +78,39 @@ func (s *Server) publishToWebPushEndpoints(v *visitor, m *message) {
// TODO this should return error
// TODO rate limiting
func (s *Server) expireOrNotifyOldSubscriptions() {
func (s *Server) pruneOrNotifyWebPushSubscriptions() {
if s.config.WebPushPublicKey == "" {
return
}
go func() {
if err := s.pruneOrNotifyWebPushSubscriptionsInternal(); err != nil {
log.Tag(tagWebPush).Err(err).Warn("Unable to prune or notify web push subscriptions")
}
}()
}
func (s *Server) pruneOrNotifyWebPushSubscriptionsInternal() error {
subscriptions, err := s.webPush.ExpireAndGetExpiringSubscriptions(s.config.WebPushExpiryWarningDuration, s.config.WebPushExpiryDuration)
if err != nil {
log.Tag(tagWebPush).Err(err).Warn("Unable to publish expiry imminent warning")
return
return err
} else if len(subscriptions) == 0 {
return
return nil
}
payload, err := json.Marshal(newWebPushSubscriptionExpiringPayload())
if err != nil {
log.Tag(tagWebPush).Err(err).Warn("Unable to marshal expiring payload")
return
return err
}
go func() {
for _, subscription := range subscriptions {
ctx := log.Context{"endpoint": subscription.BrowserSubscription.Endpoint}
if err := s.sendWebPushNotification(payload, &subscription, &ctx); err != nil {
log.Tag(tagWebPush).Err(err).Fields(ctx).Warn("Unable to publish expiry imminent warning")
}
for _, subscription := range subscriptions {
ctx := log.Context{"endpoint": subscription.BrowserSubscription.Endpoint}
if err := s.sendWebPushNotification(payload, &subscription, &ctx); err != nil {
log.Tag(tagWebPush).Err(err).Fields(ctx).Warn("Unable to publish expiry imminent warning")
return err
}
}()
}
log.Tag(tagWebPush).Debug("Expiring old subscriptions and published %d expiry imminent warnings", len(subscriptions))
return nil
}
func (s *Server) sendWebPushNotification(message []byte, sub *webPushSubscription, ctx *log.Context) error {

View file

@ -149,7 +149,7 @@ func TestServer_WebPush_Publish(t *testing.T) {
})
}
func TestServer_WebPush_PublishExpire(t *testing.T) {
func TestServer_WebPush_Publish_RemoveOnError(t *testing.T) {
s := newTestServer(t, newTestConfigWithWebPush(t))
var received atomic.Bool
@ -201,7 +201,7 @@ func TestServer_WebPush_Expiry(t *testing.T) {
_, err := s.webPush.db.Exec("UPDATE subscriptions SET updated_at = datetime('now', '-7 days')")
require.Nil(t, err)
s.expireOrNotifyOldSubscriptions()
s.pruneOrNotifyWebPushSubscriptions()
requireSubscriptionCount(t, s, "test-topic", 1)
waitFor(t, func() bool {
@ -211,8 +211,12 @@ func TestServer_WebPush_Expiry(t *testing.T) {
_, err = s.webPush.db.Exec("UPDATE subscriptions SET updated_at = datetime('now', '-8 days')")
require.Nil(t, err)
s.expireOrNotifyOldSubscriptions()
requireSubscriptionCount(t, s, "test-topic", 0)
s.pruneOrNotifyWebPushSubscriptions()
waitFor(t, func() bool {
subs, err := s.webPush.SubscriptionsForTopic("test-topic")
require.Nil(t, err)
return len(subs) == 0
})
}
func payloadForTopics(t *testing.T, topics []string, endpoint string) string {
@ -246,6 +250,5 @@ func addSubscription(t *testing.T, s *Server, topic string, url string) {
func requireSubscriptionCount(t *testing.T, s *Server, topic string, expectedLength int) {
subs, err := s.webPush.SubscriptionsForTopic("test-topic")
require.Nil(t, err)
require.Len(t, subs, expectedLength)
}

View file

@ -22,11 +22,16 @@ const (
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
warning_sent BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON subscriptions (topic);
CREATE INDEX IF NOT EXISTS idx_endpoint ON subscriptions (endpoint);
CREATE UNIQUE INDEX IF NOT EXISTS idx_topic_endpoint ON subscriptions (topic, endpoint);
COMMIT;
`
insertWebPushSubscriptionQuery = `
INSERT OR REPLACE INTO subscriptions (topic, user_id, endpoint, key_auth, key_p256dh)
VALUES (?, ?, ?, ?, ?)
@ -39,8 +44,13 @@ const (
selectWebPushSubscriptionsExpiringSoonQuery = `SELECT DISTINCT endpoint, key_auth, key_p256dh FROM subscriptions WHERE warning_sent = 0 AND updated_at <= datetime('now', ?)`
updateWarningSentQuery = `UPDATE subscriptions SET warning_sent = true WHERE warning_sent = 0 AND updated_at <= datetime('now', ?)`
)
selectWebPushSubscriptionsCountQuery = `SELECT COUNT(*) FROM subscriptions`
// Schema management queries
const (
currentWebPushSchemaVersion = 1
insertWebPushSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
selectWebPushSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
)
type webPushStore struct {
@ -52,7 +62,7 @@ func newWebPushStore(filename string) (*webPushStore, error) {
if err != nil {
return nil, err
}
if err := setupSubscriptionsDB(db); err != nil {
if err := setupWebPushDB(db); err != nil {
return nil, err
}
return &webPushStore{
@ -60,33 +70,38 @@ func newWebPushStore(filename string) (*webPushStore, error) {
}, nil
}
func setupSubscriptionsDB(db *sql.DB) error {
// If 'subscriptions' table does not exist, this must be a new database
rows, err := db.Query(selectWebPushSubscriptionsCountQuery)
func setupWebPushDB(db *sql.DB) error {
// If 'schemaVersion' table does not exist, this must be a new database
rows, err := db.Query(selectWebPushSchemaVersionQuery)
if err != nil {
return setupNewSubscriptionsDB(db)
return setupNewWebPushDB(db)
}
return rows.Close()
}
func setupNewSubscriptionsDB(db *sql.DB) error {
func setupNewWebPushDB(db *sql.DB) error {
if _, err := db.Exec(createWebPushSubscriptionsTableQuery); err != nil {
return err
}
if _, err := db.Exec(insertWebPushSchemaVersion, currentWebPushSchemaVersion); err != nil {
return err
}
return nil
}
// UpdateSubscriptions updates the subscriptions for the given topics and user ID. It always first deletes all
// existing entries for a given endpoint.
func (c *webPushStore) UpdateSubscriptions(topics []string, userID string, subscription webpush.Subscription) error {
tx, err := c.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if err = c.RemoveByEndpoint(subscription.Endpoint); err != nil {
if _, err := tx.Exec(deleteWebPushSubscriptionByEndpointQuery, subscription.Endpoint); err != nil {
return err
}
for _, topic := range topics {
if err := c.AddSubscription(topic, userID, subscription); err != nil {
if _, err = tx.Exec(insertWebPushSubscriptionQuery, topic, userID, subscription.Endpoint, subscription.Keys.Auth, subscription.Keys.P256dh); err != nil {
return err
}
}