diff --git a/.eslintrc.json b/.eslintrc.json index 3de9d13046..ae9512152f 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -44,7 +44,8 @@ "no-undef": "off", "no-prototype-builtins": "off", "local-rules/no-budibase-imports": "error", - "local-rules/no-test-com": "error" + "local-rules/no-test-com": "error", + "local-rules/email-domain-example-com": "error" } }, { diff --git a/eslint-local-rules/index.js b/eslint-local-rules/index.js index 71bb5068da..177b0a129c 100644 --- a/eslint-local-rules/index.js +++ b/eslint-local-rules/index.js @@ -51,4 +51,41 @@ module.exports = { } }, }, + "email-domain-example-com": { + meta: { + type: "problem", + docs: { + description: + "enforce using the example.com domain for generator.email calls", + category: "Possible Errors", + recommended: false, + }, + fixable: "code", + schema: [], + }, + create: function (context) { + return { + CallExpression(node) { + if ( + node.callee.type === "MemberExpression" && + node.callee.object.name === "generator" && + node.callee.property.name === "email" && + node.arguments.length === 0 + ) { + context.report({ + node, + message: + "Prefer using generator.email with the domain \"{ domain: 'example.com' }\".", + fix: function (fixer) { + return fixer.replaceText( + node, + 'generator.email({ domain: "example.com" })' + ) + }, + }) + } + }, + } + }, + }, } diff --git a/lerna.json b/lerna.json index a77a16a24e..a50794e91e 100644 --- a/lerna.json +++ b/lerna.json @@ -1,5 +1,5 @@ { - "version": "2.21.3", + "version": "2.21.4", "npmClient": "yarn", "packages": [ "packages/*", diff --git a/packages/backend-core/src/cache/base/index.ts b/packages/backend-core/src/cache/base/index.ts index 264984c6a5..74da4fe0d2 100644 --- a/packages/backend-core/src/cache/base/index.ts +++ b/packages/backend-core/src/cache/base/index.ts @@ -23,6 +23,18 @@ export default class BaseCache { return client.keys(pattern) } + async exists(key: string, opts = { useTenancy: true }) { + key = opts.useTenancy ? generateTenantKey(key) : key + const client = await this.getClient() + return client.exists(key) + } + + async scan(key: string, opts = { useTenancy: true }) { + key = opts.useTenancy ? generateTenantKey(key) : key + const client = await this.getClient() + return client.scan(key) + } + /** * Read only from the cache. */ @@ -32,6 +44,15 @@ export default class BaseCache { return client.get(key) } + /** + * Read only from the cache. + */ + async bulkGet(keys: string[], opts = { useTenancy: true }) { + keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys + const client = await this.getClient() + return client.bulkGet(keys) + } + /** * Write to the cache. */ @@ -46,6 +67,25 @@ export default class BaseCache { await client.store(key, value, ttl) } + /** + * Bulk write to the cache. + */ + async bulkStore( + data: Record, + ttl: number | null = null, + opts = { useTenancy: true } + ) { + if (opts.useTenancy) { + data = Object.entries(data).reduce((acc, [key, value]) => { + acc[generateTenantKey(key)] = value + return acc + }, {} as Record) + } + + const client = await this.getClient() + await client.bulkStore(data, ttl) + } + /** * Remove from cache. */ @@ -55,15 +95,24 @@ export default class BaseCache { return client.delete(key) } + /** + * Remove from cache. + */ + async bulkDelete(keys: string[], opts = { useTenancy: true }) { + keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys + const client = await this.getClient() + return client.bulkDelete(keys) + } + /** * Read from the cache. Write to the cache if not exists. */ - async withCache( + async withCache( key: string, - ttl: number, - fetchFn: any, + ttl: number | null = null, + fetchFn: () => Promise | T, opts = { useTenancy: true } - ) { + ): Promise { const cachedValue = await this.get(key, opts) if (cachedValue) { return cachedValue @@ -89,4 +138,13 @@ export default class BaseCache { throw err } } + + /** + * Delete the entry if the provided value matches the stored one. + */ + async deleteIfValue(key: string, value: any, opts = { useTenancy: true }) { + key = opts.useTenancy ? generateTenantKey(key) : key + const client = await this.getClient() + await client.deleteIfValue(key, value) + } } diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts new file mode 100644 index 0000000000..51018b2317 --- /dev/null +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -0,0 +1,86 @@ +import { AnyDocument, Database } from "@budibase/types" + +import { JobQueue, createQueue } from "../queue" +import * as dbUtils from "../db" + +interface ProcessDocMessage { + dbName: string + docId: string + data: Record +} + +const PERSIST_MAX_ATTEMPTS = 100 + +export const docWritethroughProcessorQueue = createQueue( + JobQueue.DOC_WRITETHROUGH_QUEUE, + { + jobOptions: { + attempts: PERSIST_MAX_ATTEMPTS, + }, + } +) + +class DocWritethroughProcessor { + init() { + docWritethroughProcessorQueue.process(async message => { + try { + await this.persistToDb(message.data) + } catch (err: any) { + if (err.status === 409) { + // If we get a 409, it means that another job updated it meanwhile. We want to retry it to persist it again. + throw new Error( + `Conflict persisting message ${message.id}. Attempt ${message.attemptsMade}` + ) + } + + throw err + } + }) + return this + } + + private async persistToDb({ + dbName, + docId, + data, + }: { + dbName: string + docId: string + data: Record + }) { + const db = dbUtils.getDB(dbName) + let doc: AnyDocument | undefined + try { + doc = await db.get(docId) + } catch { + doc = { _id: docId } + } + + doc = { ...doc, ...data } + await db.put(doc) + } +} + +export const processor = new DocWritethroughProcessor().init() + +export class DocWritethrough { + private db: Database + private _docId: string + + constructor(db: Database, docId: string) { + this.db = db + this._docId = docId + } + + get docId() { + return this._docId + } + + async patch(data: Record) { + await docWritethroughProcessorQueue.add({ + dbName: this.db.name, + docId: this.docId, + data, + }) + } +} diff --git a/packages/backend-core/src/cache/generic.ts b/packages/backend-core/src/cache/generic.ts index 3ac323a8d4..2d6d8b9472 100644 --- a/packages/backend-core/src/cache/generic.ts +++ b/packages/backend-core/src/cache/generic.ts @@ -26,7 +26,8 @@ export const store = (...args: Parameters) => GENERIC.store(...args) export const destroy = (...args: Parameters) => GENERIC.delete(...args) -export const withCache = (...args: Parameters) => - GENERIC.withCache(...args) +export const withCache = ( + ...args: Parameters> +) => GENERIC.withCache(...args) export const bustCache = (...args: Parameters) => GENERIC.bustCache(...args) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts new file mode 100644 index 0000000000..d90c83afd3 --- /dev/null +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -0,0 +1,288 @@ +import tk from "timekeeper" + +import _ from "lodash" +import { DBTestConfiguration, generator, structures } from "../../../tests" +import { getDB } from "../../db" + +import { + DocWritethrough, + docWritethroughProcessorQueue, +} from "../docWritethrough" + +import InMemoryQueue from "../../queue/inMemoryQueue" + +const initialTime = Date.now() + +async function waitForQueueCompletion() { + const queue: InMemoryQueue = docWritethroughProcessorQueue as never + await queue.waitForCompletion() +} + +describe("docWritethrough", () => { + const config = new DBTestConfiguration() + + const db = getDB(structures.db.id()) + let documentId: string + let docWritethrough: DocWritethrough + + describe("patch", () => { + function generatePatchObject(fieldCount: number) { + const keys = generator.unique(() => generator.word(), fieldCount) + return keys.reduce((acc, c) => { + acc[c] = generator.word() + return acc + }, {} as Record) + } + + beforeEach(async () => { + jest.clearAllMocks() + documentId = structures.uuid() + docWritethrough = new DocWritethrough(db, documentId) + }) + + it("patching will not persist until the messages are persisted", async () => { + await config.doInTenant(async () => { + await docWritethrough.patch(generatePatchObject(2)) + await docWritethrough.patch(generatePatchObject(2)) + + expect(await db.exists(documentId)).toBe(false) + }) + }) + + it("patching will persist when the messages are persisted", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + await docWritethrough.patch(patch2) + + await waitForQueueCompletion() + + // This will not be persisted + const patch3 = generatePatchObject(3) + await docWritethrough.patch(patch3) + + expect(await db.get(documentId)).toEqual({ + _id: documentId, + ...patch1, + ...patch2, + _rev: expect.stringMatching(/2-.+/), + createdAt: new Date(initialTime).toISOString(), + updatedAt: new Date(initialTime).toISOString(), + }) + }) + }) + + it("patching will persist keeping the previous data", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + await docWritethrough.patch(patch2) + + await waitForQueueCompletion() + + const patch3 = generatePatchObject(3) + await docWritethrough.patch(patch3) + + await waitForQueueCompletion() + + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + _id: documentId, + ...patch1, + ...patch2, + ...patch3, + }) + ) + }) + }) + + it("date audit fields are set correctly when persisting", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + const date1 = new Date() + await waitForQueueCompletion() + await docWritethrough.patch(patch2) + + tk.travel(Date.now() + 100) + const date2 = new Date() + await waitForQueueCompletion() + + expect(date1).not.toEqual(date2) + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + createdAt: date1.toISOString(), + updatedAt: date2.toISOString(), + }) + ) + }) + }) + + it("concurrent patches will override keys", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + await docWritethrough.patch(patch1) + await waitForQueueCompletion() + const patch2 = generatePatchObject(1) + await docWritethrough.patch(patch2) + + const keyToOverride = _.sample(Object.keys(patch1))! + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + [keyToOverride]: patch1[keyToOverride], + }) + ) + + await waitForQueueCompletion() + + const patch3 = { + ...generatePatchObject(3), + [keyToOverride]: generator.word(), + } + await docWritethrough.patch(patch3) + await waitForQueueCompletion() + + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + ...patch1, + ...patch2, + ...patch3, + }) + ) + }) + }) + + it("concurrent patches to different docWritethrough will not pollute each other", async () => { + await config.doInTenant(async () => { + const secondDocWritethrough = new DocWritethrough( + db, + structures.db.id() + ) + + const doc1Patch = generatePatchObject(2) + await docWritethrough.patch(doc1Patch) + const doc2Patch = generatePatchObject(1) + await secondDocWritethrough.patch(doc2Patch) + + await waitForQueueCompletion() + + const doc1Patch2 = generatePatchObject(3) + await docWritethrough.patch(doc1Patch2) + const doc2Patch2 = generatePatchObject(3) + await secondDocWritethrough.patch(doc2Patch2) + await waitForQueueCompletion() + + expect(await db.get(docWritethrough.docId)).toEqual( + expect.objectContaining({ + ...doc1Patch, + ...doc1Patch2, + }) + ) + + expect(await db.get(secondDocWritethrough.docId)).toEqual( + expect.objectContaining({ + ...doc2Patch, + ...doc2Patch2, + }) + ) + }) + }) + + it("cached values are persisted only once", async () => { + await config.doInTenant(async () => { + const initialPatch = generatePatchObject(5) + + await docWritethrough.patch(initialPatch) + await waitForQueueCompletion() + + expect(await db.get(documentId)).toEqual( + expect.objectContaining(initialPatch) + ) + + await db.remove(await db.get(documentId)) + + await waitForQueueCompletion() + const extraPatch = generatePatchObject(5) + await docWritethrough.patch(extraPatch) + await waitForQueueCompletion() + + expect(await db.get(documentId)).toEqual( + expect.objectContaining(extraPatch) + ) + expect(await db.get(documentId)).not.toEqual( + expect.objectContaining(initialPatch) + ) + }) + }) + + it("concurrent calls will not cause conflicts", async () => { + async function parallelPatch(count: number) { + const patches = Array.from({ length: count }).map(() => + generatePatchObject(1) + ) + await Promise.all(patches.map(p => docWritethrough.patch(p))) + + return patches.reduce((acc, c) => { + acc = { ...acc, ...c } + return acc + }, {}) + } + const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add") + + await config.doInTenant(async () => { + let patches = await parallelPatch(5) + expect(queueMessageSpy).toBeCalledTimes(5) + + await waitForQueueCompletion() + expect(await db.get(documentId)).toEqual( + expect.objectContaining(patches) + ) + + patches = { ...patches, ...(await parallelPatch(40)) } + expect(queueMessageSpy).toBeCalledTimes(45) + + await waitForQueueCompletion() + expect(await db.get(documentId)).toEqual( + expect.objectContaining(patches) + ) + + patches = { ...patches, ...(await parallelPatch(10)) } + expect(queueMessageSpy).toBeCalledTimes(55) + + await waitForQueueCompletion() + expect(await db.get(documentId)).toEqual( + expect.objectContaining(patches) + ) + }) + }) + + // This is not yet supported + it.skip("patches will execute in order", async () => { + let incrementalValue = 0 + const keyToOverride = generator.word() + async function incrementalPatches(count: number) { + for (let i = 0; i < count; i++) { + await docWritethrough.patch({ [keyToOverride]: incrementalValue++ }) + } + } + + await config.doInTenant(async () => { + await incrementalPatches(5) + + await waitForQueueCompletion() + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ [keyToOverride]: 5 }) + ) + + await incrementalPatches(40) + await waitForQueueCompletion() + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ [keyToOverride]: 45 }) + ) + }) + }) + }) +}) diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts index 416ed4faa2..c1347f4f2b 100644 --- a/packages/backend-core/src/db/couch/DatabaseImpl.ts +++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts @@ -71,7 +71,15 @@ export class DatabaseImpl implements Database { DatabaseImpl.nano = buildNano(couchInfo) } - async exists() { + exists(docId?: string) { + if (docId === undefined) { + return this.dbExists() + } + + return this.docExists(docId) + } + + private async dbExists() { const response = await directCouchUrlCall({ url: `${this.couchInfo.url}/${this.name}`, method: "HEAD", @@ -80,6 +88,15 @@ export class DatabaseImpl implements Database { return response.status === 200 } + private async docExists(id: string): Promise { + try { + await this.performCall(db => () => db.head(id)) + return true + } catch { + return false + } + } + private nano() { return this.instanceNano || DatabaseImpl.nano } diff --git a/packages/backend-core/src/db/instrumentation.ts b/packages/backend-core/src/db/instrumentation.ts index 9aa7403900..880f0a3c72 100644 --- a/packages/backend-core/src/db/instrumentation.ts +++ b/packages/backend-core/src/db/instrumentation.ts @@ -24,9 +24,12 @@ export class DDInstrumentedDatabase implements Database { return this.db.name } - exists(): Promise { + exists(docId?: string): Promise { return tracer.trace("db.exists", span => { - span?.addTags({ db_name: this.name }) + span?.addTags({ db_name: this.name, doc_id: docId }) + if (docId) { + return this.db.exists(docId) + } return this.db.exists() }) } diff --git a/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts new file mode 100644 index 0000000000..586f13f417 --- /dev/null +++ b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts @@ -0,0 +1,55 @@ +import _ from "lodash" +import { AnyDocument } from "@budibase/types" +import { generator } from "../../../tests" +import { DatabaseImpl } from "../couch" +import { newid } from "../../utils" + +describe("DatabaseImpl", () => { + const database = new DatabaseImpl(generator.word()) + const documents: AnyDocument[] = [] + + beforeAll(async () => { + const docsToCreate = Array.from({ length: 10 }).map(() => ({ + _id: newid(), + })) + const createdDocs = await database.bulkDocs(docsToCreate) + + documents.push(...createdDocs.map((x: any) => ({ _id: x.id, _rev: x.rev }))) + }) + + describe("document exists", () => { + it("can check existing docs by id", async () => { + const existingDoc = _.sample(documents) + const result = await database.exists(existingDoc!._id!) + + expect(result).toBe(true) + }) + + it("can check non existing docs by id", async () => { + const result = await database.exists(newid()) + + expect(result).toBe(false) + }) + + it("can check an existing doc by id multiple times", async () => { + const existingDoc = _.sample(documents) + const id = existingDoc!._id! + + const results = [] + results.push(await database.exists(id)) + results.push(await database.exists(id)) + results.push(await database.exists(id)) + + expect(results).toEqual([true, true, true]) + }) + + it("returns false after the doc is deleted", async () => { + const existingDoc = _.sample(documents) + const id = existingDoc!._id! + expect(await database.exists(id)).toBe(true) + + await database.remove(existingDoc!) + expect(await database.exists(id)).toBe(false) + }) + }) +}) diff --git a/packages/backend-core/src/queue/constants.ts b/packages/backend-core/src/queue/constants.ts index eb4f21aced..a095c6c769 100644 --- a/packages/backend-core/src/queue/constants.ts +++ b/packages/backend-core/src/queue/constants.ts @@ -4,4 +4,5 @@ export enum JobQueue { AUDIT_LOG = "auditLogQueue", SYSTEM_EVENT_QUEUE = "systemEventQueue", APP_MIGRATION = "appMigration", + DOC_WRITETHROUGH_QUEUE = "docWritethroughQueue", } diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index c05bbffbe9..afb5592562 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -1,5 +1,14 @@ import events from "events" -import { timeout } from "../utils" +import { newid, timeout } from "../utils" +import { Queue, QueueOptions, JobOptions } from "./queue" + +interface JobMessage { + id: string + timestamp: number + queue: string + data: any + opts?: JobOptions +} /** * Bull works with a Job wrapper around all messages that contains a lot more information about @@ -10,12 +19,13 @@ import { timeout } from "../utils" * @returns A new job which can now be put onto the queue, this is mostly an * internal structure so that an in memory queue can be easily swapped for a Bull queue. */ -function newJob(queue: string, message: any) { +function newJob(queue: string, message: any, opts?: JobOptions): JobMessage { return { + id: newid(), timestamp: Date.now(), queue: queue, data: message, - opts: {}, + opts, } } @@ -24,26 +34,29 @@ function newJob(queue: string, message: any) { * It is relatively simple, using an event emitter internally to register when messages are available * to the consumers - in can support many inputs and many consumers. */ -class InMemoryQueue { +class InMemoryQueue implements Partial { _name: string - _opts?: any - _messages: any[] + _opts?: QueueOptions + _messages: JobMessage[] + _queuedJobIds: Set _emitter: EventEmitter _runCount: number _addCount: number + /** * The constructor the queue, exactly the same as that of Bulls. * @param name The name of the queue which is being configured. * @param opts This is not used by the in memory queue as there is no real use * case when in memory, but is the same API as Bull */ - constructor(name: string, opts?: any) { + constructor(name: string, opts?: QueueOptions) { this._name = name this._opts = opts this._messages = [] this._emitter = new events.EventEmitter() this._runCount = 0 this._addCount = 0 + this._queuedJobIds = new Set() } /** @@ -55,22 +68,42 @@ class InMemoryQueue { * note this is incredibly limited compared to Bull as in reality the Job would contain * a lot more information about the queue and current status of Bull cluster. */ - process(func: any) { + async process(func: any) { this._emitter.on("message", async () => { if (this._messages.length <= 0) { return } let msg = this._messages.shift() + let resp = func(msg) + + async function retryFunc(fnc: any) { + try { + await fnc + } catch (e: any) { + await new Promise(r => setTimeout(() => r(), 50)) + + await retryFunc(func(msg)) + } + } + if (resp.then != null) { - await resp + try { + await retryFunc(resp) + } catch (e: any) { + console.error(e) + } } this._runCount++ + const jobId = msg?.opts?.jobId?.toString() + if (jobId && msg?.opts?.removeOnComplete) { + this._queuedJobIds.delete(jobId) + } }) } async isReady() { - return true + return this as any } // simply puts a message to the queue and emits to the queue for processing @@ -83,27 +116,45 @@ class InMemoryQueue { * @param repeat serves no purpose for the import queue. */ // eslint-disable-next-line no-unused-vars - add(msg: any, repeat: boolean) { - if (typeof msg !== "object") { + async add(data: any, opts?: JobOptions) { + const jobId = opts?.jobId?.toString() + if (jobId && this._queuedJobIds.has(jobId)) { + console.log(`Ignoring already queued job ${jobId}`) + return + } + + if (typeof data !== "object") { throw "Queue only supports carrying JSON." } - this._messages.push(newJob(this._name, msg)) - this._addCount++ - this._emitter.emit("message") + if (jobId) { + this._queuedJobIds.add(jobId) + } + + const pushMessage = () => { + this._messages.push(newJob(this._name, data, opts)) + this._addCount++ + this._emitter.emit("message") + } + + const delay = opts?.delay + if (delay) { + setTimeout(pushMessage, delay) + } else { + pushMessage() + } + return {} as any } /** * replicating the close function from bull, which waits for jobs to finish. */ - async close() { - return [] - } + async close() {} /** * This removes a cron which has been implemented, this is part of Bull API. * @param cronJobId The cron which is to be removed. */ - removeRepeatableByKey(cronJobId: string) { + async removeRepeatableByKey(cronJobId: string) { // TODO: implement for testing console.log(cronJobId) } @@ -111,12 +162,12 @@ class InMemoryQueue { /** * Implemented for tests */ - getRepeatableJobs() { + async getRepeatableJobs() { return [] } // eslint-disable-next-line no-unused-vars - removeJobs(pattern: string) { + async removeJobs(pattern: string) { // no-op } @@ -128,18 +179,22 @@ class InMemoryQueue { } async getJob() { - return {} + return null } on() { // do nothing - return this + return this as any } async waitForCompletion() { do { await timeout(50) - } while (this._addCount < this._runCount) + } while (this.hasRunningJobs()) + } + + hasRunningJobs() { + return this._addCount > this._runCount } } diff --git a/packages/backend-core/src/queue/listeners.ts b/packages/backend-core/src/queue/listeners.ts index 063a01bd2f..14dce5fe8d 100644 --- a/packages/backend-core/src/queue/listeners.ts +++ b/packages/backend-core/src/queue/listeners.ts @@ -88,6 +88,7 @@ enum QueueEventType { AUDIT_LOG_EVENT = "audit-log-event", SYSTEM_EVENT = "system-event", APP_MIGRATION = "app-migration", + DOC_WRITETHROUGH = "doc-writethrough", } const EventTypeMap: { [key in JobQueue]: QueueEventType } = { @@ -96,6 +97,7 @@ const EventTypeMap: { [key in JobQueue]: QueueEventType } = { [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT, [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT, [JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION, + [JobQueue.DOC_WRITETHROUGH_QUEUE]: QueueEventType.DOC_WRITETHROUGH, } function logging(queue: Queue, jobQueue: JobQueue) { diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 0bcb25a35f..1838eed92f 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners" import { Duration } from "../utils" import * as timers from "../timers" +export { QueueOptions, Queue, JobOptions } from "bull" + // the queue lock is held for 5 minutes const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs() // queue lock is refreshed every 30 seconds diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index f3bcee3209..7920dfed2d 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -9,7 +9,8 @@ let userClient: Client, lockClient: Client, socketClient: Client, inviteClient: Client, - passwordResetClient: Client + passwordResetClient: Client, + docWritethroughClient: Client export async function init() { userClient = await new Client(utils.Databases.USER_CACHE).init() @@ -24,6 +25,9 @@ export async function init() { utils.Databases.SOCKET_IO, utils.SelectableDatabase.SOCKET_IO ).init() + docWritethroughClient = await new Client( + utils.Databases.DOC_WRITE_THROUGH + ).init() } export async function shutdown() { @@ -104,3 +108,10 @@ export async function getPasswordResetClient() { } return passwordResetClient } + +export async function getDocWritethroughClient() { + if (!writethroughClient) { + await init() + } + return writethroughClient +} diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index beb59ee3fa..79f75421d3 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -320,6 +320,11 @@ class RedisWrapper { await this.getClient().del(addDbPrefix(db, key)) } + async bulkDelete(keys: string[]) { + const db = this._db + await this.getClient().del(keys.map(key => addDbPrefix(db, key))) + } + async clear() { let items = await this.scan() await Promise.all(items.map((obj: any) => this.delete(obj.key))) diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts index 7b93458b52..7f84f11467 100644 --- a/packages/backend-core/src/redis/utils.ts +++ b/packages/backend-core/src/redis/utils.ts @@ -30,6 +30,7 @@ export enum Databases { LOCKS = "locks", SOCKET_IO = "socket_io", BPM_EVENTS = "bpmEvents", + DOC_WRITE_THROUGH = "docWriteThrough", } /** diff --git a/packages/backend-core/tests/core/utilities/structures/accounts.ts b/packages/backend-core/tests/core/utilities/structures/accounts.ts index 515f94db1e..7dcc2de116 100644 --- a/packages/backend-core/tests/core/utilities/structures/accounts.ts +++ b/packages/backend-core/tests/core/utilities/structures/accounts.ts @@ -18,7 +18,7 @@ export const account = (partial: Partial = {}): Account => { return { accountId: uuid(), tenantId: generator.word(), - email: generator.email(), + email: generator.email({ domain: "example.com" }), tenantName: generator.word(), hosting: Hosting.SELF, createdAt: Date.now(), diff --git a/packages/backend-core/tests/core/utilities/structures/scim.ts b/packages/backend-core/tests/core/utilities/structures/scim.ts index 80f41c605d..f424b2881a 100644 --- a/packages/backend-core/tests/core/utilities/structures/scim.ts +++ b/packages/backend-core/tests/core/utilities/structures/scim.ts @@ -13,7 +13,7 @@ interface CreateUserRequestFields { export function createUserRequest(userData?: Partial) { const defaultValues = { externalId: uuid(), - email: generator.email(), + email: `${uuid()}@example.com`, firstName: generator.first(), lastName: generator.last(), username: generator.name(), diff --git a/packages/builder/src/components/backend/Datasources/CreateEditRelationship.svelte b/packages/builder/src/components/backend/Datasources/CreateEditRelationship.svelte index 6b9524776c..b54ecbf9fd 100644 --- a/packages/builder/src/components/backend/Datasources/CreateEditRelationship.svelte +++ b/packages/builder/src/components/backend/Datasources/CreateEditRelationship.svelte @@ -40,8 +40,15 @@ part2: PrettyRelationshipDefinitions.MANY, }, } - let relationshipOpts1 = Object.values(PrettyRelationshipDefinitions) - let relationshipOpts2 = Object.values(PrettyRelationshipDefinitions) + $: relationshipOpts1 = + relationshipPart2 === PrettyRelationshipDefinitions.ONE + ? [PrettyRelationshipDefinitions.MANY] + : Object.values(PrettyRelationshipDefinitions) + + $: relationshipOpts2 = + relationshipPart1 === PrettyRelationshipDefinitions.ONE + ? [PrettyRelationshipDefinitions.MANY] + : Object.values(PrettyRelationshipDefinitions) let relationshipPart1 = PrettyRelationshipDefinitions.ONE let relationshipPart2 = PrettyRelationshipDefinitions.MANY diff --git a/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/DeleteRow.svelte b/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/DeleteRow.svelte index b8459ac0eb..431368d28f 100644 --- a/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/DeleteRow.svelte +++ b/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/DeleteRow.svelte @@ -45,7 +45,10 @@ {#if parameters.confirm} - + + + + {#if parameters.confirm} - + + + + + + diff --git a/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/SaveRow.svelte b/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/SaveRow.svelte index a1fe773455..d834e9aac9 100644 --- a/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/SaveRow.svelte +++ b/packages/builder/src/components/design/settings/controls/ButtonActionEditor/actions/SaveRow.svelte @@ -72,7 +72,10 @@ {#if parameters.confirm} - + + + + { + const { host, pathname } = new URL(url) function json(body: any, status = 200) { return { status, @@ -34,7 +35,7 @@ module FetchMock { } } - if (url.includes("/api/global")) { + if (pathname.includes("/api/global")) { const user = { email: "test@example.com", _id: "us_test@example.com", @@ -47,31 +48,31 @@ module FetchMock { global: false, }, } - return url.endsWith("/users") && opts.method === "GET" + return pathname.endsWith("/users") && opts.method === "GET" ? json([user]) : json(user) } // mocked data based on url - else if (url.includes("api/apps")) { + else if (pathname.includes("api/apps")) { return json({ app1: { url: "/app1", }, }) - } else if (url.includes("example.com")) { + } else if (host.includes("example.com")) { return json({ body: opts.body, url, method: opts.method, }) - } else if (url.includes("invalid.com")) { + } else if (host.includes("invalid.com")) { return json( { invalid: true, }, 404 ) - } else if (mockSearch && url.includes("_search")) { + } else if (mockSearch && pathname.includes("_search")) { const body = opts.body const parts = body.split("tableId:") let tableId @@ -90,7 +91,7 @@ module FetchMock { ], bookmark: "test", }) - } else if (url.includes("google.com")) { + } else if (host.includes("google.com")) { return json({ url, opts, @@ -177,7 +178,7 @@ module FetchMock { } else if (url === "https://www.googleapis.com/oauth2/v4/token") { // any valid response return json({}) - } else if (url.includes("failonce.com")) { + } else if (host.includes("failonce.com")) { failCount++ if (failCount === 1) { return json({ message: "error" }, 500) diff --git a/packages/server/src/tests/jestEnv.ts b/packages/server/src/tests/jestEnv.ts index 4763208c54..7c58470e9b 100644 --- a/packages/server/src/tests/jestEnv.ts +++ b/packages/server/src/tests/jestEnv.ts @@ -10,3 +10,4 @@ process.env.MOCK_REDIS = "1" process.env.PLATFORM_URL = "http://localhost:10000" process.env.REDIS_PASSWORD = "budibase" process.env.BUDIBASE_VERSION = "0.0.0+jest" +process.env.WORKER_URL = "http://localhost:10000" diff --git a/packages/server/src/tests/utilities/TestConfiguration.ts b/packages/server/src/tests/utilities/TestConfiguration.ts index cfe1bf4066..316f266653 100644 --- a/packages/server/src/tests/utilities/TestConfiguration.ts +++ b/packages/server/src/tests/utilities/TestConfiguration.ts @@ -347,7 +347,7 @@ export default class TestConfiguration { lastName = generator.last(), builder = { global: true }, admin = { global: false }, - email = generator.email(), + email = generator.email({ domain: "example.com" }), tenantId = this.getTenantId(), roles = {}, } = config @@ -512,7 +512,7 @@ export default class TestConfiguration { async basicRoleHeaders() { return await this.roleHeaders({ - email: generator.email(), + email: generator.email({ domain: "example.com" }), builder: false, prodApp: true, roleId: roles.BUILTIN_ROLE_IDS.BASIC, @@ -520,7 +520,7 @@ export default class TestConfiguration { } async roleHeaders({ - email = generator.email(), + email = generator.email({ domain: "example.com" }), roleId = roles.BUILTIN_ROLE_IDS.ADMIN, builder = false, prodApp = true, diff --git a/packages/types/src/documents/global/userGroup.ts b/packages/types/src/documents/global/userGroup.ts index e6b70a160c..f914b2876d 100644 --- a/packages/types/src/documents/global/userGroup.ts +++ b/packages/types/src/documents/global/userGroup.ts @@ -24,7 +24,7 @@ export interface GroupUser { } export interface UserGroupRoles { - [key: string]: string + [key: string]: string | undefined } export interface SearchGroupRequest {} diff --git a/packages/types/src/sdk/db.ts b/packages/types/src/sdk/db.ts index 1b41cec254..7f821d1412 100644 --- a/packages/types/src/sdk/db.ts +++ b/packages/types/src/sdk/db.ts @@ -128,6 +128,7 @@ export interface Database { exists(): Promise get(id?: string): Promise + exists(docId: string): Promise getMultiple( ids: string[], opts?: { allowMissing?: boolean } diff --git a/packages/types/src/sdk/events/userGroup.ts b/packages/types/src/sdk/events/userGroup.ts index ea1f554cff..804db5196d 100644 --- a/packages/types/src/sdk/events/userGroup.ts +++ b/packages/types/src/sdk/events/userGroup.ts @@ -48,7 +48,7 @@ export interface GroupAddedOnboardingEvent extends BaseEvent { } export interface GroupPermissionsEditedEvent extends BaseEvent { - permissions: Record + permissions: Record groupId: string audited: { name: string diff --git a/packages/worker/src/api/routes/global/tests/groups.spec.ts b/packages/worker/src/api/routes/global/tests/groups.spec.ts index 8f0739a812..b69c4781c4 100644 --- a/packages/worker/src/api/routes/global/tests/groups.spec.ts +++ b/packages/worker/src/api/routes/global/tests/groups.spec.ts @@ -147,7 +147,7 @@ describe("/api/global/groups", () => { await Promise.all( Array.from({ length: 30 }).map(async (_, i) => { - const email = `user${i}@${generator.domain()}` + const email = `user${i}@example.com` const user = await config.api.users.saveUser({ ...structures.users.user(), email, diff --git a/packages/worker/src/initPro.ts b/packages/worker/src/initPro.ts index ddc8d2562a..b34d514992 100644 --- a/packages/worker/src/initPro.ts +++ b/packages/worker/src/initPro.ts @@ -1,5 +1,4 @@ import { sdk as proSdk } from "@budibase/pro" -import * as userSdk from "./sdk/users" export const initPro = async () => { await proSdk.init({}) diff --git a/qa-core/src/account-api/tests/accounts/accounts.cloud.spec.ts b/qa-core/src/account-api/tests/accounts/accounts.cloud.spec.ts index 0969b72cf9..01338b609c 100644 --- a/qa-core/src/account-api/tests/accounts/accounts.cloud.spec.ts +++ b/qa-core/src/account-api/tests/accounts/accounts.cloud.spec.ts @@ -84,7 +84,7 @@ describe("Accounts", () => { }) it("searches by email", async () => { - const email = generator.email() + const email = generator.email({ domain: "example.com" }) // Empty result const [_, emptyBody] = await config.api.accounts.search(email, "email") diff --git a/qa-core/src/public-api/fixtures/users.ts b/qa-core/src/public-api/fixtures/users.ts index e20c464b34..418b565d2a 100644 --- a/qa-core/src/public-api/fixtures/users.ts +++ b/qa-core/src/public-api/fixtures/users.ts @@ -4,7 +4,7 @@ import { generator } from "../../shared" export const generateUser = ( overrides: Partial = {} ): CreateUserParams => ({ - email: generator.email(), + email: generator.email({ domain: "example.com" }), roles: { [generator.string({ length: 32, alpha: true, numeric: true })]: generator.word(),