1
0
Fork 0
mirror of synced 2024-09-30 00:57:16 +13:00

Removing the concept of worker-farm and fixing issues raised in review.

This commit is contained in:
mike12345567 2021-09-14 16:43:06 +01:00
parent e94c629bab
commit 2f9f643592
5 changed files with 6 additions and 51 deletions

View file

@ -109,7 +109,6 @@
"to-json-schema": "0.2.5", "to-json-schema": "0.2.5",
"uuid": "3.3.2", "uuid": "3.3.2",
"validate.js": "0.13.1", "validate.js": "0.13.1",
"worker-farm": "1.7.0",
"yargs": "13.2.4", "yargs": "13.2.4",
"zlib": "1.0.5" "zlib": "1.0.5"
}, },

View file

@ -99,21 +99,10 @@ class Orchestrator {
} }
} }
// callback is required for worker-farm to state that the worker thread has completed module.exports = async job => {
module.exports = (job, cb) => {
if (!cb) {
throw "Callback must be defined."
}
const automationOrchestrator = new Orchestrator( const automationOrchestrator = new Orchestrator(
job.data.automation, job.data.automation,
job.data.event job.data.event
) )
automationOrchestrator return automationOrchestrator.execute()
.execute()
.then(output => {
cb(null, output)
})
.catch(err => {
cb(err)
})
} }

View file

@ -1,4 +1,4 @@
exports.defintion = { exports.definition = {
name: "Cron Trigger", name: "Cron Trigger",
event: "cron:trigger", event: "cron:trigger",
icon: "ri-timer-line", icon: "ri-timer-line",

View file

@ -11,5 +11,5 @@ exports.definitions = {
ROW_DELETED: rowDeleted.definition, ROW_DELETED: rowDeleted.definition,
WEBHOOK: webhook.definition, WEBHOOK: webhook.definition,
APP: app.definition, APP: app.definition,
CRON: cron.defintion, CRON: cron.definition,
} }

View file

@ -1,7 +1,6 @@
const env = require("../environment") const env = require("../environment")
const workerFarm = require("worker-farm")
const { getAPIKey, update, Properties } = require("../utilities/usageQuota") const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
const singleThread = require("./thread") const runner = require("./thread")
const { definitions } = require("./triggerInfo") const { definitions } = require("./triggerInfo")
const webhooks = require("../api/controllers/webhook") const webhooks = require("../api/controllers/webhook")
const CouchDB = require("../db") const CouchDB = require("../db")
@ -13,32 +12,6 @@ const { MetadataTypes } = require("../constants")
const WH_STEP_ID = definitions.WEBHOOK.stepId const WH_STEP_ID = definitions.WEBHOOK.stepId
const CRON_STEP_ID = definitions.CRON.stepId const CRON_STEP_ID = definitions.CRON.stepId
let workers = workerFarm(require.resolve("./thread"))
function runWorker(job) {
return new Promise((resolve, reject) => {
workers(job, (err, output) => {
if (err) {
reject(err)
} else {
resolve(output)
}
})
})
}
function runSingleThread(job) {
return new Promise((resolve, reject) => {
singleThread(job, (err, output) => {
if (err) {
reject(err)
} else {
resolve(output)
}
})
})
}
async function updateQuota(automation) { async function updateQuota(automation) {
const appId = automation.appId const appId = automation.appId
const apiObj = await getAPIKey(appId) const apiObj = await getAPIKey(appId)
@ -53,13 +26,7 @@ exports.processEvent = async job => {
job.data.automation.apiKey = await updateQuota(job.data.automation) job.data.automation.apiKey = await updateQuota(job.data.automation)
} }
// need to actually await these so that an error can be captured properly // need to actually await these so that an error can be captured properly
let response return await runner(job)
if (!env.isProd()) {
response = await runSingleThread(job)
} else {
response = await runWorker(job)
}
return response
} catch (err) { } catch (err) {
console.error( console.error(
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}` `${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`