Account sync in action

This commit is contained in:
binwiederhier 2023-01-11 21:38:10 -05:00
parent 3dd8dd4288
commit fdee54f921
8 changed files with 118 additions and 32 deletions

View file

@ -46,6 +46,7 @@ func (s *Server) handleAccountGet(w http.ResponseWriter, _ *http.Request, v *vis
return err
}
limits, stats := info.Limits, info.Stats
response := &apiAccountResponse{
Limits: &apiAccountLimits{
Basis: string(limits.Basis),

View file

@ -6,7 +6,7 @@ import {
accountSubscriptionSingleUrl,
accountSubscriptionUrl,
accountTokenUrl,
accountUrl,
accountUrl, maybeWithAuth, topicUrl,
withBasicAuth,
withBearerAuth
} from "./utils";
@ -15,6 +15,7 @@ import subscriptionManager from "./SubscriptionManager";
import i18n from "i18next";
import prefs from "./Prefs";
import routes from "../components/routes";
import userManager from "./UserManager";
const delayMillis = 45000; // 45 seconds
const intervalMillis = 900000; // 15 minutes
@ -23,6 +24,11 @@ class AccountApi {
constructor() {
this.timer = null;
this.listener = null; // Fired when account is fetched from remote
// Random ID used to identify this client when sending/receiving "sync" events
// to the sync topic of an account. This ID doesn't matter much, but it will prevent
// a client from reacting to its own message.
this.identity = Math.floor(Math.random() * 2586000);
}
registerListener(listener) {
@ -164,6 +170,7 @@ class AccountApi {
} else if (response.status !== 200) {
throw new Error(`Unexpected server response ${response.status}`);
}
this.triggerChange(); // Dangle!
}
async addSubscription(payload) {
@ -182,6 +189,7 @@ class AccountApi {
}
const subscription = await response.json();
console.log(`[AccountApi] Subscription`, subscription);
this.triggerChange(); // Dangle!
return subscription;
}
@ -201,6 +209,7 @@ class AccountApi {
}
const subscription = await response.json();
console.log(`[AccountApi] Subscription`, subscription);
this.triggerChange(); // Dangle!
return subscription;
}
@ -216,6 +225,7 @@ class AccountApi {
} else if (response.status !== 200) {
throw new Error(`Unexpected server response ${response.status}`);
}
this.triggerChange(); // Dangle!
}
async upsertAccess(topic, everyone) {
@ -236,6 +246,7 @@ class AccountApi {
} else if (response.status !== 200) {
throw new Error(`Unexpected server response ${response.status}`);
}
this.triggerChange(); // Dangle!
}
async deleteAccess(topic) {
@ -250,6 +261,7 @@ class AccountApi {
} else if (response.status !== 200) {
throw new Error(`Unexpected server response ${response.status}`);
}
this.triggerChange(); // Dangle!
}
async sync() {
@ -285,6 +297,34 @@ class AccountApi {
}
}
async triggerChange() {
const account = await this.get();
if (!account.sync_topic) {
return;
}
const url = topicUrl(config.base_url, account.sync_topic);
console.log(`[AccountApi] Triggering account change to ${url}`);
const user = await userManager.get(config.base_url);
const headers = {
Cache: "no" // We really don't need to store this!
};
try {
const response = await fetch(url, {
method: 'PUT',
body: JSON.stringify({
event: "sync",
source: this.identity
}),
headers: maybeWithAuth(headers, user)
});
if (response.status < 200 || response.status > 299) {
throw new Error(`Unexpected response: ${response.status}`);
}
} catch (e) {
console.log(`[AccountApi] Publishing to sync topic failed`, e);
}
}
startWorker() {
if (this.timer !== null) {
return;

View file

@ -1,13 +1,6 @@
import {
accountPasswordUrl,
accountSettingsUrl,
accountSubscriptionSingleUrl,
accountSubscriptionUrl,
accountTokenUrl,
accountUrl,
fetchLinesIterator, maybeWithAuth,
withBasicAuth,
withBearerAuth,
fetchLinesIterator,
maybeWithAuth,
topicShortUrl,
topicUrl,
topicUrlAuth,

View file

@ -11,7 +11,7 @@ class ConnectionManager {
constructor() {
this.connections = new Map(); // ConnectionId -> Connection (hash, see below)
this.stateListener = null; // Fired when connection state changes
this.notificationListener = null; // Fired when new notifications arrive
this.messageListener = null; // Fired when new notifications arrive
}
registerStateListener(listener) {
@ -22,12 +22,12 @@ class ConnectionManager {
this.stateListener = null;
}
registerNotificationListener(listener) {
this.notificationListener = listener;
registerMessageListener(listener) {
this.messageListener = listener;
}
resetNotificationListener() {
this.notificationListener = null;
resetMessageListener() {
this.messageListener = null;
}
/**
@ -97,9 +97,9 @@ class ConnectionManager {
}
notificationReceived(subscriptionId, notification) {
if (this.notificationListener) {
if (this.messageListener) {
try {
this.notificationListener(subscriptionId, notification);
this.messageListener(subscriptionId, notification);
} catch (e) {
console.error(`[ConnectionManager] Error handling notification for ${subscriptionId}`, e);
}

View file

@ -29,7 +29,8 @@ class SubscriptionManager {
topic: topic,
mutedUntil: 0,
last: null,
remoteId: null
remoteId: null,
internal: false
};
await db.subscriptions.put(subscription);
return subscription;
@ -44,9 +45,11 @@ class SubscriptionManager {
const remote = remoteSubscriptions[i];
const local = await this.add(remote.base_url, remote.topic);
const reservation = remoteReservations?.find(r => remote.base_url === config.base_url && remote.topic === r.topic) || null;
await this.setRemoteId(local.id, remote.id);
await this.setDisplayName(local.id, remote.display_name);
await this.setReservation(local.id, reservation); // May be null!
await this.update(local.id, {
remoteId: remote.id,
displayName: remote.display_name,
reservation: reservation // May be null!
});
remoteIds.push(remote.id);
}
@ -54,7 +57,8 @@ class SubscriptionManager {
const localSubscriptions = await db.subscriptions.toArray();
for (let i = 0; i < localSubscriptions.length; i++) {
const local = localSubscriptions[i];
if (!local.remoteId || !remoteIds.includes(local.remoteId)) {
const remoteExists = local.remoteId && remoteIds.includes(local.remoteId);
if (!local.internal && !remoteExists) {
await this.remove(local.id);
}
}
@ -182,6 +186,10 @@ class SubscriptionManager {
});
}
async update(subscriptionId, params) {
await db.subscriptions.update(subscriptionId, params);
}
async pruneNotifications(thresholdTimestamp) {
await db.notifications
.where("time").below(thresholdTimestamp)

View file

@ -27,6 +27,7 @@ import Login from "./Login";
import Pricing from "./Pricing";
import Signup from "./Signup";
import Account from "./Account";
import accountApi from "../app/AccountApi";
export const AccountContext = createContext(null);
@ -79,6 +80,20 @@ const Layout = () => {
useBackgroundProcesses();
useEffect(() => updateTitle(newNotificationsCount), [newNotificationsCount]);
useEffect(() => {
if (!account || !account.sync_topic) {
return;
}
(async () => {
const subscription = await subscriptionManager.add(config.base_url, account.sync_topic);
if (!subscription.hidden) {
await subscriptionManager.update(subscription.id, {
internal: true
});
}
})();
}, [account]);
return (
<Box sx={{display: 'flex'}}>
<ActionBar

View file

@ -215,9 +215,11 @@ const UpgradeBanner = () => {
};
const SubscriptionList = (props) => {
const sortedSubscriptions = props.subscriptions.sort( (a, b) => {
return (topicUrl(a.baseUrl, a.topic) < topicUrl(b.baseUrl, b.topic)) ? -1 : 1;
});
const sortedSubscriptions = props.subscriptions
.filter(s => !s.internal)
.sort((a, b) => {
return (topicUrl(a.baseUrl, a.topic) < topicUrl(b.baseUrl, b.topic)) ? -1 : 1;
});
return (
<>
{sortedSubscriptions.map(subscription =>

View file

@ -1,5 +1,5 @@
import {useNavigate, useParams} from "react-router-dom";
import {useEffect, useState} from "react";
import {useContext, useEffect, useState} from "react";
import subscriptionManager from "../app/SubscriptionManager";
import {disallowedTopic, expandSecureUrl, topicUrl} from "../app/utils";
import notifier from "../app/Notifier";
@ -10,6 +10,7 @@ import pruner from "../app/Pruner";
import session from "../app/Session";
import {UnauthorizedError} from "../app/AccountApi";
import accountApi from "../app/AccountApi";
import {AccountContext} from "./App";
/**
* Wire connectionManager and subscriptionManager so that subscriptions are updated when the connection
@ -20,6 +21,34 @@ export const useConnectionListeners = (subscriptions, users) => {
const navigate = useNavigate();
useEffect(() => {
const handleMessage = async (subscriptionId, message) => {
const subscription = await subscriptionManager.get(subscriptionId);
if (subscription.internal) {
await handleInternalMessage(message);
} else {
await handleNotification(subscriptionId, message);
}
};
const handleInternalMessage = async (message) => {
console.log(`[ConnectionListener] Received message on sync topic`, message.message);
try {
const data = JSON.parse(message.message);
if (data.event === "sync") {
if (data.source !== accountApi.identity) {
console.log(`[ConnectionListener] Triggering account sync`);
await accountApi.sync();
} else {
console.log(`[ConnectionListener] I triggered the account sync, ignoring message`);
}
} else {
console.log(`[ConnectionListener] Unknown message type. Doing nothing.`);
}
} catch (e) {
console.log(`[ConnectionListener] Error parsing sync topic message`, e);
}
};
const handleNotification = async (subscriptionId, notification) => {
const added = await subscriptionManager.addNotification(subscriptionId, notification);
if (added) {
@ -28,10 +57,10 @@ export const useConnectionListeners = (subscriptions, users) => {
}
};
connectionManager.registerStateListener(subscriptionManager.updateState);
connectionManager.registerNotificationListener(handleNotification);
connectionManager.registerMessageListener(handleMessage);
return () => {
connectionManager.resetStateListener();
connectionManager.resetNotificationListener();
connectionManager.resetMessageListener();
}
},
// We have to disable dep checking for "navigate". This is fine, it never changes.
@ -100,11 +129,9 @@ export const useBackgroundProcesses = () => {
export const useAccountListener = (setAccount) => {
useEffect(() => {
accountApi.registerListener(setAccount);
(async () => {
await accountApi.sync();
})();
accountApi.sync(); // Dangle
return () => {
accountApi.registerListener();
accountApi.resetListener();
}
}, []);
}