From 24161d12bf2b2f9319a7b3ef183eddaabe726235 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 28 Feb 2023 12:47:28 +0100 Subject: [PATCH 1/9] Type locks --- packages/backend-core/src/redis/redlockImpl.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index 136d7f5d33..d3a74bb4db 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -63,7 +63,10 @@ export const newRedlock = async (opts: Options = {}) => { return new Redlock([client], options) } -export const doWithLock = async (opts: LockOptions, task: any) => { +export const doWithLock = async ( + opts: LockOptions, + task: () => Promise +) => { const redlock = await getClient(opts.type) let lock try { From 66217d6b0830cb8b3c2c00a1c9e54d2fed739bd5 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 28 Feb 2023 12:52:43 +0100 Subject: [PATCH 2/9] Return redlock execution info --- packages/backend-core/src/redis/redlockImpl.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index d3a74bb4db..f641bbf36e 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -63,10 +63,22 @@ export const newRedlock = async (opts: Options = {}) => { return new Redlock([client], options) } +type SuccessfulRedlockExecution = { + executed: true + result: T +} +type UnsuccessfulRedlockExecution = { + executed: false +} + +type RedlockExecution = + | SuccessfulRedlockExecution + | UnsuccessfulRedlockExecution + export const doWithLock = async ( opts: LockOptions, task: () => Promise -) => { +): Promise> => { const redlock = await getClient(opts.type) let lock try { @@ -86,7 +98,7 @@ export const doWithLock = async ( // perform locked task // need to await to ensure completion before unlocking const result = await task() - return result + return { executed: true, result } } catch (e: any) { console.warn("lock error") // lock limit exceeded @@ -95,7 +107,7 @@ export const doWithLock = async ( // don't throw for try-once locks, they will always error // due to retry count (0) exceeded console.warn(e) - return + return { executed: false } } else { console.error(e) throw e From c254c565e4abd96ec72121fcfee361a0ad1b7911 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 28 Feb 2023 14:54:34 +0100 Subject: [PATCH 3/9] Use redlock for writethrough --- .../src/cache/tests/writethrough.spec.ts | 62 +++++++++++++++---- .../backend-core/src/cache/writethrough.ts | 60 +++++++++++------- .../backend-core/tests/utilities/index.ts | 2 + .../backend-core/tests/utilities/jestUtils.ts | 9 +++ packages/types/src/sdk/locks.ts | 21 +++++-- 5 files changed, 115 insertions(+), 39 deletions(-) create mode 100644 packages/backend-core/tests/utilities/jestUtils.ts diff --git a/packages/backend-core/src/cache/tests/writethrough.spec.ts b/packages/backend-core/src/cache/tests/writethrough.spec.ts index d346788121..6087cc0db9 100644 --- a/packages/backend-core/src/cache/tests/writethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/writethrough.spec.ts @@ -1,10 +1,13 @@ -import { structures, DBTestConfiguration } from "../../../tests" +import { + structures, + DBTestConfiguration, + expectFunctionWasCalledTimesWith, +} from "../../../tests" import { Writethrough } from "../writethrough" import { getDB } from "../../db" import tk from "timekeeper" -const START_DATE = Date.now() -tk.freeze(START_DATE) +tk.freeze(Date.now()) const DELAY = 5000 @@ -17,34 +20,67 @@ describe("writethrough", () => { const writethrough = new Writethrough(db, DELAY) const writethrough2 = new Writethrough(db2, DELAY) + const docId = structures.uuid() + + beforeEach(() => { + jest.clearAllMocks() + }) + describe("put", () => { - let first: any + let current: any it("should be able to store, will go to DB", async () => { await config.doInTenant(async () => { - const response = await writethrough.put({ _id: "test", value: 1 }) + const response = await writethrough.put({ + _id: docId, + value: 1, + }) const output = await db.get(response.id) - first = output + current = output expect(output.value).toBe(1) }) }) it("second put shouldn't update DB", async () => { await config.doInTenant(async () => { - const response = await writethrough.put({ ...first, value: 2 }) + const response = await writethrough.put({ ...current, value: 2 }) const output = await db.get(response.id) - expect(first._rev).toBe(output._rev) + expect(current._rev).toBe(output._rev) expect(output.value).toBe(1) }) }) it("should put it again after delay period", async () => { await config.doInTenant(async () => { - tk.freeze(START_DATE + DELAY + 1) - const response = await writethrough.put({ ...first, value: 3 }) + tk.freeze(Date.now() + DELAY + 1) + const response = await writethrough.put({ ...current, value: 3 }) const output = await db.get(response.id) - expect(response.rev).not.toBe(first._rev) + expect(response.rev).not.toBe(current._rev) expect(output.value).toBe(3) + + current = output + }) + }) + + it("should handle parallel DB updates ignoring conflicts", async () => { + await config.doInTenant(async () => { + tk.freeze(Date.now() + DELAY + 1) + const responses = await Promise.all([ + writethrough.put({ ...current, value: 4 }), + writethrough.put({ ...current, value: 4 }), + writethrough.put({ ...current, value: 4 }), + ]) + + const newRev = responses.map(x => x.rev).find(x => x !== current._rev) + expect(newRev).toBeDefined() + expect(responses.map(x => x.rev)).toEqual( + expect.arrayContaining([current._rev, current._rev, newRev]) + ) + expectFunctionWasCalledTimesWith( + console.warn, + 2, + "bb-warn: Ignoring redlock conflict in write-through cache" + ) }) }) }) @@ -52,8 +88,8 @@ describe("writethrough", () => { describe("get", () => { it("should be able to retrieve", async () => { await config.doInTenant(async () => { - const response = await writethrough.get("test") - expect(response.value).toBe(3) + const response = await writethrough.get(docId) + expect(response.value).toBe(4) }) }) }) diff --git a/packages/backend-core/src/cache/writethrough.ts b/packages/backend-core/src/cache/writethrough.ts index dc889d5b18..14d1d6e2d0 100644 --- a/packages/backend-core/src/cache/writethrough.ts +++ b/packages/backend-core/src/cache/writethrough.ts @@ -1,7 +1,8 @@ import BaseCache from "./base" import { getWritethroughClient } from "../redis/init" import { logWarn } from "../logging" -import { Database } from "@budibase/types" +import { Database, Document, LockName, LockType } from "@budibase/types" +import * as locks from "../redis/redlockImpl" const DEFAULT_WRITE_RATE_MS = 10000 let CACHE: BaseCache | null = null @@ -29,38 +30,55 @@ function makeCacheItem(doc: any, lastWrite: number | null = null): CacheItem { export async function put( db: Database, - doc: any, + doc: Document, writeRateMs: number = DEFAULT_WRITE_RATE_MS ) { const cache = await getCache() const key = doc._id - let cacheItem: CacheItem | undefined = await cache.get(makeCacheKey(db, key)) + let cacheItem: CacheItem | undefined + if (key) { + cacheItem = await cache.get(makeCacheKey(db, key)) + } const updateDb = !cacheItem || cacheItem.lastWrite < Date.now() - writeRateMs let output = doc if (updateDb) { - const writeDb = async (toWrite: any) => { - // doc should contain the _id and _rev - const response = await db.put(toWrite) - output = { - ...doc, - _id: response.id, - _rev: response.rev, - } - } - try { - await writeDb(doc) - } catch (err: any) { - if (err.status !== 409) { - throw err - } else { - // Swallow 409s but log them - logWarn(`Ignoring conflict in write-through cache`) + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH(key!), + ttl: 1000, + }, + async () => { + const writeDb = async (toWrite: any) => { + // doc should contain the _id and _rev + const response = await db.put(toWrite) + output = { + ...doc, + _id: response.id, + _rev: response.rev, + } + } + try { + await writeDb(doc) + } catch (err: any) { + if (err.status !== 409) { + throw err + } else { + // Swallow 409s but log them + logWarn(`Ignoring conflict in write-through cache`) + } + } } + ) + if (!lockResponse.executed) { + logWarn(`Ignoring redlock conflict in write-through cache`) } } // if we are updating the DB then need to set the lastWrite to now cacheItem = makeCacheItem(output, updateDb ? null : cacheItem?.lastWrite) - await cache.store(makeCacheKey(db, key), cacheItem) + if (output._id) { + await cache.store(makeCacheKey(db, output._id), cacheItem) + } return { ok: true, id: output._id, rev: output._rev } } diff --git a/packages/backend-core/tests/utilities/index.ts b/packages/backend-core/tests/utilities/index.ts index efe014908b..1c73216d76 100644 --- a/packages/backend-core/tests/utilities/index.ts +++ b/packages/backend-core/tests/utilities/index.ts @@ -4,4 +4,6 @@ export { generator } from "./structures" export * as testEnv from "./testEnv" export * as testContainerUtils from "./testContainerUtils" +export * from "./jestUtils" + export { default as DBTestConfiguration } from "./DBTestConfiguration" diff --git a/packages/backend-core/tests/utilities/jestUtils.ts b/packages/backend-core/tests/utilities/jestUtils.ts new file mode 100644 index 0000000000..d84eac548c --- /dev/null +++ b/packages/backend-core/tests/utilities/jestUtils.ts @@ -0,0 +1,9 @@ +export function expectFunctionWasCalledTimesWith( + jestFunction: any, + times: number, + argument: any +) { + expect( + jestFunction.mock.calls.filter((call: any) => call[0] === argument).length + ).toBe(times) +} diff --git a/packages/types/src/sdk/locks.ts b/packages/types/src/sdk/locks.ts index d868691891..d9c0b606a8 100644 --- a/packages/types/src/sdk/locks.ts +++ b/packages/types/src/sdk/locks.ts @@ -8,11 +8,22 @@ export enum LockType { DELAY_500 = "delay_500", } -export enum LockName { - MIGRATIONS = "migrations", - TRIGGER_QUOTA = "trigger_quota", - SYNC_ACCOUNT_LICENSE = "sync_account_license", - UPDATE_TENANTS_DOC = "update_tenants_doc", +export class LockName { + static readonly MIGRATIONS = new LockName("migrations") + static readonly TRIGGER_QUOTA = new LockName("trigger_quota") + static readonly SYNC_ACCOUNT_LICENSE = new LockName("sync_account_license") + static readonly UPDATE_TENANTS_DOC = new LockName("update_tenants_doc") + static readonly PERSIST_WRITETHROUGH = (key: string) => + new LockName(`persist_writethrough_${key}`) + + constructor(public readonly value: string) {} + + valueOf() { + return this.value + } + toString() { + return this.valueOf() + } } export interface LockOptions { From c1462a7c9ca738a6486fd7472411e38cd3a6fc65 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 28 Feb 2023 14:54:43 +0100 Subject: [PATCH 4/9] Clean code --- packages/backend-core/src/cache/writethrough.ts | 10 +++------- packages/backend-core/src/redis/redlockImpl.ts | 4 ++-- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/packages/backend-core/src/cache/writethrough.ts b/packages/backend-core/src/cache/writethrough.ts index 14d1d6e2d0..e895406c2c 100644 --- a/packages/backend-core/src/cache/writethrough.ts +++ b/packages/backend-core/src/cache/writethrough.ts @@ -28,7 +28,7 @@ function makeCacheItem(doc: any, lastWrite: number | null = null): CacheItem { return { doc, lastWrite: lastWrite || Date.now() } } -export async function put( +async function put( db: Database, doc: Document, writeRateMs: number = DEFAULT_WRITE_RATE_MS @@ -82,7 +82,7 @@ export async function put( return { ok: true, id: output._id, rev: output._rev } } -export async function get(db: Database, id: string): Promise { +async function get(db: Database, id: string): Promise { const cache = await getCache() const cacheKey = makeCacheKey(db, id) let cacheItem: CacheItem = await cache.get(cacheKey) @@ -94,11 +94,7 @@ export async function get(db: Database, id: string): Promise { return cacheItem.doc } -export async function remove( - db: Database, - docOrId: any, - rev?: any -): Promise { +async function remove(db: Database, docOrId: any, rev?: any): Promise { const cache = await getCache() if (!docOrId) { throw new Error("No ID/Rev provided.") diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index f641bbf36e..e1ffa3e64e 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -24,7 +24,7 @@ const getClient = async (type: LockType): Promise => { } } -export const OPTIONS = { +const OPTIONS = { TRY_ONCE: { // immediately throws an error if the lock is already held retryCount: 0, @@ -56,7 +56,7 @@ export const OPTIONS = { }, } -export const newRedlock = async (opts: Options = {}) => { +const newRedlock = async (opts: Options = {}) => { let options = { ...OPTIONS.DEFAULT, ...opts } const redisWrapper = await getLockClient() const client = redisWrapper.getClient() From b922fc3b005d03ad454a20bbdc7ef148f6f398e3 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 28 Feb 2023 15:03:18 +0100 Subject: [PATCH 5/9] Improve testing --- packages/backend-core/src/cache/tests/writethrough.spec.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/backend-core/src/cache/tests/writethrough.spec.ts b/packages/backend-core/src/cache/tests/writethrough.spec.ts index 6087cc0db9..f7b7488fc9 100644 --- a/packages/backend-core/src/cache/tests/writethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/writethrough.spec.ts @@ -81,6 +81,10 @@ describe("writethrough", () => { 2, "bb-warn: Ignoring redlock conflict in write-through cache" ) + + const output = await db.get(current._id) + expect(output.value).toBe(4) + expect(output._rev).toBe(newRev) }) }) }) From dded8beaebd79ea0f25c94c92d4e64a80b1c591f Mon Sep 17 00:00:00 2001 From: adrinr Date: Wed, 1 Mar 2023 13:16:57 +0100 Subject: [PATCH 6/9] Fix writethrough falling behind --- .../src/cache/tests/writethrough.spec.ts | 28 +++++++++++++++++++ .../backend-core/src/cache/writethrough.ts | 2 +- .../tests/utilities/structures/db.ts | 7 +++++ 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/cache/tests/writethrough.spec.ts b/packages/backend-core/src/cache/tests/writethrough.spec.ts index f7b7488fc9..a34f05e881 100644 --- a/packages/backend-core/src/cache/tests/writethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/writethrough.spec.ts @@ -85,6 +85,34 @@ describe("writethrough", () => { const output = await db.get(current._id) expect(output.value).toBe(4) expect(output._rev).toBe(newRev) + + current = output + }) + }) + + it("should handle updates with documents falling behind", async () => { + await config.doInTenant(async () => { + tk.freeze(Date.now() + DELAY + 1) + + const id = structures.uuid() + await writethrough.put({ _id: id, value: 1 }) + const doc = await writethrough.get(id) + + // Updating document + tk.freeze(Date.now() + DELAY + 1) + await writethrough.put({ ...doc, value: 2 }) + + // Update with the old rev value + tk.freeze(Date.now() + DELAY + 1) + const res = await writethrough.put({ + ...doc, + value: 3, + }) + expect(res.ok).toBe(true) + + const output = await db.get(id) + expect(output.value).toBe(3) + expect(output._rev).toBe(res.rev) }) }) }) diff --git a/packages/backend-core/src/cache/writethrough.ts b/packages/backend-core/src/cache/writethrough.ts index e895406c2c..43f3bfb3aa 100644 --- a/packages/backend-core/src/cache/writethrough.ts +++ b/packages/backend-core/src/cache/writethrough.ts @@ -51,7 +51,7 @@ async function put( async () => { const writeDb = async (toWrite: any) => { // doc should contain the _id and _rev - const response = await db.put(toWrite) + const response = await db.put(toWrite, { force: true }) output = { ...doc, _id: response.id, diff --git a/packages/backend-core/tests/utilities/structures/db.ts b/packages/backend-core/tests/utilities/structures/db.ts index e25b707cb9..f4a677e777 100644 --- a/packages/backend-core/tests/utilities/structures/db.ts +++ b/packages/backend-core/tests/utilities/structures/db.ts @@ -1,5 +1,12 @@ +import { structures } from ".." import { newid } from "../../../src/newid" export function id() { return `db_${newid()}` } + +export function rev() { + return `${structures.generator.character({ + numeric: true, + })}-${structures.uuid().replace(/-/, "")}` +} From 49493d80be50bf5214e663a2bbf806ba1221a768 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 7 Mar 2023 12:45:20 +0100 Subject: [PATCH 7/9] Use name suffix instead of complex key object --- .../backend-core/src/cache/writethrough.ts | 3 ++- packages/types/src/sdk/locks.ts | 22 +++++-------------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/packages/backend-core/src/cache/writethrough.ts b/packages/backend-core/src/cache/writethrough.ts index 43f3bfb3aa..af5a6e7721 100644 --- a/packages/backend-core/src/cache/writethrough.ts +++ b/packages/backend-core/src/cache/writethrough.ts @@ -45,7 +45,8 @@ async function put( const lockResponse = await locks.doWithLock( { type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH(key!), + name: LockName.PERSIST_WRITETHROUGH, + nameSuffix: key, ttl: 1000, }, async () => { diff --git a/packages/types/src/sdk/locks.ts b/packages/types/src/sdk/locks.ts index d9c0b606a8..a878fe6a4f 100644 --- a/packages/types/src/sdk/locks.ts +++ b/packages/types/src/sdk/locks.ts @@ -8,22 +8,12 @@ export enum LockType { DELAY_500 = "delay_500", } -export class LockName { - static readonly MIGRATIONS = new LockName("migrations") - static readonly TRIGGER_QUOTA = new LockName("trigger_quota") - static readonly SYNC_ACCOUNT_LICENSE = new LockName("sync_account_license") - static readonly UPDATE_TENANTS_DOC = new LockName("update_tenants_doc") - static readonly PERSIST_WRITETHROUGH = (key: string) => - new LockName(`persist_writethrough_${key}`) - - constructor(public readonly value: string) {} - - valueOf() { - return this.value - } - toString() { - return this.valueOf() - } +export enum LockName { + MIGRATIONS = "migrations", + TRIGGER_QUOTA = "trigger_quota", + SYNC_ACCOUNT_LICENSE = "sync_account_license", + UPDATE_TENANTS_DOC = "update_tenants_doc", + PERSIST_WRITETHROUGH = "persist_writethrough", } export interface LockOptions { From 84d450a931fcc55b49573592fc89425b3291bd75 Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 7 Mar 2023 12:47:27 +0100 Subject: [PATCH 8/9] Renames --- packages/backend-core/src/cache/writethrough.ts | 2 +- packages/backend-core/src/redis/redlockImpl.ts | 4 ++-- packages/types/src/sdk/locks.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/backend-core/src/cache/writethrough.ts b/packages/backend-core/src/cache/writethrough.ts index af5a6e7721..a3b1ecc08d 100644 --- a/packages/backend-core/src/cache/writethrough.ts +++ b/packages/backend-core/src/cache/writethrough.ts @@ -46,7 +46,7 @@ async function put( { type: LockType.TRY_ONCE, name: LockName.PERSIST_WRITETHROUGH, - nameSuffix: key, + resource: key, ttl: 1000, }, async () => { diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index e1ffa3e64e..5e71488689 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -88,8 +88,8 @@ export const doWithLock = async ( let name: string = `lock:${prefix}_${opts.name}` // add additional unique name if required - if (opts.nameSuffix) { - name = name + `_${opts.nameSuffix}` + if (opts.resource) { + name = name + `_${opts.resource}` } // create the lock diff --git a/packages/types/src/sdk/locks.ts b/packages/types/src/sdk/locks.ts index a878fe6a4f..5e8fff75ea 100644 --- a/packages/types/src/sdk/locks.ts +++ b/packages/types/src/sdk/locks.ts @@ -30,9 +30,9 @@ export interface LockOptions { */ ttl: number /** - * The suffix to add to the lock name for additional uniqueness + * The resource to the lock name */ - nameSuffix?: string + resource?: string /** * This is a system-wide lock - don't use tenancy in lock key */ From 77cbe9bc8dc327feae8705083104827a9017ed8e Mon Sep 17 00:00:00 2001 From: adrinr Date: Tue, 7 Mar 2023 13:30:23 +0100 Subject: [PATCH 9/9] Update docs --- packages/types/src/sdk/locks.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/types/src/sdk/locks.ts b/packages/types/src/sdk/locks.ts index 5e8fff75ea..3f6f03b811 100644 --- a/packages/types/src/sdk/locks.ts +++ b/packages/types/src/sdk/locks.ts @@ -30,7 +30,7 @@ export interface LockOptions { */ ttl: number /** - * The resource to the lock name + * The individual resource to lock. This is useful for locking around very specific identifiers, e.g. a document that is prone to conflicts */ resource?: string /**