diff --git a/packages/backend-core/logging.js b/packages/backend-core/logging.js new file mode 100644 index 0000000000..da40fe3100 --- /dev/null +++ b/packages/backend-core/logging.js @@ -0,0 +1 @@ +module.exports = require("./src/logging") diff --git a/packages/backend-core/src/logging.js b/packages/backend-core/src/logging.js new file mode 100644 index 0000000000..425d7f8133 --- /dev/null +++ b/packages/backend-core/src/logging.js @@ -0,0 +1,16 @@ +const NonErrors = ["AccountError"] + +function isSuppressed(e) { + return e && e["suppressAlert"] +} + +module.exports.logAlert = (message, e = null) => { + if (e && NonErrors.includes(e.name) && isSuppressed(e)) { + return + } + let errorJson = "" + if (e) { + errorJson = ": " + JSON.stringify(e, Object.getOwnPropertyNames(e)) + } + console.error(`bb-alert: ${message} ${errorJson}`) +} diff --git a/packages/backend-core/src/redis/index.js b/packages/backend-core/src/redis/index.js index 0ee17265ce..158b5e3841 100644 --- a/packages/backend-core/src/redis/index.js +++ b/packages/backend-core/src/redis/index.js @@ -23,7 +23,7 @@ function connectionError(timeout, err) { if (CLOSED) { return } - CLIENT.end() + CLIENT.disconnect() CLOSED = true // always clear this on error clearTimeout(timeout) diff --git a/packages/server/src/api/index.js b/packages/server/src/api/index.js index 4bf9d9e14d..1aae78cb30 100644 --- a/packages/server/src/api/index.js +++ b/packages/server/src/api/index.js @@ -12,6 +12,7 @@ const { mainRoutes, staticRoutes, publicRoutes } = require("./routes") const pkg = require("../../package.json") const env = require("../environment") const { middleware: pro } = require("@budibase/pro") +const { shutdown } = require("./routes/public") const router = new Router() @@ -90,4 +91,5 @@ router.use(publicRoutes.allowedMethods()) router.use(staticRoutes.routes()) router.use(staticRoutes.allowedMethods()) -module.exports = router +module.exports.router = router +module.exports.shutdown = shutdown diff --git a/packages/server/src/api/routes/public/index.ts b/packages/server/src/api/routes/public/index.ts index 6f1c69560e..ca49a1a7d6 100644 --- a/packages/server/src/api/routes/public/index.ts +++ b/packages/server/src/api/routes/public/index.ts @@ -29,6 +29,7 @@ function getApiLimitPerSecond(): number { return parseInt(env.API_REQ_LIMIT_PER_SEC) } +let rateLimitStore: any = null if (!env.isTest()) { const REDIS_OPTS = getRedisOptions() let options @@ -47,8 +48,9 @@ if (!env.isTest()) { database: 1, } } + rateLimitStore = new Stores.Redis(options) RateLimit.defaultOptions({ - store: new Stores.Redis(options), + store: rateLimitStore, }) } // rate limiting, allows for 2 requests per second @@ -128,3 +130,10 @@ applyRoutes(queryEndpoints, PermissionTypes.QUERY, "queryId") applyRoutes(rowEndpoints, PermissionTypes.TABLE, "tableId", "rowId") export default publicRouter + +export const shutdown = () => { + if (rateLimitStore) { + rateLimitStore.client.disconnect() + rateLimitStore = null + } +} diff --git a/packages/server/src/app.ts b/packages/server/src/app.ts index 8efc383194..f73c90c895 100644 --- a/packages/server/src/app.ts +++ b/packages/server/src/app.ts @@ -14,6 +14,8 @@ const automations = require("./automations/index") const Sentry = require("@sentry/node") const fileSystem = require("./utilities/fileSystem") const bullboard = require("./automations/bullboard") +const { logAlert } = require("@budibase/backend-core/logging") +const { Thread } = require("./threads") import redis from "./utilities/redis" import * as migrations from "./migrations" @@ -49,7 +51,7 @@ app.context.eventEmitter = eventEmitter app.context.auth = {} // api routes -app.use(api.routes()) +app.use(api.router.routes()) if (env.isProd()) { env._set("NODE_ENV", "production") @@ -68,11 +70,24 @@ if (env.isProd()) { const server = http.createServer(app.callback()) destroyable(server) +let shuttingDown = false, + errCode = 0 server.on("close", async () => { - if (env.NODE_ENV !== "jest") { + // already in process + if (shuttingDown) { + return + } + shuttingDown = true + if (!env.isTest()) { console.log("Server Closed") } + await automations.shutdown() await redis.shutdown() + await Thread.shutdown() + api.shutdown() + if (!env.isTest()) { + process.exit(errCode) + } }) module.exports = server.listen(env.PORT || 0, async () => { @@ -90,7 +105,13 @@ const shutdown = () => { } process.on("uncaughtException", err => { - console.error(err) + // @ts-ignore + // don't worry about this error, comes from zlib isn't important + if (err && err["code"] === "ERR_INVALID_CHAR") { + return + } + errCode = -1 + logAlert("Uncaught exception.", err) shutdown() }) @@ -102,7 +123,7 @@ process.on("SIGTERM", () => { // not recommended in a clustered environment if (!env.HTTP_MIGRATIONS) { migrations.migrate().catch(err => { - console.error("Error performing migrations. Exiting.\n", err) + logAlert("Error performing migrations. Exiting.", err) shutdown() }) } diff --git a/packages/server/src/automations/bullboard.js b/packages/server/src/automations/bullboard.js index 32336c4714..cba6594ae7 100644 --- a/packages/server/src/automations/bullboard.js +++ b/packages/server/src/automations/bullboard.js @@ -45,4 +45,12 @@ exports.init = () => { return serverAdapter.registerPlugin() } +exports.shutdown = async () => { + if (automationQueue) { + clearInterval(cleanupInternal) + await automationQueue.close() + automationQueue = null + } +} + exports.queue = automationQueue diff --git a/packages/server/src/automations/index.js b/packages/server/src/automations/index.js index 87f35ce763..e543365183 100644 --- a/packages/server/src/automations/index.js +++ b/packages/server/src/automations/index.js @@ -1,5 +1,5 @@ const { processEvent } = require("./utils") -const { queue } = require("./bullboard") +const { queue, shutdown } = require("./bullboard") /** * This module is built purely to kick off the worker farm and manage the inputs/outputs @@ -14,4 +14,9 @@ exports.init = function () { exports.getQueues = () => { return [queue] } + +exports.shutdown = () => { + return shutdown() +} + exports.queue = queue diff --git a/packages/server/src/threads/index.ts b/packages/server/src/threads/index.ts index c19453cf44..8516b62596 100644 --- a/packages/server/src/threads/index.ts +++ b/packages/server/src/threads/index.ts @@ -28,6 +28,8 @@ export class Thread { workers: any timeoutMs: any + static workerRefs: any[] = [] + constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) { this.type = type this.count = opts.count ? opts.count : 1 @@ -46,6 +48,7 @@ export class Thread { workerOpts.maxCallTime = opts.timeoutMs } this.workers = workerFarm(workerOpts, typeToFile(type)) + Thread.workerRefs.push(this.workers) } } @@ -73,4 +76,23 @@ export class Thread { }) }) } + + static shutdown() { + return new Promise(resolve => { + if (Thread.workerRefs.length === 0) { + resolve() + } + let count = 0 + function complete() { + count++ + if (count >= Thread.workerRefs.length) { + resolve() + } + } + for (let worker of Thread.workerRefs) { + workerFarm.end(worker, complete) + } + Thread.workerRefs = [] + }) + } } diff --git a/packages/server/src/utilities/queue/inMemoryQueue.js b/packages/server/src/utilities/queue/inMemoryQueue.js index aebc0ba919..375092609e 100644 --- a/packages/server/src/utilities/queue/inMemoryQueue.js +++ b/packages/server/src/utilities/queue/inMemoryQueue.js @@ -75,6 +75,13 @@ class InMemoryQueue { this._emitter.emit("message") } + /** + * replicating the close function from bull, which waits for jobs to finish. + */ + async close() { + return [] + } + /** * This removes a cron which has been implemented, this is part of Bull API. * @param {string} cronJobId The cron which is to be removed. diff --git a/packages/worker/src/index.ts b/packages/worker/src/index.ts index 1cec2868c6..fb395a7e6a 100644 --- a/packages/worker/src/index.ts +++ b/packages/worker/src/index.ts @@ -12,6 +12,7 @@ const destroyable = require("server-destroy") const koaBody = require("koa-body") const koaSession = require("koa-session") const { passport } = require("@budibase/backend-core/auth") +const { logAlert } = require("@budibase/backend-core/logging") const logger = require("koa-pino-logger") const http = require("http") const api = require("./api") @@ -28,7 +29,6 @@ app.keys = ["secret", "key"] // set up top level koa middleware app.use(koaBody({ multipart: true })) app.use(koaSession(app)) - app.use( logger({ prettyPrint: { @@ -62,25 +62,38 @@ if (env.isProd()) { const server = http.createServer(app.callback()) destroyable(server) +let shuttingDown = false, + errCode = 0 server.on("close", async () => { - if (env.isProd()) { + if (shuttingDown) { + return + } + shuttingDown = true + if (!env.isTest()) { console.log("Server Closed") } await redis.shutdown() + if (!env.isTest()) { + process.exit(errCode) + } }) +const shutdown = () => { + server.close() + server.destroy() +} + module.exports = server.listen(parseInt(env.PORT || 4002), async () => { console.log(`Worker running on ${JSON.stringify(server.address())}`) await redis.init() }) process.on("uncaughtException", err => { - console.error(err) - server.close() - server.destroy() + errCode = -1 + logAlert("Uncaught exception.", err) + shutdown() }) process.on("SIGTERM", () => { - server.close() - server.destroy() + shutdown() })