diff --git a/packages/backend-core/src/context/mainContext.ts b/packages/backend-core/src/context/mainContext.ts index 861777b679..41e6579384 100644 --- a/packages/backend-core/src/context/mainContext.ts +++ b/packages/backend-core/src/context/mainContext.ts @@ -104,6 +104,22 @@ async function newContext(updates: ContextMap, task: any) { return Context.run(context, task) } +export async function doInAutomationContext(params: { + appId: string, + automationId: string, + task: any +}): Promise { + const tenantId = getTenantIDFromAppID(params.appId) + return newContext( + { + tenantId, + appId: params.appId, + automationId: params.automationId, + }, + params.task + ) +} + export async function doInContext(appId: string, task: any): Promise { const tenantId = getTenantIDFromAppID(appId) return newContext( @@ -187,6 +203,11 @@ export function getTenantId(): string { return tenantId } +export function getAutomationId(): string | undefined { + const context = Context.get() + return context?.automationId +} + export function getAppId(): string | undefined { const context = Context.get() const foundId = context?.appId diff --git a/packages/backend-core/src/context/types.ts b/packages/backend-core/src/context/types.ts index 727dad80bc..d687a93594 100644 --- a/packages/backend-core/src/context/types.ts +++ b/packages/backend-core/src/context/types.ts @@ -7,4 +7,5 @@ export type ContextMap = { identity?: IdentityContext environmentVariables?: Record isScim?: boolean + automationId?: string } diff --git a/packages/backend-core/src/logging/pino/logger.ts b/packages/backend-core/src/logging/pino/logger.ts index 276377eb00..14c8e7e5a9 100644 --- a/packages/backend-core/src/logging/pino/logger.ts +++ b/packages/backend-core/src/logging/pino/logger.ts @@ -39,6 +39,7 @@ if (!env.DISABLE_PINO_LOGGER) { objects?: any[] tenantId?: string appId?: string + automationId?: string identityId?: string identityType?: IdentityType correlationId?: string @@ -86,6 +87,7 @@ if (!env.DISABLE_PINO_LOGGER) { contextObject = { tenantId: getTenantId(), appId: getAppId(), + automationId: getAutomationId(), identityId: identity?._id, identityType: identity?.type, correlationId: correlation.getId(), @@ -159,6 +161,16 @@ if (!env.DISABLE_PINO_LOGGER) { return appId } + const getAutomationId = () => { + let appId + try { + appId = context.getAutomationId() + } catch (e) { + // do nothing + } + return appId + } + const getIdentity = () => { let identity try { diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index b80aece418..ec1d9d4a90 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -128,6 +128,7 @@ class InMemoryQueue { on() { // do nothing + return this } async waitForCompletion() { diff --git a/packages/backend-core/src/queue/listeners.ts b/packages/backend-core/src/queue/listeners.ts index 331b690fe9..6732bfaaa9 100644 --- a/packages/backend-core/src/queue/listeners.ts +++ b/packages/backend-core/src/queue/listeners.ts @@ -1,5 +1,6 @@ import { Job, JobId, Queue } from "bull" import { JobQueue } from "./constants" +import * as context from "../context" export type StalledFn = (job: Job) => Promise @@ -31,77 +32,153 @@ function handleStalled(queue: Queue, removeStalledCb?: StalledFn) { }) } -function logging(queue: Queue, jobQueue: JobQueue) { - let eventType: string - switch (jobQueue) { - case JobQueue.AUTOMATION: - eventType = "automation-event" - break - case JobQueue.APP_BACKUP: - eventType = "app-backup-event" - break - case JobQueue.AUDIT_LOG: - eventType = "audit-log-event" - break - case JobQueue.SYSTEM_EVENT_QUEUE: - eventType = "system-event" - break +function getLogParams( + eventType: QueueEventType, + event: BullEvent, + opts: { + job?: Job + jobId?: JobId + error?: Error + } = {}, + extra: any = {} +) { + const message = `[BULL] ${eventType}=${event}` + const err = opts.error + + const data = { + eventType, + event, + job: opts.job, + jobId: opts.jobId || opts.job?.id, + ...extra, } + + return [message, err, data] +} + +enum BullEvent { + ERROR = "error", + WAITING = "waiting", + ACTIVE = "active", + STALLED = "stalled", + PROGRESS = "progress", + COMPLETED = "completed", + FAILED = "failed", + PAUSED = "paused", + RESUMED = "resumed", + CLEANED = "cleaned", + DRAINED = "drained", + REMOVED = "removed", +} + +enum QueueEventType { + AUTOMATION_EVENT = "automation-event", + APP_BACKUP_EVENT = "app-backup-event", + AUDIT_LOG_EVENT = "audit-log-event", + SYSTEM_EVENT = "system-event", +} + +const EventTypeMap: { [key in JobQueue]: QueueEventType } = { + [JobQueue.AUTOMATION]: QueueEventType.AUTOMATION_EVENT, + [JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT, + [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT, + [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT, +} + +function logging(queue: Queue, jobQueue: JobQueue) { + const eventType = EventTypeMap[jobQueue] + + function doInJobContext(job: Job, task: any) { + // if this is an automation job try to get the app id + const appId = job.data.event?.appId + if (appId) { + return context.doInContext(appId, task) + } else { + task() + } + } + + queue + .on(BullEvent.STALLED, async (job: Job) => { + // A job has been marked as stalled. This is useful for debugging job + // workers that crash or pause the event loop. + await doInJobContext(job, () => { + console.error(...getLogParams(eventType, BullEvent.STALLED, { job })) + }) + }) + .on(BullEvent.ERROR, (error: any) => { + // An error occurred. + console.error(...getLogParams(eventType, BullEvent.ERROR, { error })) + }) + if (process.env.NODE_DEBUG?.includes("bull")) { queue - .on("error", (error: any) => { - // An error occurred. - console.error(`${eventType}=error error=${JSON.stringify(error)}`) - }) - .on("waiting", (jobId: JobId) => { + .on(BullEvent.WAITING, (jobId: JobId) => { // A Job is waiting to be processed as soon as a worker is idling. - console.log(`${eventType}=waiting jobId=${jobId}`) + console.info(...getLogParams(eventType, BullEvent.WAITING, { jobId })) }) - .on("active", (job: Job, jobPromise: any) => { + .on(BullEvent.ACTIVE, async (job: Job, jobPromise: any) => { // A job has started. You can use `jobPromise.cancel()`` to abort it. - console.log(`${eventType}=active jobId=${job.id}`) + await doInJobContext(job, () => { + console.info(...getLogParams(eventType, BullEvent.ACTIVE, { job })) + }) }) - .on("stalled", (job: Job) => { - // A job has been marked as stalled. This is useful for debugging job - // workers that crash or pause the event loop. - console.error( - `${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}` - ) + .on(BullEvent.PROGRESS, async (job: Job, progress: any) => { + // A job's progress was updated + await doInJobContext(job, () => { + console.info( + ...getLogParams( + eventType, + BullEvent.PROGRESS, + { job }, + { progress } + ) + ) + }) }) - .on("progress", (job: Job, progress: any) => { - // A job's progress was updated! - console.log( - `${eventType}=progress jobId=${job.id} progress=${progress}` - ) - }) - .on("completed", (job: Job, result) => { + .on(BullEvent.COMPLETED, async (job: Job, result) => { // A job successfully completed with a `result`. - console.log(`${eventType}=completed jobId=${job.id} result=${result}`) + await doInJobContext(job, () => { + console.info( + ...getLogParams(eventType, BullEvent.COMPLETED, { job }, { result }) + ) + }) }) - .on("failed", (job, err: any) => { + .on(BullEvent.FAILED, async (job: Job, error: any) => { // A job failed with reason `err`! - console.log(`${eventType}=failed jobId=${job.id} error=${err}`) + await doInJobContext(job, () => { + console.error( + ...getLogParams(eventType, BullEvent.FAILED, { job, error }) + ) + }) }) - .on("paused", () => { + .on(BullEvent.PAUSED, () => { // The queue has been paused. - console.log(`${eventType}=paused`) + console.info(...getLogParams(eventType, BullEvent.PAUSED)) }) - .on("resumed", (job: Job) => { + .on(BullEvent.RESUMED, () => { // The queue has been resumed. - console.log(`${eventType}=paused jobId=${job.id}`) + console.info(...getLogParams(eventType, BullEvent.RESUMED)) }) - .on("cleaned", (jobs: Job[], type: string) => { + .on(BullEvent.CLEANED, (jobs: Job[], type: string) => { // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned // jobs, and `type` is the type of jobs cleaned. - console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`) + console.info( + ...getLogParams( + eventType, + BullEvent.CLEANED, + {}, + { length: jobs.length, type } + ) + ) }) - .on("drained", () => { + .on(BullEvent.DRAINED, () => { // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) - console.log(`${eventType}=drained`) + console.info(...getLogParams(eventType, BullEvent.DRAINED)) }) - .on("removed", (job: Job) => { + .on(BullEvent.REMOVED, (job: Job) => { // A job successfully removed. - console.log(`${eventType}=removed jobId=${job.id}`) + console.info(...getLogParams(eventType, BullEvent.REMOVED, { job })) }) } } diff --git a/packages/pro/node_modules/console-control-strings/README.md~ b/packages/pro/node_modules/console-control-strings/README.md~ new file mode 100644 index 0000000000..6eb34e89d1 --- /dev/null +++ b/packages/pro/node_modules/console-control-strings/README.md~ @@ -0,0 +1,140 @@ +# Console Control Strings + +A library of cross-platform tested terminal/console command strings for +doing things like color and cursor positioning. This is a subset of both +ansi and vt100. All control codes included work on both Windows & Unix-like +OSes, except where noted. + +## Usage + +```js +var consoleControl = require('console-control-strings') + +console.log(consoleControl.color('blue','bgRed', 'bold') + 'hi there' + consoleControl.color('reset')) +process.stdout.write(consoleControl.goto(75, 10)) +``` + +## Why Another? + +There are tons of libraries similar to this one. I wanted one that was: + +1. Very clear about compatibility goals. +2. Could emit, for instance, a start color code without an end one. +3. Returned strings w/o writing to streams. +4. Was not weighed down with other unrelated baggage. + +## Functions + +### var code = consoleControl.up(_num = 1_) + +Returns the escape sequence to move _num_ lines up. + +### var code = consoleControl.down(_num = 1_) + +Returns the escape sequence to move _num_ lines down. + +### var code = consoleControl.forward(_num = 1_) + +Returns the escape sequence to move _num_ lines righ. + +### var code = consoleControl.back(_num = 1_) + +Returns the escape sequence to move _num_ lines left. + +### var code = consoleControl.nextLine(_num = 1_) + +Returns the escape sequence to move _num_ lines down and to the beginning of +the line. + +### var code = consoleControl.previousLine(_num = 1_) + +Returns the escape sequence to move _num_ lines up and to the beginning of +the line. + +### var code = consoleControl.eraseData() + +Returns the escape sequence to erase everything from the current cursor +position to the bottom right of the screen. This is line based, so it +erases the remainder of the current line and all following lines. + +### var code = consoleControl.eraseLine() + +Returns the escape sequence to erase to the end of the current line. + +### var code = consoleControl.goto(_x_, _y_) + +Returns the escape sequence to move the cursor to the designated position. +Note that the origin is _1, 1_ not _0, 0_. + +### var code = consoleControl.gotoSOL() + +Returns the escape sequence to move the cursor to the beginning of the +current line. (That is, it returns a carriage return, `\r`.) + +### var code = consoleControl.hideCursor() + +Returns the escape sequence to hide the cursor. + +### var code = consoleControl.showCursor() + +Returns the escape sequence to show the cursor. + +### var code = consoleControl.color(_colors = []_) + +### var code = consoleControl.color(_color1_, _color2_, _…_, _colorn_) + +Returns the escape sequence to set the current terminal display attributes +(mostly colors). Arguments can either be a list of attributes or an array +of attributes. The difference between passing in an array or list of colors +and calling `.color` separately for each one, is that in the former case a +single escape sequence will be produced where as in the latter each change +will have its own distinct escape sequence. Each attribute can be one of: + +* Reset: + * **reset** – Reset all attributes to the terminal default. +* Styles: + * **bold** – Display text as bold. In some terminals this means using a + bold font, in others this means changing the color. In some it means + both. + * **italic** – Display text as italic. This is not available in most Windows terminals. + * **underline** – Underline text. This is not available in most Windows Terminals. + * **inverse** – Invert the foreground and background colors. + * **stopBold** – Do not display text as bold. + * **stopItalic** – Do not display text as italic. + * **stopUnderline** – Do not underline text. + * **stopInverse** – Do not invert foreground and background. +* Colors: + * **white** + * **black** + * **blue** + * **cyan** + * **green** + * **magenta** + * **red** + * **yellow** + * **grey** / **brightBlack** + * **brightRed** + * **brightGreen** + * **brightYellow** + * **brightBlue** + * **brightMagenta** + * **brightCyan** + * **brightWhite** +* Background Colors: + * **bgWhite** + * **bgBlack** + * **bgBlue** + * **bgCyan** + * **bgGreen** + * **bgMagenta** + * **bgRed** + * **bgYellow** + * **bgGrey** / **bgBrightBlack** + * **bgBrightRed** + * **bgBrightGreen** + * **bgBrightYellow** + * **bgBrightBlue** + * **bgBrightMagenta** + * **bgBrightCyan** + * **bgBrightWhite** + diff --git a/packages/pro/node_modules/copy-concurrently/README.md~ b/packages/pro/node_modules/copy-concurrently/README.md~ new file mode 100644 index 0000000000..7f93b2ad26 --- /dev/null +++ b/packages/pro/node_modules/copy-concurrently/README.md~ @@ -0,0 +1,127 @@ +# copy-concurrently + +Copy files, directories and symlinks + +``` +const copy = require('copy-concurrently') +copy('/path/to/thing', '/new/path/thing').then(() => { + // this is now copied +}).catch(err => { + // oh noooo +}) +``` + +Copies files, directories and symlinks. Ownership is maintained when +running as root, permissions are always maintained. On Windows, if symlinks +are unavailable then junctions will be used. + +## PUBLIC INTERFACE + +### copy(from, to, [options]) → Promise + +Recursively copies `from` to `to` and resolves its promise when finished. +If `to` already exists then the promise will be rejected with an `EEXIST` +error. + +Options are: + +* maxConcurrency – (Default: `1`) The maximum number of concurrent copies to do at once. +* recurseWith - (Default: `copy.item`) The function to call on each file after recursing into a directory. +* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires + an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory + fails then we'll try making a junction instead. + +Options can also include dependency injection: + +* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's. +* fs - (Default: `require('fs')`) The filesystem module to use. Can be used + to use `graceful-fs` or to inject a mock. +* writeStreamAtomic - (Default: `require('fs-write-stream-atomic')`) The + implementation of `writeStreamAtomic` to use. Used to inject a mock. +* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock. + +## EXTENSION INTERFACE + +Ordinarily you'd only call `copy` above. But it's possible to use it's +component functions directly. This is useful if, say, you're writing +[move-concurently](https://npmjs.com/package/move-concurrently). + +### copy.file(from, to, options) → Promise + +Copies a ordinary file `from` to destination `to`. Uses +`fs-write-stream-atomic` to ensure that the file is entirely copied or not +at all. + +Options are: + +* uid, gid - (Optional) If `getuid()` is `0` then this and gid will be used to + set the user and group of `to`. If uid is present then gid must be too. +* mode - (Optional) If set then `to` will have its perms set to `mode`. +* fs - (Default: `require('fs')`) The filesystem module to use. Can be used + to use `graceful-fs` or to inject a mock. +* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's. +* writeStreamAtomic - (Default `require('fs-write-stream-atomic')`) The + implementation of `writeStreamAtomic` to use. Used to inject a mock. + +### copy.symlink(from, to, options) → Promise + +Copies a symlink `from` to destination `to`. If on Windows then if +symlinking fails, a junction will be used instead. + +Options are: + +* top - The top level the copy is being run from. This is used to determine + if the symlink destination is within the set of files we're copying or + outside it. +* fs - (Default: `require('fs')`) The filesystem module to use. Can be used + to use `graceful-fs` or to inject a mock. +* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's. +* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires + an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory + fails then we'll try making a junction instead. + +### copy.recurse(from, to, options) → Promise + +Reads all of the files in directory `from` and adds them to the `queue` +using `recurseWith` (by default `copy.item`). + +Options are: + +* queue - A [`run-queue`](https://npmjs.com/package/run-queue) object to add files found inside `from` to. +* recurseWith - (Default: `copy.item`) The function to call on each file after recursing into a directory. +* uid, gid - (Optional) If `getuid()` is `0` then this and gid will be used to + set the user and group of `to`. If uid is present then gid must be too. +* mode - (Optional) If set then `to` will have its perms set to `mode`. +* fs - (Default: `require('fs')`) The filesystem module to use. Can be used + to use `graceful-fs` or to inject a mock. +* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock. + +### copy.item(from, to, options) → Promise + +Copies some kind of `from` to destination `to`. This looks at the filetype +and calls `copy.file`, `copy.symlink` or `copy.recurse` as appropriate. + +Symlink copies are queued with a priority such that they happen after all +file and directory copies as you can't create a junction on windows to a +file that doesn't exist yet. + +Options are: + +* top - The top level the copy is being run from. This is used to determine + if the symlink destination is within the set of files we're copying or + outside it. +* queue - The [`run-queue`](https://npmjs.com/package/run-queue) object to + pass to `copy.recurse` if `from` is a directory. +* recurseWith - (Default: `copy.item`) The function to call on each file after recursing into a directory. +* uid, gid - (Optional) If `getuid()` is `0` then this and gid will be used to + set the user and group of `to`. If uid is present then gid must be too. +* mode - (Optional) If set then `to` will have its perms set to `mode`. +* fs - (Default: `require('fs')`) The filesystem module to use. Can be used + to use `graceful-fs` or to inject a mock. +* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock. +* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires + an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory + fails then we'll try making a junction instead. +* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's. +* writeStreamAtomic - (Default `require('fs-write-stream-atomic')`) The + implementation of `writeStreamAtomic` to use. Used to inject a mock. diff --git a/packages/pro/node_modules/move-concurrently/README.md~ b/packages/pro/node_modules/move-concurrently/README.md~ new file mode 100644 index 0000000000..7d04d45955 --- /dev/null +++ b/packages/pro/node_modules/move-concurrently/README.md~ @@ -0,0 +1,52 @@ +# move-concurrently + +Move files and directories. + +``` +const move = require('move-concurrently') +move('/path/to/thing', '/new/path/thing'), err => { + if (err) throw err + // thing is now moved! +}) +``` + +Uses `rename` to move things as fast as possible. + +If you `move` across devices or on filesystems that don't support renaming +large directories. That is, situations that result in `rename` returning +the `EXDEV` error, then `move` will fallback to copy + delete. + +When recursively copying directories it will first try to rename the +contents before falling back to copying. While this will be slightly slower +in true cross-device scenarios, it is MUCH faster in cases where the +filesystem can't handle directory renames. + +When copying ownership is maintained when running as root. Permissions are +always maintained. On Windows, if symlinks are unavailable then junctions +will be used. + +## INTERFACE + +### move(from, to, options) → Promise + +Recursively moves `from` to `to` and resolves its promise when finished. +If `to` already exists then the promise will be rejected with an `EEXIST` +error. + +Starts by trying to rename `from` to `to`. + +Options are: + +* maxConcurrency – (Default: `1`) The maximum number of concurrent copies to do at once. +* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires + an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory + fails then we'll try making a junction instead. + +Options can also include dependency injection: + +* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's. +* fs - (Default: `require('fs')`) The filesystem module to use. Can be used + to use `graceful-fs` or to inject a mock. +* writeStreamAtomic - (Default: `require('fs-write-stream-atomic')`) The + implementation of `writeStreamAtomic` to use. Used to inject a mock. +* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock. diff --git a/packages/pro/node_modules/umask/index.js~ b/packages/pro/node_modules/umask/index.js~ new file mode 100644 index 0000000000..87738e4a10 --- /dev/null +++ b/packages/pro/node_modules/umask/index.js~ @@ -0,0 +1,68 @@ +// commands for handling umask + +var util = require("util") +var log = require("npmlog") + +var defaultUmask = 0022 +var defaultUmaskString = toString(defaultUmask) + +exports.toString = toString +exports.fromString = fromString +exports.validate = validate + +function toString(val) { + val = val.toString(8) + while (val.length < 4) val = "0" + val + return val +} + +function validate (data, k, val) { + // must be either an integer or an octal string. + if (typeof val === "number") { + data[k] = val + return true + } + + if (typeof val === "string") { + if (val.charAt(0) !== "0" || isNaN(val)) return false + data[k] = parseInt(val, 8) + return true + } + + return false +} + +function fromString(val, cb) { + + // synchronous callback, no zalgo + _fromString(val, cb || function (err, result) { + + if (err) log.warn("invalid umask", err.message) + val = result + }) + + return val +} + +function _fromString(val, cb) { + if(typeof val === "string") { + if (!/^[0-7]+$/.test(val)) { + return cb(new Error(util.format("Expected octal string, got %j, defaulting to %j", + val, defaultUmaskString)), + defaultUmask) + } + + val = parseInt(val, 8) + } else if(typeof val !== "number") { + return cb(new Error(util.format("Expected number or octal string, got %j, defaulting to %j", + val, defaultUmaskString)), + defaultUmask) + } + + if ((val < 0) || (val > 511)) { + return cb(new Error(util.format("Must be in range 0..511 (0000..0777), got %j", val)), + defaultUmask) + } + + cb(null, val) +} \ No newline at end of file diff --git a/packages/pro/node_modules/umask/test/ChangeLog~ b/packages/pro/node_modules/umask/test/ChangeLog~ new file mode 100644 index 0000000000..72ade111ec --- /dev/null +++ b/packages/pro/node_modules/umask/test/ChangeLog~ @@ -0,0 +1,5 @@ +2015-01-15 Sam Mikes + + * index.js: (convert_fromString) accept decimal strings provided they + don't begin with '0' + diff --git a/packages/pro/node_modules/umask/test/simple.js~ b/packages/pro/node_modules/umask/test/simple.js~ new file mode 100644 index 0000000000..a2184c3cd4 --- /dev/null +++ b/packages/pro/node_modules/umask/test/simple.js~ @@ -0,0 +1,14 @@ +'use strict'; + +var Purefts = require('..'); + +var Code = require('code'); +var Lab = require('lab'); +var lab = Lab.script(); +exports.lab = lab; + +var describe = lab.describe; +var it = lab.it; +var expect = Code.expect; + +describe('create objects', function () { diff --git a/packages/server/src/automations/triggers.ts b/packages/server/src/automations/triggers.ts index 78f8a87b0c..3436c1be7a 100644 --- a/packages/server/src/automations/triggers.ts +++ b/packages/server/src/automations/triggers.ts @@ -9,7 +9,7 @@ import { checkTestFlag } from "../utilities/redis" import * as utils from "./utils" import env from "../environment" import { context, db as dbCore } from "@budibase/backend-core" -import { Automation, Row } from "@budibase/types" +import { Automation, Row, AutomationData, AutomationJob } from "@budibase/types" export const TRIGGER_DEFINITIONS = definitions const JOB_OPTS = { @@ -109,14 +109,16 @@ export async function externalTrigger( } params.fields = coercedFields } - const data: Record = { automation, event: params } + + const data: AutomationData = { automation, event: params as any } if (getResponses) { data.event = { ...data.event, appId: context.getAppId(), automation, } - return utils.processEvent({ data }) + const job = { data } as AutomationJob + return utils.processEvent(job) } else { return automationQueue.add(data, JOB_OPTS) } diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 5296a0fa50..d9f19013ee 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -8,7 +8,7 @@ import { db as dbCore, context } from "@budibase/backend-core" import { getAutomationMetadataParams } from "../db/utils" import { cloneDeep } from "lodash/fp" import { quotas } from "@budibase/pro" -import { Automation, WebhookActionType } from "@budibase/types" +import { Automation, AutomationJob, WebhookActionType } from "@budibase/types" import sdk from "../sdk" const REBOOT_CRON = "@reboot" @@ -16,27 +16,34 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId const CRON_STEP_ID = definitions.CRON.stepId const Runner = new Thread(ThreadType.AUTOMATION) -const jobMessage = (job: any, message: string) => { - return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}` +function loggingArgs(job: AutomationJob) { + return { + jobId: job.id, + trigger: job.data.automation.definition.trigger.event, + } } -export async function processEvent(job: any) { - try { - const automationId = job.data.automation._id - console.log(jobMessage(job, "running")) - // need to actually await these so that an error can be captured properly - return await context.doInContext(job.data.event.appId, async () => { +export async function processEvent(job: AutomationJob) { + const appId = job.data.event.appId! + const automationId = job.data.automation._id! + const task = async () => { + try { + // need to actually await these so that an error can be captured properly + console.log("automation running", loggingArgs(job)) + const runFn = () => Runner.run(job) - return quotas.addAutomation(runFn, { + const result = await quotas.addAutomation(runFn, { automationId, }) - }) - } catch (err) { - const errJson = JSON.stringify(err) - console.error(jobMessage(job, `was unable to run - ${errJson}`)) - console.trace(err) - return { err } + console.log("automation completed", loggingArgs(job)) + return result + } catch (err) { + console.error(`automation was unable to run`, err, loggingArgs(job)) + return { err } + } } + + return await context.doInAutomationContext({ appId, automationId, task }) } export async function updateTestHistory( diff --git a/packages/server/src/definitions/automations.ts b/packages/server/src/definitions/automations.ts index 79cbe6e4ac..7e86608bf3 100644 --- a/packages/server/src/definitions/automations.ts +++ b/packages/server/src/definitions/automations.ts @@ -1,4 +1,4 @@ -import { AutomationResults, AutomationStep, Document } from "@budibase/types" +import { AutomationResults, AutomationStep } from "@budibase/types" export enum LoopStepType { ARRAY = "Array", @@ -27,7 +27,3 @@ export interface AutomationContext extends AutomationResults { env?: Record trigger: any } - -export interface AutomationMetadata extends Document { - errorCount?: number -} diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index adc1a83af0..e271ebba0b 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -13,13 +13,18 @@ import { generateAutomationMetadataID, isProdAppID } from "../db/utils" import { definitions as triggerDefs } from "../automations/triggerInfo" import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants" import { storeLog } from "../automations/logging" -import { Automation, AutomationStep, AutomationStatus } from "@budibase/types" +import { + Automation, + AutomationStep, + AutomationStatus, + AutomationMetadata, + AutomationJob, +} from "@budibase/types" import { LoopStep, LoopInput, TriggerOutput, AutomationContext, - AutomationMetadata, } from "../definitions/automations" import { WorkerCallback } from "./definitions" import { context, logging } from "@budibase/backend-core" @@ -60,11 +65,11 @@ class Orchestrator { _job: Job executionOutput: AutomationContext - constructor(job: Job) { - let automation = job.data.automation, - triggerOutput = job.data.event + constructor(job: AutomationJob) { + let automation = job.data.automation + let triggerOutput = job.data.event const metadata = triggerOutput.metadata - this._chainCount = metadata ? metadata.automationChainCount : 0 + this._chainCount = metadata ? metadata.automationChainCount! : 0 this._appId = triggerOutput.appId as string this._job = job const triggerStepId = automation.definition.trigger.stepId diff --git a/packages/server/src/threads/definitions.ts b/packages/server/src/threads/definitions.ts index 2cf5d8066c..21e7ce0b69 100644 --- a/packages/server/src/threads/definitions.ts +++ b/packages/server/src/threads/definitions.ts @@ -1,5 +1,3 @@ -import { EnvironmentVariablesDecrypted } from "@budibase/types" - export type WorkerCallback = (error: any, response?: any) => void export interface QueryEvent { diff --git a/packages/server/src/threads/index.ts b/packages/server/src/threads/index.ts index 6876f1a07c..9b6bffa867 100644 --- a/packages/server/src/threads/index.ts +++ b/packages/server/src/threads/index.ts @@ -1,5 +1,7 @@ import workerFarm from "worker-farm" import env from "../environment" +import { AutomationJob } from "@budibase/types" +import { QueryEvent } from "./definitions" export const ThreadType = { QUERY: "query", @@ -64,11 +66,11 @@ export class Thread { ) } - run(data: any) { + run(job: AutomationJob | QueryEvent) { const timeout = this.timeoutMs return new Promise((resolve, reject) => { function fire(worker: any) { - worker.execute(data, (err: any, response: any) => { + worker.execute(job, (err: any, response: any) => { if (err && err.type === "TimeoutError") { reject( new Error(`Query response time exceeded ${timeout}ms timeout.`) diff --git a/packages/types/src/documents/app/automation.ts b/packages/types/src/documents/app/automation.ts index 110a2c0642..07d4654433 100644 --- a/packages/types/src/documents/app/automation.ts +++ b/packages/types/src/documents/app/automation.ts @@ -177,3 +177,8 @@ export type AutomationStepInput = { appId: string apiKey?: string } + +export interface AutomationMetadata extends Document { + errorCount?: number + automationChainCount?: number +} diff --git a/packages/types/src/sdk/automations/index.ts b/packages/types/src/sdk/automations/index.ts new file mode 100644 index 0000000000..f1e3d38101 --- /dev/null +++ b/packages/types/src/sdk/automations/index.ts @@ -0,0 +1,15 @@ +import { Automation, AutomationMetadata } from "../../documents" +import { Job } from "bull" + +export interface AutomationDataEvent { + appId?: string + metadata?: AutomationMetadata + automation?: Automation +} + +export interface AutomationData { + event: AutomationDataEvent + automation: Automation +} + +export type AutomationJob = Job diff --git a/packages/types/src/sdk/index.ts b/packages/types/src/sdk/index.ts index 8fc5fb287e..ed44c13667 100644 --- a/packages/types/src/sdk/index.ts +++ b/packages/types/src/sdk/index.ts @@ -1,3 +1,4 @@ +export * from "./automations" export * from "./hosting" export * from "./context" export * from "./events"