From 7e3715d88aaf78f7df7c24f9983bb914242cf3ad Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Fri, 25 Sep 2020 18:05:26 +0100 Subject: [PATCH] Some groundwork for the linked records, building up a much more in-depth emitter for models and records to drive the record cleanup. --- packages/server/src/api/controllers/model.js | 21 +++-- packages/server/src/api/controllers/record.js | 34 ++++---- .../src/automations/queue/inMemoryQueue.js | 44 ----------- packages/server/src/automations/triggers.js | 4 +- packages/server/src/db/linkedRecords.js | 73 +++++++++++++++++ packages/server/src/events/index.js | 41 +++++++++- .../src/utilities/queue/inMemoryQueue.js | 78 +++++++++++++++++++ 7 files changed, 221 insertions(+), 74 deletions(-) delete mode 100644 packages/server/src/automations/queue/inMemoryQueue.js create mode 100644 packages/server/src/db/linkedRecords.js create mode 100644 packages/server/src/utilities/queue/inMemoryQueue.js diff --git a/packages/server/src/api/controllers/model.js b/packages/server/src/api/controllers/model.js index 0f38056753..753be501a2 100644 --- a/packages/server/src/api/controllers/model.js +++ b/packages/server/src/api/controllers/model.js @@ -12,12 +12,12 @@ exports.fetch = async function(ctx) { exports.find = async function(ctx) { const db = new CouchDB(ctx.user.instanceId) - const model = await db.get(ctx.params.id) - ctx.body = model + ctx.body = await db.get(ctx.params.id) } exports.save = async function(ctx) { - const db = new CouchDB(ctx.user.instanceId) + const instanceId = ctx.user.instanceId + const db = new CouchDB(instanceId) const modelToSave = { type: "model", _id: newid(), @@ -54,7 +54,7 @@ exports.save = async function(ctx) { modelToSave._rev = result.rev const { schema } = ctx.request.body - for (let key in schema) { + for (let key of Object.keys(schema)) { // model has a linked record if (schema[key].type === "link") { // create the link field in the other model @@ -84,13 +84,18 @@ exports.save = async function(ctx) { } await db.put(designDoc) + // syntactic sugar for event emission + modelToSave.modelId = modelToSave._id + ctx.eventEmitter && + ctx.eventEmitter.emitModel(`model:save`, instanceId, modelToSave) ctx.status = 200 ctx.message = `Model ${ctx.request.body.name} saved successfully.` ctx.body = modelToSave } exports.destroy = async function(ctx) { - const db = new CouchDB(ctx.user.instanceId) + const instanceId = ctx.user.instanceId + const db = new CouchDB(instanceId) const modelToDelete = await db.get(ctx.params.modelId) @@ -105,7 +110,7 @@ exports.destroy = async function(ctx) { ) // Delete linked record fields in dependent models - for (let key in modelToDelete.schema) { + for (let key of Object.keys(modelToDelete.schema)) { const { type, modelId } = modelToDelete.schema[key] if (type === "link") { const linkedModel = await db.get(modelId) @@ -119,6 +124,10 @@ exports.destroy = async function(ctx) { delete designDoc.views[modelViewId] await db.put(designDoc) + // syntactic sugar for event emission + modelToDelete.modelId = modelToDelete._id + ctx.eventEmitter && + ctx.eventEmitter.emitModel(`model:delete`, instanceId, modelToDelete) ctx.status = 200 ctx.message = `Model ${ctx.params.modelId} deleted.` } diff --git a/packages/server/src/api/controllers/record.js b/packages/server/src/api/controllers/record.js index 2626667ef3..a2ac3ea2f5 100644 --- a/packages/server/src/api/controllers/record.js +++ b/packages/server/src/api/controllers/record.js @@ -2,21 +2,6 @@ const CouchDB = require("../../db") const validateJs = require("validate.js") const newid = require("../../db/newid") -function emitEvent(eventType, ctx, record) { - let event = { - record, - instanceId: ctx.user.instanceId, - } - // add syntactic sugar for mustache later - if (record._id) { - event.id = record._id - } - if (record._rev) { - event.revision = record._rev - } - ctx.eventEmitter && ctx.eventEmitter.emit(eventType, event) -} - validateJs.extend(validateJs.validators.datetime, { parse: function(value) { return new Date(value).getTime() @@ -28,7 +13,8 @@ validateJs.extend(validateJs.validators.datetime, { }) exports.patch = async function(ctx) { - const db = new CouchDB(ctx.user.instanceId) + const instanceId = ctx.user.instanceId + const db = new CouchDB(instanceId) const record = await db.get(ctx.params.id) const model = await db.get(record.modelId) const patchfields = ctx.request.body @@ -55,13 +41,16 @@ exports.patch = async function(ctx) { const response = await db.put(record) record._rev = response.rev record.type = "record" + ctx.eventEmitter && + ctx.eventEmitter.emitRecord(`record:update`, instanceId, record, model) ctx.body = record ctx.status = 200 ctx.message = `${model.name} updated successfully.` } exports.save = async function(ctx) { - const db = new CouchDB(ctx.user.instanceId) + const instanceId = ctx.user.instanceId + const db = new CouchDB(instanceId) const record = ctx.request.body record.modelId = ctx.params.modelId @@ -124,7 +113,8 @@ exports.save = async function(ctx) { } } - emitEvent(`record:save`, ctx, record) + ctx.eventEmitter && + ctx.eventEmitter.emitRecord(`record:save`, instanceId, record, model) ctx.body = record ctx.status = 200 ctx.message = `${model.name} created successfully` @@ -180,7 +170,8 @@ exports.find = async function(ctx) { } exports.destroy = async function(ctx) { - const db = new CouchDB(ctx.user.instanceId) + const instanceId = ctx.user.instanceId + const db = new CouchDB() const record = await db.get(ctx.params.recordId) if (record.modelId !== ctx.params.modelId) { ctx.throw(400, "Supplied modelId doesn't match the record's modelId") @@ -188,9 +179,10 @@ exports.destroy = async function(ctx) { } ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId) ctx.status = 200 - // for automations + // for automations include the record that was deleted ctx.record = record - emitEvent(`record:delete`, ctx, record) + ctx.eventEmitter && + ctx.eventEmitter.emitRecord(`record:delete`, instanceId, record) } exports.validate = async function(ctx) { diff --git a/packages/server/src/automations/queue/inMemoryQueue.js b/packages/server/src/automations/queue/inMemoryQueue.js deleted file mode 100644 index 927eeb60b6..0000000000 --- a/packages/server/src/automations/queue/inMemoryQueue.js +++ /dev/null @@ -1,44 +0,0 @@ -let events = require("events") - -// Bull works with a Job wrapper around all messages that contains a lot more information about -// the state of the message, implement this for the sake of maintaining API consistency -function newJob(queue, message) { - return { - timestamp: Date.now(), - queue: queue, - data: message, - } -} - -// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock -class InMemoryQueue { - // opts is not used by this as there is no real use case when in memory, but is the same API as Bull - constructor(name, opts) { - this._name = name - this._opts = opts - this._messages = [] - this._emitter = new events.EventEmitter() - } - - // same API as bull, provide a callback and it will respond when messages are available - process(func) { - this._emitter.on("message", async () => { - if (this._messages.length <= 0) { - return - } - let msg = this._messages.shift() - let resp = func(msg) - if (resp.then != null) { - await resp - } - }) - } - - // simply puts a message to the queue and emits to the queue for processing - add(msg) { - this._messages.push(newJob(this._name, msg)) - this._emitter.emit("message") - } -} - -module.exports = InMemoryQueue diff --git a/packages/server/src/automations/triggers.js b/packages/server/src/automations/triggers.js index d1d393347c..fb6bcaf5d3 100644 --- a/packages/server/src/automations/triggers.js +++ b/packages/server/src/automations/triggers.js @@ -1,8 +1,8 @@ const CouchDB = require("../db") const emitter = require("../events/index") -const InMemoryQueue = require("./queue/inMemoryQueue") +const InMemoryQueue = require("../utilities/queue/inMemoryQueue") -let automationQueue = new InMemoryQueue() +let automationQueue = new InMemoryQueue("automationQueue") const FAKE_STRING = "TEST" const FAKE_BOOL = false diff --git a/packages/server/src/db/linkedRecords.js b/packages/server/src/db/linkedRecords.js new file mode 100644 index 0000000000..877e290cea --- /dev/null +++ b/packages/server/src/db/linkedRecords.js @@ -0,0 +1,73 @@ +const CouchDB = require("./index") +const emitter = require("../events/index") +const InMemoryQueue = require("../utilities/queue/inMemoryQueue") + +/** + * This functionality makes sure that when records with links are created, updated or deleted they are processed + * correctly - making sure that no stale links are left around and that all links have been made successfully. + */ + +const EventType = { + RECORD_SAVE: "record:save", + RECORD_UPDATE: "record:update", + RECORD_DELETE: "record:delete", + MODEL_SAVE: "model:save", + MODEL_DELETE: "model:delete", +} +const linkedRecordQueue = new InMemoryQueue("linkedRecordQueue") + +function createEmitterCallback(eventName) { + emitter.on(eventName, function(event) { + if (!event || !event.record || !event.record.modelId) { + return + } + linkedRecordQueue.add({ + type: eventName, + event, + }) + }) +} + +for (let typeKey of Object.keys(EventType)) { + createEmitterCallback(EventType[typeKey]) +} + +function doesModelHaveLinkedRecords(model) { + for (let key of Object.keys(model.schema)) { + const { type } = model.schema[key] + if (type === "link") { + return true + } + } + return false +} + +linkedRecordQueue.process(async job => { + let event = job.data + // can't operate without these properties + if (event.instanceId == null || event.modelId == null) { + return + } + const db = new CouchDB(event.instanceId) + let model = event.model == null ? await db.get(event.modelId) : event.model + // model doesn't have links, can stop here + if (!doesModelHaveLinkedRecords(model)) { + return + } + // no linked records to operate on + if (model == null) { + return + } + switch (event.type) { + case EventType.RECORD_SAVE: + break + case EventType.RECORD_UPDATE: + break + case EventType.RECORD_DELETE: + break + case EventType.MODEL_SAVE: + break + case EventType.MODEL_DELETE: + break + } +}) diff --git a/packages/server/src/events/index.js b/packages/server/src/events/index.js index 2d6d2fa585..13abacd761 100644 --- a/packages/server/src/events/index.js +++ b/packages/server/src/events/index.js @@ -6,6 +6,45 @@ const EventEmitter = require("events").EventEmitter * future. */ -const emitter = new EventEmitter() +/** + * Extending the standard emitter to some syntactic sugar and standardisation to the emitted event. + * This is specifically quite important for mustache used in automations. + */ +class BudibaseEmitter extends EventEmitter { + emitRecord(eventName, instanceId, record, model = null) { + let event = { + record, + instanceId, + modelId: record.modelId, + } + if (model) { + event.model = model + } + if (record._id) { + event.id = record._id + } + if (record._rev) { + event.revision = record._rev + } + this.emit(eventName, event) + } + + emitModel(eventName, instanceId, model = null) { + let event = { + model, + instanceId, + modelId: model._id, + } + if (model._id) { + event.id = model._id + } + if (model._rev) { + event.revision = model._rev + } + this.emit(eventName, event) + } +} + +const emitter = new BudibaseEmitter() module.exports = emitter diff --git a/packages/server/src/utilities/queue/inMemoryQueue.js b/packages/server/src/utilities/queue/inMemoryQueue.js new file mode 100644 index 0000000000..6c1d73801d --- /dev/null +++ b/packages/server/src/utilities/queue/inMemoryQueue.js @@ -0,0 +1,78 @@ +let events = require("events") + +/** + * Bull works with a Job wrapper around all messages that contains a lot more information about + * the state of the message, this object constructor implements the same schema of Bull jobs + * for the sake of maintaining API consistency. + * @param {string} queue The name of the queue which the message will be carried on. + * @param {object} message The JSON message which will be passed back to the consumer. + * @returns {Object} A new job which can now be put onto the queue, this is mostly an + * internal structure so that an in memory queue can be easily swapped for a Bull queue. + */ +function newJob(queue, message) { + return { + timestamp: Date.now(), + queue: queue, + data: message, + } +} + +/** + * This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock. + * It is relatively simple, using an event emitter internally to register when messages are available + * to the consumers - in can support many inputs and many consumers. + */ +class InMemoryQueue { + /** + * The constructor the queue, exactly the same as that of Bulls. + * @param {string} name The name of the queue which is being configured. + * @param {object|null} opts This is not used by the in memory queue as there is no real use + * case when in memory, but is the same API as Bull + */ + constructor(name, opts = null) { + this._name = name + this._opts = opts + this._messages = [] + this._emitter = new events.EventEmitter() + } + + /** + * Same callback API as Bull, each callback passed to this will consume messages as they are + * available. Please note this is a queue service, not a notification service, so each + * consumer will receive different messages. + * @param {function} func The callback function which will return a "Job", the same + * as the Bull API, within this job the property "data" contains the JSON message. Please + * note this is incredibly limited compared to Bull as in reality the Job would contain + * a lot more information about the queue and current status of Bull cluster. + */ + process(func) { + this._emitter.on("message", async () => { + if (this._messages.length <= 0) { + return + } + let msg = this._messages.shift() + let resp = func(msg) + if (resp.then != null) { + await resp + } + }) + } + + // simply puts a message to the queue and emits to the queue for processing + /** + * Simple function to replicate the add message functionality of Bull, putting + * a new message on the queue. This then emits an event which will be used to + * return the message to a consumer (if one is attached). + * @param {object} msg A message to be transported over the queue, this should be + * a JSON message as this is required by Bull. + */ + add(msg) { + if (typeof msg !== "object") { + throw "Queue only supports carrying JSON." + } + this._messages.push(newJob(this._name, msg)) + this._emitter.emit("message") + } +} + +module.exports = InMemoryQueue