From 535c4ca5aaf1d28380679c6674e1011ab7e78f20 Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Mon, 14 Sep 2020 10:30:35 +0100 Subject: [PATCH] Making worker thread decision based on environment variable (BUDIBASE_ENVIRONMENT) and some general tidy up, as well as fixing delete event emitter --- packages/server/Dockerfile | 1 + packages/server/src/api/controllers/record.js | 29 +++++++++---------- .../src/api/controllers/workflow/index.js | 5 ++++ packages/server/src/workflows/index.js | 3 +- packages/server/src/workflows/thread.js | 1 + packages/server/src/workflows/triggers.js | 3 ++ 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/packages/server/Dockerfile b/packages/server/Dockerfile index 8a42beba6d..96ae3b7a30 100644 --- a/packages/server/Dockerfile +++ b/packages/server/Dockerfile @@ -4,6 +4,7 @@ WORKDIR /app ENV CLOUD=1 ENV COUCH_DB_URL=https://couchdb.budi.live:5984 +env BUDIBASE_ENVIRONMENT=PRODUCTION # copy files and install dependencies COPY . ./ diff --git a/packages/server/src/api/controllers/record.js b/packages/server/src/api/controllers/record.js index df3c93687a..7137ef1ce7 100644 --- a/packages/server/src/api/controllers/record.js +++ b/packages/server/src/api/controllers/record.js @@ -2,6 +2,16 @@ const CouchDB = require("../../db") const validateJs = require("validate.js") const newid = require("../../db/newid") +function emitEvent(eventType, ctx, record) { + ctx.eventEmitter && + ctx.eventEmitter.emit(eventType, { + args: { + record, + }, + instanceId: ctx.user.instanceId, + }) +} + validateJs.extend(validateJs.validators.datetime, { parse: function(value) { return new Date(value).getTime() @@ -66,9 +76,7 @@ exports.save = async function(ctx) { const doc = row.doc return { ...doc, - [model.name]: doc[model.name] - ? [...doc[model.name], record._id] - : [record._id], + [model.name]: doc[model.name] ? [...doc[model.name], record._id] : [record._id], } }) @@ -76,13 +84,7 @@ exports.save = async function(ctx) { } } - ctx.eventEmitter && - ctx.eventEmitter.emit(`record:save`, { - args: { - record, - }, - instanceId: ctx.user.instanceId, - }) + emitEvent(`record:save`, ctx, record) ctx.body = record ctx.status = 200 ctx.message = `${model.name} created successfully` @@ -145,7 +147,7 @@ exports.destroy = async function(ctx) { return } ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId) - ctx.eventEmitter && ctx.eventEmitter.emit(`record:delete`, record) + emitEvent(`record:delete`, ctx, record) } exports.validate = async function(ctx) { @@ -165,10 +167,7 @@ async function validate({ instanceId, modelId, record, model }) { } const errors = {} for (let fieldName in model.schema) { - const res = validateJs.single( - record[fieldName], - model.schema[fieldName].constraints - ) + const res = validateJs.single(record[fieldName], model.schema[fieldName].constraints) if (res) errors[fieldName] = res } return { valid: Object.keys(errors).length === 0, errors } diff --git a/packages/server/src/api/controllers/workflow/index.js b/packages/server/src/api/controllers/workflow/index.js index 6bae766b62..5a3020879a 100644 --- a/packages/server/src/api/controllers/workflow/index.js +++ b/packages/server/src/api/controllers/workflow/index.js @@ -91,4 +91,9 @@ exports.trigger = async function(ctx) { ...ctx.request.body, instanceId: ctx.user.instanceId, }) + ctx.status = 200 + ctx.body = { + message: `Workflow ${workflow._id} has been triggered.`, + workflow, + } } diff --git a/packages/server/src/workflows/index.js b/packages/server/src/workflows/index.js index 0ea885454f..545fc3382d 100644 --- a/packages/server/src/workflows/index.js +++ b/packages/server/src/workflows/index.js @@ -1,6 +1,5 @@ const triggers = require("./triggers") const workerFarm = require("worker-farm") -const CouchDB = require("../db/client") const singleThread = require("./thread") let workers = workerFarm(require.resolve("./thread")) @@ -22,7 +21,7 @@ function runWorker(job) { */ module.exports.init = function() { triggers.workflowQueue.process(async job => { - if (CouchDB.preferredAdapters != null && CouchDB.preferredAdapters[0] !== "leveldb") { + if (process.env.BUDIBASE_ENVIRONMENT === "PRODUCTION") { await runWorker(job) } else { await singleThread(job) diff --git a/packages/server/src/workflows/thread.js b/packages/server/src/workflows/thread.js index bf1fe41a3f..18eb6e2434 100644 --- a/packages/server/src/workflows/thread.js +++ b/packages/server/src/workflows/thread.js @@ -52,6 +52,7 @@ class Orchestrator { } } +// callback is required for worker-farm to state that the worker thread has completed module.exports = async (job, cb = null) => { try { const workflowOrchestrator = new Orchestrator(job.data.workflow) diff --git a/packages/server/src/workflows/triggers.js b/packages/server/src/workflows/triggers.js index 808093d75d..f8bbf5b5e5 100644 --- a/packages/server/src/workflows/triggers.js +++ b/packages/server/src/workflows/triggers.js @@ -5,6 +5,9 @@ const InMemoryQueue = require("./queue/inMemoryQueue") let workflowQueue = new InMemoryQueue() async function queueRelevantWorkflows(event, eventType) { + if (event.instanceId == null) { + throw `No instanceId specified for ${eventType} - check event emitters.` + } const db = new CouchDB(event.instanceId) const workflowsToTrigger = await db.query("database/by_workflow_trigger", { key: [eventType],