From 3c0901f53022e1d117fc50d57b6e10909ec0ef03 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Thu, 21 Dec 2023 11:06:05 +0000 Subject: [PATCH 1/2] Remove all custom tracing to see if it's the cause of the memory leak. --- .../backend-core/src/db/couch/DatabaseImpl.ts | 4 +- packages/backend-core/src/db/db.ts | 3 +- .../backend-core/src/db/instrumentation.ts | 156 ----- .../backend-core/src/logging/pino/logger.ts | 6 - packages/server/src/automations/utils.ts | 70 +-- packages/server/src/jsRunner.ts | 54 +- packages/server/src/middleware/currentapp.ts | 14 - packages/server/src/threads/automation.ts | 563 ++++++++---------- .../src/utilities/rowProcessor/utils.ts | 56 +- 9 files changed, 313 insertions(+), 613 deletions(-) delete mode 100644 packages/backend-core/src/db/instrumentation.ts diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts index 3fec573bb9..c2c0b6b21d 100644 --- a/packages/backend-core/src/db/couch/DatabaseImpl.ts +++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts @@ -17,7 +17,6 @@ import { directCouchUrlCall } from "./utils" import { getPouchDB } from "./pouchDB" import { WriteStream, ReadStream } from "fs" import { newid } from "../../docIds/newid" -import { DDInstrumentedDatabase } from "../instrumentation" function buildNano(couchInfo: { url: string; cookie: string }) { return Nano({ @@ -36,8 +35,7 @@ export function DatabaseWithConnection( connection: string, opts?: DatabaseOpts ) { - const db = new DatabaseImpl(dbName, opts, connection) - return new DDInstrumentedDatabase(db) + return new DatabaseImpl(dbName, opts, connection) } export class DatabaseImpl implements Database { diff --git a/packages/backend-core/src/db/db.ts b/packages/backend-core/src/db/db.ts index 197770298e..3e69d49f0e 100644 --- a/packages/backend-core/src/db/db.ts +++ b/packages/backend-core/src/db/db.ts @@ -1,9 +1,8 @@ import { directCouchQuery, DatabaseImpl } from "./couch" import { CouchFindOptions, Database, DatabaseOpts } from "@budibase/types" -import { DDInstrumentedDatabase } from "./instrumentation" export function getDB(dbName: string, opts?: DatabaseOpts): Database { - return new DDInstrumentedDatabase(new DatabaseImpl(dbName, opts)) + return new DatabaseImpl(dbName, opts) } // we have to use a callback for this so that we can close diff --git a/packages/backend-core/src/db/instrumentation.ts b/packages/backend-core/src/db/instrumentation.ts deleted file mode 100644 index ba5febcba6..0000000000 --- a/packages/backend-core/src/db/instrumentation.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { - DocumentScope, - DocumentDestroyResponse, - DocumentInsertResponse, - DocumentBulkResponse, - OkResponse, -} from "@budibase/nano" -import { - AllDocsResponse, - AnyDocument, - Database, - DatabaseDumpOpts, - DatabasePutOpts, - DatabaseQueryOpts, - Document, -} from "@budibase/types" -import tracer from "dd-trace" -import { Writable } from "stream" - -export class DDInstrumentedDatabase implements Database { - constructor(private readonly db: Database) {} - - get name(): string { - return this.db.name - } - - exists(): Promise { - return tracer.trace("db.exists", span => { - span?.addTags({ db_name: this.name }) - return this.db.exists() - }) - } - - checkSetup(): Promise> { - return tracer.trace("db.checkSetup", span => { - span?.addTags({ db_name: this.name }) - return this.db.checkSetup() - }) - } - - get(id?: string | undefined): Promise { - return tracer.trace("db.get", span => { - span?.addTags({ db_name: this.name, doc_id: id }) - return this.db.get(id) - }) - } - - getMultiple( - ids: string[], - opts?: { allowMissing?: boolean | undefined } | undefined - ): Promise { - return tracer.trace("db.getMultiple", span => { - span?.addTags({ - db_name: this.name, - num_docs: ids.length, - allow_missing: opts?.allowMissing, - }) - return this.db.getMultiple(ids, opts) - }) - } - - remove( - id: string | Document, - rev?: string | undefined - ): Promise { - return tracer.trace("db.remove", span => { - span?.addTags({ db_name: this.name, doc_id: id }) - return this.db.remove(id, rev) - }) - } - - put( - document: AnyDocument, - opts?: DatabasePutOpts | undefined - ): Promise { - return tracer.trace("db.put", span => { - span?.addTags({ db_name: this.name, doc_id: document._id }) - return this.db.put(document, opts) - }) - } - - bulkDocs(documents: AnyDocument[]): Promise { - return tracer.trace("db.bulkDocs", span => { - span?.addTags({ db_name: this.name, num_docs: documents.length }) - return this.db.bulkDocs(documents) - }) - } - - allDocs( - params: DatabaseQueryOpts - ): Promise> { - return tracer.trace("db.allDocs", span => { - span?.addTags({ db_name: this.name }) - return this.db.allDocs(params) - }) - } - - query( - viewName: string, - params: DatabaseQueryOpts - ): Promise> { - return tracer.trace("db.query", span => { - span?.addTags({ db_name: this.name, view_name: viewName }) - return this.db.query(viewName, params) - }) - } - - destroy(): Promise { - return tracer.trace("db.destroy", span => { - span?.addTags({ db_name: this.name }) - return this.db.destroy() - }) - } - - compact(): Promise { - return tracer.trace("db.compact", span => { - span?.addTags({ db_name: this.name }) - return this.db.compact() - }) - } - - dump(stream: Writable, opts?: DatabaseDumpOpts | undefined): Promise { - return tracer.trace("db.dump", span => { - span?.addTags({ db_name: this.name }) - return this.db.dump(stream, opts) - }) - } - - load(...args: any[]): Promise { - return tracer.trace("db.load", span => { - span?.addTags({ db_name: this.name }) - return this.db.load(...args) - }) - } - - createIndex(...args: any[]): Promise { - return tracer.trace("db.createIndex", span => { - span?.addTags({ db_name: this.name }) - return this.db.createIndex(...args) - }) - } - - deleteIndex(...args: any[]): Promise { - return tracer.trace("db.deleteIndex", span => { - span?.addTags({ db_name: this.name }) - return this.db.deleteIndex(...args) - }) - } - - getIndexes(...args: any[]): Promise { - return tracer.trace("db.getIndexes", span => { - span?.addTags({ db_name: this.name }) - return this.db.getIndexes(...args) - }) - } -} diff --git a/packages/backend-core/src/logging/pino/logger.ts b/packages/backend-core/src/logging/pino/logger.ts index 7a051e7f12..ad68bd300d 100644 --- a/packages/backend-core/src/logging/pino/logger.ts +++ b/packages/backend-core/src/logging/pino/logger.ts @@ -5,7 +5,6 @@ import { IdentityType } from "@budibase/types" import env from "../../environment" import * as context from "../../context" import * as correlation from "../correlation" -import tracer from "dd-trace" import { formats } from "dd-trace/ext" import { localFileDestination } from "../system" @@ -117,11 +116,6 @@ if (!env.DISABLE_PINO_LOGGER) { correlationId: correlation.getId(), } - const span = tracer.scope().active() - if (span) { - tracer.inject(span.context(), formats.LOG, contextObject) - } - const mergingObject: any = { err: error, pid: process.pid, diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 0c28787f67..04fd36e3d1 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -16,7 +16,6 @@ import { } from "@budibase/types" import sdk from "../sdk" import { automationsEnabled } from "../features" -import tracer from "dd-trace" const REBOOT_CRON = "@reboot" const WH_STEP_ID = definitions.WEBHOOK.stepId @@ -40,62 +39,27 @@ function loggingArgs(job: AutomationJob) { } export async function processEvent(job: AutomationJob) { - return tracer.trace( - "processEvent", - { resource: "automation" }, - async span => { - const appId = job.data.event.appId! - const automationId = job.data.automation._id! + const appId = job.data.event.appId! + const automationId = job.data.automation._id! - span?.addTags({ - appId, + const task = async () => { + try { + // need to actually await these so that an error can be captured properly + console.log("automation running", ...loggingArgs(job)) + + const runFn = () => Runner.run(job) + const result = await quotas.addAutomation(runFn, { automationId, - job: { - id: job.id, - name: job.name, - attemptsMade: job.attemptsMade, - opts: { - attempts: job.opts.attempts, - priority: job.opts.priority, - delay: job.opts.delay, - repeat: job.opts.repeat, - backoff: job.opts.backoff, - lifo: job.opts.lifo, - timeout: job.opts.timeout, - jobId: job.opts.jobId, - removeOnComplete: job.opts.removeOnComplete, - removeOnFail: job.opts.removeOnFail, - stackTraceLimit: job.opts.stackTraceLimit, - preventParsingData: job.opts.preventParsingData, - }, - }, }) - - const task = async () => { - try { - // need to actually await these so that an error can be captured properly - console.log("automation running", ...loggingArgs(job)) - - const runFn = () => Runner.run(job) - const result = await quotas.addAutomation(runFn, { - automationId, - }) - console.log("automation completed", ...loggingArgs(job)) - return result - } catch (err) { - span?.addTags({ error: true }) - console.error( - `automation was unable to run`, - err, - ...loggingArgs(job) - ) - return { err } - } - } - - return await context.doInAutomationContext({ appId, automationId, task }) + console.log("automation completed", ...loggingArgs(job)) + return result + } catch (err) { + console.error(`automation was unable to run`, err, ...loggingArgs(job)) + return { err } } - ) + } + + return await context.doInAutomationContext({ appId, automationId, task }) } export async function updateTestHistory( diff --git a/packages/server/src/jsRunner.ts b/packages/server/src/jsRunner.ts index ab0381a399..d6472c0e36 100644 --- a/packages/server/src/jsRunner.ts +++ b/packages/server/src/jsRunner.ts @@ -2,44 +2,34 @@ import vm from "vm" import env from "./environment" import { setJSRunner } from "@budibase/string-templates" import { context, timers } from "@budibase/backend-core" -import tracer from "dd-trace" - type TrackerFn = (f: () => T) => T export function init() { setJSRunner((js: string, ctx: vm.Context) => { - return tracer.trace("runJS", {}, span => { - const perRequestLimit = env.JS_PER_REQUEST_TIME_LIMIT_MS - let track: TrackerFn = f => f() - if (perRequestLimit) { - const bbCtx = context.getCurrentContext() - if (bbCtx) { - if (!bbCtx.jsExecutionTracker) { - bbCtx.jsExecutionTracker = - timers.ExecutionTimeTracker.withLimit(perRequestLimit) - } - track = bbCtx.jsExecutionTracker.track.bind(bbCtx.jsExecutionTracker) - span?.addTags({ - js: { - limitMS: bbCtx.jsExecutionTracker.limitMs, - elapsedMS: bbCtx.jsExecutionTracker.elapsedMS, - }, - }) + const perRequestLimit = env.JS_PER_REQUEST_TIME_LIMIT_MS + let track: TrackerFn = f => f() + if (perRequestLimit) { + const bbCtx = context.getCurrentContext() + if (bbCtx) { + if (!bbCtx.jsExecutionTracker) { + bbCtx.jsExecutionTracker = + timers.ExecutionTimeTracker.withLimit(perRequestLimit) } + track = bbCtx.jsExecutionTracker.track.bind(bbCtx.jsExecutionTracker) } + } - ctx = { - ...ctx, - alert: undefined, - setInterval: undefined, - setTimeout: undefined, - } - vm.createContext(ctx) - return track(() => - vm.runInNewContext(js, ctx, { - timeout: env.JS_PER_EXECUTION_TIME_LIMIT_MS, - }) - ) - }) + ctx = { + ...ctx, + alert: undefined, + setInterval: undefined, + setTimeout: undefined, + } + vm.createContext(ctx) + return track(() => + vm.runInNewContext(js, ctx, { + timeout: env.JS_PER_EXECUTION_TIME_LIMIT_MS, + }) + ) }) } diff --git a/packages/server/src/middleware/currentapp.ts b/packages/server/src/middleware/currentapp.ts index ad6f2afa18..984dd8e5e9 100644 --- a/packages/server/src/middleware/currentapp.ts +++ b/packages/server/src/middleware/currentapp.ts @@ -12,7 +12,6 @@ import { getCachedSelf } from "../utilities/global" import env from "../environment" import { isWebhookEndpoint } from "./utils" import { UserCtx, ContextUser } from "@budibase/types" -import tracer from "dd-trace" export default async (ctx: UserCtx, next: any) => { // try to get the appID from the request @@ -21,11 +20,6 @@ export default async (ctx: UserCtx, next: any) => { return next() } - if (requestAppId) { - const span = tracer.scope().active() - span?.setTag("appId", requestAppId) - } - // deny access to application preview if (!env.isTest()) { if ( @@ -76,14 +70,6 @@ export default async (ctx: UserCtx, next: any) => { return next() } - if (ctx.user) { - const span = tracer.scope().active() - if (ctx.user._id) { - span?.setTag("userId", ctx.user._id) - } - span?.setTag("tenantId", ctx.user.tenantId) - } - const userId = ctx.user ? generateUserMetadataID(ctx.user._id!) : undefined // if the user is not in the right tenant then make sure to wipe their cookie diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 4447899f96..9bb1717f3c 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -34,7 +34,6 @@ import { cloneDeep } from "lodash/fp" import { performance } from "perf_hooks" import * as sdkUtils from "../sdk/utils" import env from "../environment" -import tracer from "dd-trace" threadUtils.threadSetup() const FILTER_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.FILTER.stepId @@ -243,347 +242,281 @@ class Orchestrator { } async execute(): Promise { - return tracer.trace( - "Orchestrator.execute", - { resource: "automation" }, - async span => { - span?.addTags({ - appId: this._appId, - automationId: this._automation._id, - }) + // this will retrieve from context created at start of thread + this._context.env = await sdkUtils.getEnvironmentVariables() + let automation = this._automation + let stopped = false + let loopStep: AutomationStep | undefined = undefined - // this will retrieve from context created at start of thread - this._context.env = await sdkUtils.getEnvironmentVariables() - let automation = this._automation - let stopped = false - let loopStep: AutomationStep | undefined = undefined + let stepCount = 0 + let loopStepNumber: any = undefined + let loopSteps: LoopStep[] | undefined = [] + let metadata + let timeoutFlag = false + let wasLoopStep = false + let timeout = this._job.data.event.timeout + // check if this is a recurring automation, + if (isProdAppID(this._appId) && isRecurring(automation)) { + metadata = await this.getMetadata() + const shouldStop = await this.checkIfShouldStop(metadata) + if (shouldStop) { + return + } + } + const start = performance.now() + for (let step of automation.definition.steps) { + let input: any, + iterations = 1, + iterationCount = 0 - let stepCount = 0 - let loopStepNumber: any = undefined - let loopSteps: LoopStep[] | undefined = [] - let metadata - let timeoutFlag = false - let wasLoopStep = false - let timeout = this._job.data.event.timeout - // check if this is a recurring automation, - if (isProdAppID(this._appId) && isRecurring(automation)) { - span?.addTags({ recurring: true }) - metadata = await this.getMetadata() - const shouldStop = await this.checkIfShouldStop(metadata) - if (shouldStop) { - span?.addTags({ shouldStop: true }) - return + if (timeoutFlag) { + break + } + + if (timeout) { + setTimeout(() => { + timeoutFlag = true + }, timeout || 12000) + } + + stepCount++ + if (step.stepId === LOOP_STEP_ID) { + loopStep = step + loopStepNumber = stepCount + continue + } + + if (loopStep) { + input = await processObject(loopStep.inputs, this._context) + iterations = getLoopIterations(loopStep as LoopStep) + } + for (let index = 0; index < iterations; index++) { + let originalStepInput = cloneDeep(step.inputs) + // Handle if the user has set a max iteration count or if it reaches the max limit set by us + if (loopStep && input.binding) { + let tempOutput = { + items: loopSteps, + iterations: iterationCount, } - } - const start = performance.now() - for (let step of automation.definition.steps) { - const stepSpan = tracer.startSpan("Orchestrator.execute.step", { - childOf: span, - }) - stepSpan.addTags({ - resource: "automation", - step: { - stepId: step.stepId, - id: step.id, - name: step.name, - type: step.type, - title: step.stepTitle, - internal: step.internal, - deprecated: step.deprecated, - }, - }) - - let input: any, - iterations = 1, - iterationCount = 0 - try { - if (timeoutFlag) { - span?.addTags({ timedOut: true }) - break - } + loopStep.inputs.binding = automationUtils.typecastForLooping( + loopStep as LoopStep, + loopStep.inputs as LoopInput + ) + } catch (err) { + this.updateContextAndOutput(loopStepNumber, step, tempOutput, { + status: AutomationErrors.INCORRECT_TYPE, + success: false, + }) + loopSteps = undefined + loopStep = undefined + break + } + let item = [] + if ( + typeof loopStep.inputs.binding === "string" && + loopStep.inputs.option === "String" + ) { + item = automationUtils.stringSplit(loopStep.inputs.binding) + } else if (Array.isArray(loopStep.inputs.binding)) { + item = loopStep.inputs.binding + } + this._context.steps[loopStepNumber] = { + currentItem: item[index], + } - if (timeout) { - setTimeout(() => { - timeoutFlag = true - }, timeout || 12000) - } - - stepCount++ - if (step.stepId === LOOP_STEP_ID) { - loopStep = step - loopStepNumber = stepCount - continue - } - - if (loopStep) { - input = await processObject(loopStep.inputs, this._context) - iterations = getLoopIterations(loopStep as LoopStep) - stepSpan?.addTags({ step: { iterations } }) - } - for (let index = 0; index < iterations; index++) { - let originalStepInput = cloneDeep(step.inputs) - // Handle if the user has set a max iteration count or if it reaches the max limit set by us - if (loopStep && input.binding) { - let tempOutput = { - items: loopSteps, - iterations: iterationCount, - } - try { - loopStep.inputs.binding = automationUtils.typecastForLooping( - loopStep as LoopStep, - loopStep.inputs as LoopInput - ) - } catch (err) { - this.updateContextAndOutput( - loopStepNumber, - step, - tempOutput, - { - status: AutomationErrors.INCORRECT_TYPE, - success: false, - } - ) - loopSteps = undefined - loopStep = undefined - break - } - let item = [] - if ( - typeof loopStep.inputs.binding === "string" && - loopStep.inputs.option === "String" - ) { - item = automationUtils.stringSplit(loopStep.inputs.binding) - } else if (Array.isArray(loopStep.inputs.binding)) { - item = loopStep.inputs.binding - } - this._context.steps[loopStepNumber] = { - currentItem: item[index], - } - - // The "Loop" binding in the front end is "fake", so replace it here so the context can understand it - // Pretty hacky because we need to account for the row object - for (let [key, value] of Object.entries(originalStepInput)) { - if (typeof value === "object") { - for (let [innerKey, innerValue] of Object.entries( - originalStepInput[key] - )) { - if (typeof innerValue === "string") { - originalStepInput[key][innerKey] = - automationUtils.substituteLoopStep( - innerValue, - `steps.${loopStepNumber}` - ) - } else if (typeof value === "object") { - for (let [innerObject, innerValue] of Object.entries( - originalStepInput[key][innerKey] - )) { - originalStepInput[key][innerKey][innerObject] = - automationUtils.substituteLoopStep( - innerValue as string, - `steps.${loopStepNumber}` - ) - } - } - } - } else { - if (typeof value === "string") { - originalStepInput[key] = - automationUtils.substituteLoopStep( - value, - `steps.${loopStepNumber}` - ) - } + // The "Loop" binding in the front end is "fake", so replace it here so the context can understand it + // Pretty hacky because we need to account for the row object + for (let [key, value] of Object.entries(originalStepInput)) { + if (typeof value === "object") { + for (let [innerKey, innerValue] of Object.entries( + originalStepInput[key] + )) { + if (typeof innerValue === "string") { + originalStepInput[key][innerKey] = + automationUtils.substituteLoopStep( + innerValue, + `steps.${loopStepNumber}` + ) + } else if (typeof value === "object") { + for (let [innerObject, innerValue] of Object.entries( + originalStepInput[key][innerKey] + )) { + originalStepInput[key][innerKey][innerObject] = + automationUtils.substituteLoopStep( + innerValue as string, + `steps.${loopStepNumber}` + ) } } - - if ( - index === env.AUTOMATION_MAX_ITERATIONS || - index === parseInt(loopStep.inputs.iterations) - ) { - this.updateContextAndOutput( - loopStepNumber, - step, - tempOutput, - { - status: AutomationErrors.MAX_ITERATIONS, - success: true, - } - ) - loopSteps = undefined - loopStep = undefined - break - } - - let isFailure = false - const currentItem = - this._context.steps[loopStepNumber]?.currentItem - if (currentItem && typeof currentItem === "object") { - isFailure = Object.keys(currentItem).some(value => { - return currentItem[value] === loopStep?.inputs.failure - }) - } else { - isFailure = - currentItem && currentItem === loopStep.inputs.failure - } - - if (isFailure) { - this.updateContextAndOutput( - loopStepNumber, - step, - tempOutput, - { - status: AutomationErrors.FAILURE_CONDITION, - success: false, - } - ) - loopSteps = undefined - loopStep = undefined - break - } } - - // execution stopped, record state for that - if (stopped) { - this.updateExecutionOutput( - step.id, - step.stepId, - {}, - STOPPED_STATUS + } else { + if (typeof value === "string") { + originalStepInput[key] = automationUtils.substituteLoopStep( + value, + `steps.${loopStepNumber}` ) - continue - } - - // If it's a loop step, we need to manually add the bindings to the context - let stepFn = await this.getStepFunctionality(step.stepId) - let inputs = await processObject(originalStepInput, this._context) - inputs = automationUtils.cleanInputValues( - inputs, - step.schema.inputs - ) - - try { - // appId is always passed - const outputs = await stepFn({ - inputs: inputs, - appId: this._appId, - emitter: this._emitter, - context: this._context, - }) - - this._context.steps[stepCount] = outputs - // if filter causes us to stop execution don't break the loop, set a var - // so that we can finish iterating through the steps and record that it stopped - if (step.stepId === FILTER_STEP_ID && !outputs.result) { - stopped = true - this.updateExecutionOutput( - step.id, - step.stepId, - step.inputs, - { - ...outputs, - ...STOPPED_STATUS, - } - ) - continue - } - if (loopStep && loopSteps) { - loopSteps.push(outputs) - } else { - this.updateExecutionOutput( - step.id, - step.stepId, - step.inputs, - outputs - ) - } - } catch (err) { - console.error(`Automation error - ${step.stepId} - ${err}`) - return err - } - - if (loopStep) { - iterationCount++ - if (index === iterations - 1) { - loopStep = undefined - this._context.steps.splice(loopStepNumber, 1) - break - } } } - } finally { - stepSpan?.finish() } - if (loopStep && iterations === 0) { - loopStep = undefined - this.executionOutput.steps.splice(loopStepNumber + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: { - status: AutomationStepStatus.NO_ITERATIONS, - success: true, - }, - inputs: {}, - }) - - this._context.steps.splice(loopStepNumber, 1) - iterations = 1 - } - - // Delete the step after the loop step as it's irrelevant, since information is included - // in the loop step - if (wasLoopStep && !loopStep) { - this._context.steps.splice(loopStepNumber + 1, 1) - wasLoopStep = false - } - if (loopSteps && loopSteps.length) { - let tempOutput = { + if ( + index === env.AUTOMATION_MAX_ITERATIONS || + index === parseInt(loopStep.inputs.iterations) + ) { + this.updateContextAndOutput(loopStepNumber, step, tempOutput, { + status: AutomationErrors.MAX_ITERATIONS, success: true, - items: loopSteps, - iterations: iterationCount, - } - this.executionOutput.steps.splice(loopStepNumber + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: tempOutput, - inputs: step.inputs, }) - this._context.steps[loopStepNumber] = tempOutput + loopSteps = undefined + loopStep = undefined + break + } - wasLoopStep = true - loopSteps = [] + let isFailure = false + const currentItem = this._context.steps[loopStepNumber]?.currentItem + if (currentItem && typeof currentItem === "object") { + isFailure = Object.keys(currentItem).some(value => { + return currentItem[value] === loopStep?.inputs.failure + }) + } else { + isFailure = currentItem && currentItem === loopStep.inputs.failure + } + + if (isFailure) { + this.updateContextAndOutput(loopStepNumber, step, tempOutput, { + status: AutomationErrors.FAILURE_CONDITION, + success: false, + }) + loopSteps = undefined + loopStep = undefined + break } } - const end = performance.now() - const executionTime = end - start + // execution stopped, record state for that + if (stopped) { + this.updateExecutionOutput(step.id, step.stepId, {}, STOPPED_STATUS) + continue + } - console.info( - `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, - { - _logKey: "automation", - executionTime, - } - ) + // If it's a loop step, we need to manually add the bindings to the context + let stepFn = await this.getStepFunctionality(step.stepId) + let inputs = await processObject(originalStepInput, this._context) + inputs = automationUtils.cleanInputValues(inputs, step.schema.inputs) - // store the logs for the automation run try { - await storeLog(this._automation, this.executionOutput) - } catch (e: any) { - if (e.status === 413 && e.request?.data) { - // if content is too large we shouldn't log it - delete e.request.data - e.request.data = { message: "removed due to large size" } + // appId is always passed + const outputs = await stepFn({ + inputs: inputs, + appId: this._appId, + emitter: this._emitter, + context: this._context, + }) + + this._context.steps[stepCount] = outputs + // if filter causes us to stop execution don't break the loop, set a var + // so that we can finish iterating through the steps and record that it stopped + if (step.stepId === FILTER_STEP_ID && !outputs.result) { + stopped = true + this.updateExecutionOutput(step.id, step.stepId, step.inputs, { + ...outputs, + ...STOPPED_STATUS, + }) + continue } - logging.logAlert("Error writing automation log", e) + if (loopStep && loopSteps) { + loopSteps.push(outputs) + } else { + this.updateExecutionOutput( + step.id, + step.stepId, + step.inputs, + outputs + ) + } + } catch (err) { + console.error(`Automation error - ${step.stepId} - ${err}`) + return err } - if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { - await this.updateMetadata(metadata) + + if (loopStep) { + iterationCount++ + if (index === iterations - 1) { + loopStep = undefined + this._context.steps.splice(loopStepNumber, 1) + break + } } - return this.executionOutput + } + + if (loopStep && iterations === 0) { + loopStep = undefined + this.executionOutput.steps.splice(loopStepNumber + 1, 0, { + id: step.id, + stepId: step.stepId, + outputs: { + status: AutomationStepStatus.NO_ITERATIONS, + success: true, + }, + inputs: {}, + }) + + this._context.steps.splice(loopStepNumber, 1) + iterations = 1 + } + + // Delete the step after the loop step as it's irrelevant, since information is included + // in the loop step + if (wasLoopStep && !loopStep) { + this._context.steps.splice(loopStepNumber + 1, 1) + wasLoopStep = false + } + if (loopSteps && loopSteps.length) { + let tempOutput = { + success: true, + items: loopSteps, + iterations: iterationCount, + } + this.executionOutput.steps.splice(loopStepNumber + 1, 0, { + id: step.id, + stepId: step.stepId, + outputs: tempOutput, + inputs: step.inputs, + }) + this._context.steps[loopStepNumber] = tempOutput + + wasLoopStep = true + loopSteps = [] + } + } + + const end = performance.now() + const executionTime = end - start + + console.info( + `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, + { + _logKey: "automation", + executionTime, } ) + + // store the logs for the automation run + try { + await storeLog(this._automation, this.executionOutput) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) + } + if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { + await this.updateMetadata(metadata) + } + return this.executionOutput } } diff --git a/packages/server/src/utilities/rowProcessor/utils.ts b/packages/server/src/utilities/rowProcessor/utils.ts index cafd366cae..22c099c58c 100644 --- a/packages/server/src/utilities/rowProcessor/utils.ts +++ b/packages/server/src/utilities/rowProcessor/utils.ts @@ -11,7 +11,6 @@ import { Row, Table, } from "@budibase/types" -import tracer from "dd-trace" interface FormulaOpts { dynamic?: boolean @@ -51,42 +50,35 @@ export function processFormulas( inputRows: T, { dynamic, contextRows }: FormulaOpts = { dynamic: true } ): T { - return tracer.trace("processFormulas", {}, span => { - const numRows = Array.isArray(inputRows) ? inputRows.length : 1 - span?.addTags({ table_id: table._id, dynamic, numRows }) - const rows = Array.isArray(inputRows) ? inputRows : [inputRows] - if (rows) { - for (let [column, schema] of Object.entries(table.schema)) { - if (schema.type !== FieldTypes.FORMULA) { - continue - } + const rows = Array.isArray(inputRows) ? inputRows : [inputRows] + if (rows) { + for (let [column, schema] of Object.entries(table.schema)) { + if (schema.type !== FieldTypes.FORMULA) { + continue + } - const isStatic = schema.formulaType === FormulaTypes.STATIC + const isStatic = schema.formulaType === FormulaTypes.STATIC - if ( - schema.formula == null || - (dynamic && isStatic) || - (!dynamic && !isStatic) - ) { - continue - } - // iterate through rows and process formula - for (let i = 0; i < rows.length; i++) { - let row = rows[i] - let context = contextRows ? contextRows[i] : row - let formula = schema.formula - rows[i] = { - ...row, - [column]: tracer.trace("processStringSync", {}, span => { - span?.addTags({ table_id: table._id, column, static: isStatic }) - return processStringSync(formula, context) - }), - } + if ( + schema.formula == null || + (dynamic && isStatic) || + (!dynamic && !isStatic) + ) { + continue + } + // iterate through rows and process formula + for (let i = 0; i < rows.length; i++) { + let row = rows[i] + let context = contextRows ? contextRows[i] : row + let formula = schema.formula + rows[i] = { + ...row, + [column]: processStringSync(formula, context), } } } - return Array.isArray(inputRows) ? rows : rows[0] - }) + } + return Array.isArray(inputRows) ? rows : rows[0] } /** From df73cbd0018cd643cb8fb46a6e0ad8e56e766371 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Thu, 21 Dec 2023 11:13:39 +0000 Subject: [PATCH 2/2] Fix lint warning. --- packages/server/src/jsRunner.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/server/src/jsRunner.ts b/packages/server/src/jsRunner.ts index d6472c0e36..a9301feb60 100644 --- a/packages/server/src/jsRunner.ts +++ b/packages/server/src/jsRunner.ts @@ -2,6 +2,7 @@ import vm from "vm" import env from "./environment" import { setJSRunner } from "@budibase/string-templates" import { context, timers } from "@budibase/backend-core" + type TrackerFn = (f: () => T) => T export function init() {