diff --git a/server/server.go b/server/server.go index 1540f41a..0d104ac0 100644 --- a/server/server.go +++ b/server/server.go @@ -38,24 +38,16 @@ import ( TODO -- -- HIGH Rate limiting: Sensitive endpoints (account/login/change-password/...) - HIGH Rate limiting: dailyLimitToRate is wrong? + TESTS +- HIGH Rate limiting: Sensitive endpoints (account/login/change-password/...) - HIGH Rate limiting: Bandwidth limit must be in tier + TESTS -- HIGH Sync problems with "deleteAfter=0" and "displayName=" -- Reservation (UI): Show "This topic is reserved" error message when trying to reserve a reserved topic (Thorben) -- Reservation (UI): Ask for confirmation when removing reservation (deadcade) -- Reservation table delete button: dialog "keep or delete messages?" -- UI: Flickering upgrade banner when logging in -- JS constants - -races: -- v.user --> see publishSyncEventAsync() test - -payments: -- reconciliation - -delete messages + reserved topics on ResetTier delete attachments in access.go - +- MEDIUM: Races with v.user (see publishSyncEventAsync test) +- MEDIUM: Reservation (UI): Show "This topic is reserved" error message when trying to reserve a reserved topic (Thorben) +- MEDIUM: Reservation (UI): Ask for confirmation when removing reservation (deadcade) +- MEDIUM: Reservation table delete button: dialog "keep or delete messages?" +- LOW: UI: Flickering upgrade banner when logging in +- LOW: JS constants +- LOW: Payments reconciliation process Limits & rate limiting: users without tier: should the stats be persisted? are they meaningful? -> test that the visitor is based on the IP address! @@ -1030,12 +1022,12 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi defer conn.Close() // Subscription connections can be canceled externally, see topic.CancelSubscribers - subscriberContext, cancel := context.WithCancel(context.Background()) + cancelCtx, cancel := context.WithCancel(context.Background()) defer cancel() // Use errgroup to run WebSocket reader and writer in Go routines var wlock sync.Mutex - g, gctx := errgroup.WithContext(subscriberContext) + g, gctx := errgroup.WithContext(cancelCtx) g.Go(func() error { pongWait := s.config.KeepaliveInterval + wsPongWait conn.SetReadLimit(wsReadLimit) @@ -1072,7 +1064,7 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi select { case <-gctx.Done(): return nil - case <-subscriberContext.Done(): + case <-cancelCtx.Done(): log.Trace("%s Cancel received, closing subscriber connection", logHTTPPrefix(v, r)) conn.Close() return &websocket.CloseError{Code: websocket.CloseNormalClosure, Text: "subscription was canceled"} diff --git a/web/src/app/SubscriptionManager.js b/web/src/app/SubscriptionManager.js index ec8c13bc..ef9ff42c 100644 --- a/web/src/app/SubscriptionManager.js +++ b/web/src/app/SubscriptionManager.js @@ -17,7 +17,7 @@ class SubscriptionManager { return await db.subscriptions.get(subscriptionId) } - async add(baseUrl, topic) { + async add(baseUrl, topic, internal) { const id = topicUrl(baseUrl, topic); const existingSubscription = await this.get(id); if (existingSubscription) { @@ -30,7 +30,7 @@ class SubscriptionManager { mutedUntil: 0, last: null, remoteId: null, - internal: false + internal: internal || false }; await db.subscriptions.put(subscription); return subscription; @@ -47,7 +47,7 @@ class SubscriptionManager { const reservation = remoteReservations?.find(r => remote.base_url === config.base_url && remote.topic === r.topic) || null; await this.update(local.id, { remoteId: remote.id, - displayName: remote.display_name, + displayName: remote.display_name, // May be undefined reservation: reservation // May be null! }); remoteIds.push(remote.id); diff --git a/web/src/components/App.js b/web/src/components/App.js index fc2d7fad..861a3709 100644 --- a/web/src/components/App.js +++ b/web/src/components/App.js @@ -71,25 +71,11 @@ const Layout = () => { || (config.base_url === s.baseUrl && params.topic === s.topic) }); - useConnectionListeners(subscriptions, users); + useConnectionListeners(account, subscriptions, users); useAccountListener(setAccount) useBackgroundProcesses(); useEffect(() => updateTitle(newNotificationsCount), [newNotificationsCount]); - useEffect(() => { - if (!account || !account.sync_topic) { - return; - } - (async () => { - const subscription = await subscriptionManager.add(config.base_url, account.sync_topic); - if (!subscription.hidden) { - await subscriptionManager.update(subscription.id, { - internal: true - }); - } - })(); - }, [account]); - return ( { +export const useConnectionListeners = (account, subscriptions, users) => { const navigate = useNavigate(); + // Register listeners for incoming messages, and connection state changes useEffect(() => { const handleMessage = async (subscriptionId, message) => { const subscription = await subscriptionManager.get(subscriptionId); @@ -64,6 +63,15 @@ export const useConnectionListeners = (subscriptions, users) => { [] ); + // Sync topic listener: For accounts with sync_topic, subscribe to an internal topic + useEffect(() => { + if (!account || !account.sync_topic) { + return; + } + subscriptionManager.add(config.base_url, account.sync_topic, true); // Dangle! + }, [account]); + + // When subscriptions or users change, refresh the connections useEffect(() => { connectionManager.refresh(subscriptions, users); // Dangle }, [subscriptions, users]);