1
0
Fork 0
mirror of synced 2024-10-05 12:34:50 +13:00

Logging updates for automations

This commit is contained in:
Rory Powell 2023-05-17 13:54:20 +01:00
parent 42f472b038
commit ad37186665
5 changed files with 160 additions and 66 deletions

View file

@ -104,6 +104,18 @@ async function newContext(updates: ContextMap, task: any) {
return Context.run(context, task)
}
export async function doInAutomationContext(appId: string, automationId: string, task: any): Promise<any> {
const tenantId = getTenantIDFromAppID(appId)
return newContext(
{
tenantId,
appId,
automationId
},
task
)
}
export async function doInContext(appId: string, task: any): Promise<any> {
const tenantId = getTenantIDFromAppID(appId)
return newContext(
@ -187,6 +199,11 @@ export function getTenantId(): string {
return tenantId
}
export function getAutomationId(): string | undefined {
const context = Context.get()
return context?.automationId
}
export function getAppId(): string | undefined {
const context = Context.get()
const foundId = context?.appId

View file

@ -7,4 +7,5 @@ export type ContextMap = {
identity?: IdentityContext
environmentVariables?: Record<string, string>
isScim?: boolean
automationId?: string
}

View file

@ -39,6 +39,7 @@ if (!env.DISABLE_PINO_LOGGER) {
objects?: any[]
tenantId?: string
appId?: string
automationId?: string
identityId?: string
identityType?: IdentityType
correlationId?: string
@ -86,6 +87,7 @@ if (!env.DISABLE_PINO_LOGGER) {
contextObject = {
tenantId: getTenantId(),
appId: getAppId(),
automationId: getAutomationId(),
identityId: identity?._id,
identityType: identity?.type,
correlationId: correlation.getId(),
@ -159,6 +161,16 @@ if (!env.DISABLE_PINO_LOGGER) {
return appId
}
const getAutomationId = () => {
let appId
try {
appId = context.getAutomationId()
} catch (e) {
// do nothing
}
return appId
}
const getIdentity = () => {
let identity
try {

View file

@ -1,5 +1,6 @@
import { Job, JobId, Queue } from "bull"
import { JobQueue } from "./constants"
import * as context from "../context"
export type StalledFn = (job: Job) => Promise<void>
@ -31,77 +32,135 @@ function handleStalled(queue: Queue, removeStalledCb?: StalledFn) {
})
}
function logging(queue: Queue, jobQueue: JobQueue) {
let eventType: string
switch (jobQueue) {
case JobQueue.AUTOMATION:
eventType = "automation-event"
break
case JobQueue.APP_BACKUP:
eventType = "app-backup-event"
break
case JobQueue.AUDIT_LOG:
eventType = "audit-log-event"
break
case JobQueue.SYSTEM_EVENT_QUEUE:
eventType = "system-event"
break
function getLogParams(
eventType: QueueEventType,
event: BullEvent,
opts: {
job?: Job,
jobId?: JobId,
error?: Error
} = {},
extra: any = {}
) {
const message = `[BULL] ${eventType}=${event}`
const err = opts.error
const data = {
eventType,
event,
job: opts.job,
jobId: opts.jobId || opts.job?.id,
...extra
}
return [message, err, data]
}
enum BullEvent {
ERROR="error",
WAITING="waiting",
ACTIVE="active",
STALLED="stalled",
PROGRESS="progress",
COMPLETED="completed",
FAILED="failed",
PAUSED="paused",
RESUMED="resumed",
CLEANED="cleaned",
DRAINED="drained",
REMOVED="removed",
}
enum QueueEventType {
AUTOMATION_EVENT="automation-event",
APP_BACKUP_EVENT="app-backup-event",
AUDIT_LOG_EVENT="audit-log-event",
SYSTEM_EVENT="system-event"
}
const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
[JobQueue.AUTOMATION]: QueueEventType.AUTOMATION_EVENT,
[JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT,
[JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
[JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
}
function logging(queue: Queue, jobQueue: JobQueue) {
const eventType = EventTypeMap[jobQueue]
function doInJobContext(job: Job, task: any) {
// if this is an automation job try to get the app id
const appId = job.data.event?.appId
if (appId) {
return context.doInContext(appId, task)
} else {
task()
}
}
queue
.on(BullEvent.STALLED, async (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop.
await doInJobContext(job, () => {
console.error(...getLogParams(eventType, BullEvent.STALLED, { job }))
})
})
.on(BullEvent.ERROR, (error: any) => {
// An error occurred.
console.error(...getLogParams(eventType, BullEvent.ERROR, { error }))
})
if (process.env.NODE_DEBUG?.includes("bull")) {
queue
.on("error", (error: any) => {
// An error occurred.
console.error(`${eventType}=error error=${JSON.stringify(error)}`)
})
.on("waiting", (jobId: JobId) => {
.on(BullEvent.WAITING, (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling.
console.log(`${eventType}=waiting jobId=${jobId}`)
console.info(...getLogParams(eventType, BullEvent.WAITING, { jobId }))
})
.on("active", (job: Job, jobPromise: any) => {
.on(BullEvent.ACTIVE, async (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it.
console.log(`${eventType}=active jobId=${job.id}`)
await doInJobContext(job, () => {
console.info(...getLogParams(eventType, BullEvent.ACTIVE, { job }))
})
})
.on("stalled", (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop.
console.error(
`${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}`
)
.on(BullEvent.PROGRESS, async (job: Job, progress: any) => {
// A job's progress was updated
await doInJobContext(job, () => {
console.info(...getLogParams(eventType, BullEvent.PROGRESS, { job }, { progress }))
})
})
.on("progress", (job: Job, progress: any) => {
// A job's progress was updated!
console.log(
`${eventType}=progress jobId=${job.id} progress=${progress}`
)
})
.on("completed", (job: Job, result) => {
.on(BullEvent.COMPLETED, async (job: Job, result) => {
// A job successfully completed with a `result`.
console.log(`${eventType}=completed jobId=${job.id} result=${result}`)
await doInJobContext(job, () => {
console.info(...getLogParams(eventType, BullEvent.COMPLETED, { job }, { result }))
})
})
.on("failed", (job, err: any) => {
.on(BullEvent.FAILED, async (job: Job, error: any) => {
// A job failed with reason `err`!
console.log(`${eventType}=failed jobId=${job.id} error=${err}`)
await doInJobContext(job, () => {
console.error(...getLogParams(eventType, BullEvent.FAILED, { job, error }))
})
})
.on("paused", () => {
.on(BullEvent.PAUSED, () => {
// The queue has been paused.
console.log(`${eventType}=paused`)
console.info(...getLogParams(eventType, BullEvent.PAUSED))
})
.on("resumed", (job: Job) => {
.on(BullEvent.RESUMED, () => {
// The queue has been resumed.
console.log(`${eventType}=paused jobId=${job.id}`)
console.info(...getLogParams(eventType, BullEvent.RESUMED))
})
.on("cleaned", (jobs: Job[], type: string) => {
.on(BullEvent.CLEANED, (jobs: Job[], type: string) => {
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
// jobs, and `type` is the type of jobs cleaned.
console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`)
console.info(...getLogParams(eventType, BullEvent.CLEANED, {}, { length: jobs.length, type } ))
})
.on("drained", () => {
.on(BullEvent.DRAINED, () => {
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
console.log(`${eventType}=drained`)
console.info(...getLogParams(eventType, BullEvent.DRAINED ))
})
.on("removed", (job: Job) => {
.on(BullEvent.REMOVED, (job: Job) => {
// A job successfully removed.
console.log(`${eventType}=removed jobId=${job.id}`)
console.info(...getLogParams(eventType, BullEvent.REMOVED, { job } ))
})
}
}

View file

@ -8,7 +8,7 @@ import { db as dbCore, context } from "@budibase/backend-core"
import { getAutomationMetadataParams } from "../db/utils"
import { cloneDeep } from "lodash/fp"
import { quotas } from "@budibase/pro"
import { Automation, WebhookActionType } from "@budibase/types"
import { Automation, AutomationJob, WebhookActionType } from "@budibase/types"
import sdk from "../sdk"
const REBOOT_CRON = "@reboot"
@ -16,27 +16,32 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId
const CRON_STEP_ID = definitions.CRON.stepId
const Runner = new Thread(ThreadType.AUTOMATION)
const jobMessage = (job: any, message: string) => {
return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}`
function loggingArgs(job: AutomationJob) {
return {
jobId: job.id,
trigger: job.data.automation.definition.trigger.event
}
}
export async function processEvent(job: any) {
try {
const automationId = job.data.automation._id
console.log(jobMessage(job, "running"))
// need to actually await these so that an error can be captured properly
return await context.doInContext(job.data.event.appId, async () => {
export async function processEvent(job: AutomationJob) {
const appId = job.data.event.appId!
const automationId = job.data.automation._id!
return await context.doInAutomationContext(appId, automationId, async () => {
try {
// need to actually await these so that an error can be captured properly
console.log("automation running", loggingArgs(job))
const runFn = () => Runner.run(job)
return quotas.addAutomation(runFn, {
const result = await quotas.addAutomation(runFn, {
automationId,
})
})
} catch (err) {
const errJson = JSON.stringify(err)
console.error(jobMessage(job, `was unable to run - ${errJson}`))
console.trace(err)
return { err }
}
console.log("automation completed", loggingArgs(job))
return result
} catch (err) {
console.error(`automation was unable to run`, err, loggingArgs(job))
return { err }
}
})
}
export async function updateTestHistory(