From 8853776c7982fa0ba15595aa0c64cb01a4f9ed75 Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Fri, 24 Feb 2023 13:52:54 +0000 Subject: [PATCH] Moving around processors to separate audit logs out of central event handling. --- packages/backend-core/src/events/events.ts | 44 +----------- packages/backend-core/src/events/index.ts | 2 +- .../events/processors/AuditLogsProcessor.ts | 71 +++++++++++++++++++ .../src/events/processors/index.ts | 13 +++- packages/server/src/startup.ts | 2 +- packages/types/src/sdk/auditLogs.ts | 3 +- packages/worker/src/index.ts | 4 +- 7 files changed, 90 insertions(+), 49 deletions(-) create mode 100644 packages/backend-core/src/events/processors/AuditLogsProcessor.ts diff --git a/packages/backend-core/src/events/events.ts b/packages/backend-core/src/events/events.ts index d3d32384aa..dc8c5482fb 100644 --- a/packages/backend-core/src/events/events.ts +++ b/packages/backend-core/src/events/events.ts @@ -1,38 +1,12 @@ -import { - AuditLogFn, - Event, - IdentityType, - AuditedEventFriendlyName, - AuditLogQueueEvent, -} from "@budibase/types" +import { Event, AuditedEventFriendlyName } from "@budibase/types" import { processors } from "./processors" import identification from "./identification" -import { getAppId } from "../context" import * as backfill from "./backfill" -import { createQueue, JobQueue } from "../queue" -import BullQueue from "bull" export function isAudited(event: Event) { return !!AuditedEventFriendlyName[event] } -let auditLogsEnabled = false -let auditLogQueue: BullQueue.Queue - -export const configure = (fn: AuditLogFn) => { - auditLogsEnabled = true - const writeAuditLogs = fn - auditLogQueue = createQueue(JobQueue.AUDIT_LOG) - return auditLogQueue.process(async job => { - await writeAuditLogs(job.data.event, job.data.properties, { - userId: job.data.opts.userId, - timestamp: job.data.opts.timestamp, - appId: job.data.opts.appId, - hostInfo: job.data.opts.hostInfo, - }) - }) -} - export const publishEvent = async ( event: Event, properties: any, @@ -45,22 +19,6 @@ export const publishEvent = async ( // no backfill - send the event and exit if (!backfilling) { await processors.processEvent(event, identity, properties, timestamp) - if (auditLogsEnabled && isAudited(event)) { - // only audit log actual events, don't include backfills - const userId = - identity.type === IdentityType.USER ? identity.id : undefined - // add to the event queue, rather than just writing immediately - await auditLogQueue.add({ - event, - properties, - opts: { - userId, - timestamp, - appId: getAppId(), - hostInfo: identity.hostInfo, - }, - }) - } return } diff --git a/packages/backend-core/src/events/index.ts b/packages/backend-core/src/events/index.ts index 15f6dde835..dd687304d8 100644 --- a/packages/backend-core/src/events/index.ts +++ b/packages/backend-core/src/events/index.ts @@ -3,7 +3,7 @@ export * as processors from "./processors" export * as analytics from "./analytics" export { default as identification } from "./identification" export * as backfillCache from "./backfill" -export { configure, isAudited } from "./events" +export { isAudited } from "./events" import { processors } from "./processors" diff --git a/packages/backend-core/src/events/processors/AuditLogsProcessor.ts b/packages/backend-core/src/events/processors/AuditLogsProcessor.ts new file mode 100644 index 0000000000..2461539ff3 --- /dev/null +++ b/packages/backend-core/src/events/processors/AuditLogsProcessor.ts @@ -0,0 +1,71 @@ +import { + Event, + Identity, + Group, + IdentityType, + AuditLogQueueEvent, + AuditLogFn, +} from "@budibase/types" +import { EventProcessor } from "./types" +import { getAppId } from "../../context" +import { isAudited } from "../events" +import BullQueue from "bull" +import { createQueue, JobQueue } from "../../queue" + +export default class AuditLogsProcessor implements EventProcessor { + static auditLogsEnabled = false + static auditLogQueue: BullQueue.Queue + + // can't use constructor as need to return promise + static init(fn: AuditLogFn) { + AuditLogsProcessor.auditLogsEnabled = true + const writeAuditLogs = fn + AuditLogsProcessor.auditLogQueue = createQueue( + JobQueue.AUDIT_LOG + ) + return AuditLogsProcessor.auditLogQueue.process(async job => { + await writeAuditLogs(job.data.event, job.data.properties, { + userId: job.data.opts.userId, + timestamp: job.data.opts.timestamp, + appId: job.data.opts.appId, + hostInfo: job.data.opts.hostInfo, + }) + }) + } + + async processEvent( + event: Event, + identity: Identity, + properties: any, + timestamp?: string + ): Promise { + if (AuditLogsProcessor.auditLogsEnabled && isAudited(event)) { + // only audit log actual events, don't include backfills + const userId = + identity.type === IdentityType.USER ? identity.id : undefined + // add to the event queue, rather than just writing immediately + await AuditLogsProcessor.auditLogQueue.add({ + event, + properties, + opts: { + userId, + timestamp, + appId: getAppId(), + hostInfo: identity.hostInfo, + }, + }) + } + } + + async identify(identity: Identity, timestamp?: string | number) { + // no-op + } + + async identifyGroup(group: Group, timestamp?: string | number) { + // no-op + } + + shutdown(): void { + AuditLogsProcessor.auditLogQueue.close() + } +} diff --git a/packages/backend-core/src/events/processors/index.ts b/packages/backend-core/src/events/processors/index.ts index 0e75f050db..6646764e47 100644 --- a/packages/backend-core/src/events/processors/index.ts +++ b/packages/backend-core/src/events/processors/index.ts @@ -1,8 +1,19 @@ import AnalyticsProcessor from "./AnalyticsProcessor" import LoggingProcessor from "./LoggingProcessor" +import AuditLogsProcessor from "./AuditLogsProcessor" import Processors from "./Processors" +import { AuditLogFn } from "@budibase/types" export const analyticsProcessor = new AnalyticsProcessor() const loggingProcessor = new LoggingProcessor() +const auditLogsProcessor = new AuditLogsProcessor() -export const processors = new Processors([analyticsProcessor, loggingProcessor]) +export function init(auditingFn: AuditLogFn) { + return AuditLogsProcessor.init(auditingFn) +} + +export const processors = new Processors([ + analyticsProcessor, + loggingProcessor, + auditLogsProcessor, +]) diff --git a/packages/server/src/startup.ts b/packages/server/src/startup.ts index 1729c7ba20..e6784c5a82 100644 --- a/packages/server/src/startup.ts +++ b/packages/server/src/startup.ts @@ -127,7 +127,7 @@ export async function startup(app?: any, server?: any) { let queuePromises = [] // configure events to use the pro audit log write // can't integrate directly into backend-core due to cyclic issues - queuePromises.push(events.configure(proSdk.auditLogs.write)) + queuePromises.push(events.processors.init(proSdk.auditLogs.write)) queuePromises.push(automations.init()) queuePromises.push(initPro()) if (app) { diff --git a/packages/types/src/sdk/auditLogs.ts b/packages/types/src/sdk/auditLogs.ts index 9a5d6e7556..0322d2e862 100644 --- a/packages/types/src/sdk/auditLogs.ts +++ b/packages/types/src/sdk/auditLogs.ts @@ -1,4 +1,5 @@ import { Event, HostInfo } from "./events" +import { AuditLogDoc } from "../documents" export type AuditWriteOpts = { appId?: string @@ -11,7 +12,7 @@ export type AuditLogFn = ( event: Event, metadata: any, opts: AuditWriteOpts -) => Promise +) => Promise export type AuditLogQueueEvent = { event: Event diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 750ae0055d..ac28ead30a 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -14,7 +14,7 @@ import Application from "koa" import { bootstrap } from "global-agent" import * as db from "./db" import { auth, logging, events, middleware } from "@budibase/backend-core" -import { sdk } from "@budibase/pro" +import { sdk as proSdk, sdk } from "@budibase/pro" db.init() import Koa from "koa" import koaBody from "koa-body" @@ -30,7 +30,7 @@ import destroyable from "server-destroy" // configure events to use the pro audit log write // can't integrate directly into backend-core due to cyclic issues -events.configure(sdk.auditLogs.write) +events.processors.init(proSdk.auditLogs.write) if (env.ENABLE_SSO_MAINTENANCE_MODE) { console.warn(