1
0
Fork 0
mirror of synced 2024-06-17 18:04:42 +12:00

Add locking framework

This commit is contained in:
Rory Powell 2022-10-10 08:21:17 +01:00
parent ed948e6ae7
commit e92a31bd45
9 changed files with 149 additions and 80 deletions

View file

@ -37,6 +37,7 @@ const core = {
db,
...dbConstants,
redis,
locks: redis.redlock,
objectStore,
utils,
users,

View file

@ -3,9 +3,11 @@
import Client from "../redis"
import utils from "../redis/utils"
import clients from "../redis/init"
import * as redlock from "../redis/redlock"
export = {
Client,
utils,
clients,
redlock,
}

View file

@ -1,27 +1,23 @@
const Client = require("./index")
const utils = require("./utils")
const { getRedlock } = require("./redlock")
let userClient, sessionClient, appClient, cacheClient, writethroughClient
let migrationsRedlock
// turn retry off so that only one instance can ever hold the lock
const migrationsRedlockConfig = { retryCount: 0 }
let userClient,
sessionClient,
appClient,
cacheClient,
writethroughClient,
lockClient
async function init() {
userClient = await new Client(utils.Databases.USER_CACHE).init()
sessionClient = await new Client(utils.Databases.SESSIONS).init()
appClient = await new Client(utils.Databases.APP_METADATA).init()
cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init()
lockClient = await new Client(utils.Databases.LOCKS).init()
writethroughClient = await new Client(
utils.Databases.WRITE_THROUGH,
utils.SelectableDatabases.WRITE_THROUGH
).init()
// pass the underlying ioredis client to redlock
migrationsRedlock = getRedlock(
cacheClient.getClient(),
migrationsRedlockConfig
)
}
process.on("exit", async () => {
@ -30,6 +26,7 @@ process.on("exit", async () => {
if (appClient) await appClient.finish()
if (cacheClient) await cacheClient.finish()
if (writethroughClient) await writethroughClient.finish()
if (lockClient) await lockClient.finish()
})
module.exports = {
@ -63,10 +60,10 @@ module.exports = {
}
return writethroughClient
},
getMigrationsRedlock: async () => {
if (!migrationsRedlock) {
getLockClient: async () => {
if (!lockClient) {
await init()
}
return migrationsRedlock
return lockClient
},
}

View file

@ -1,14 +1,37 @@
import Redlock from "redlock"
import Redlock, { Options } from "redlock"
import { getLockClient } from "./init"
import { LockOptions, LockType } from "@budibase/types"
import * as tenancy from "../tenancy"
export const getRedlock = (redisClient: any, opts = { retryCount: 10 }) => {
return new Redlock([redisClient], {
let noRetryRedlock: Redlock | undefined
const getClient = async (type: LockType): Promise<Redlock> => {
switch (type) {
case LockType.TRY_ONCE: {
if (!noRetryRedlock) {
noRetryRedlock = await newRedlock(OPTIONS.TRY_ONCE)
}
return noRetryRedlock
}
default: {
throw new Error(`Could not get redlock client: ${type}`)
}
}
}
export const OPTIONS = {
TRY_ONCE: {
// immediately throws an error if the lock is already held
retryCount: 0,
},
DEFAULT: {
// the expected clock drift; for more details
// see http://redis.io/topics/distlock
driftFactor: 0.01, // multiplied by lock ttl to determine drift time
// the max number of times Redlock will attempt
// to lock a resource before erroring
retryCount: opts.retryCount,
retryCount: 10,
// the time in ms between attempts
retryDelay: 200, // time in ms
@ -16,6 +39,45 @@ export const getRedlock = (redisClient: any, opts = { retryCount: 10 }) => {
// the max time in ms randomly added to retries
// to improve performance under high contention
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
retryJitter: 200, // time in ms
})
retryJitter: 100, // time in ms
},
}
export const newRedlock = async (opts: Options = {}) => {
let options = { ...OPTIONS.DEFAULT, ...opts }
const redisWrapper = await getLockClient()
const client = redisWrapper.getClient()
return new Redlock([client], options)
}
export const doWithLock = async (opts: LockOptions, task: any) => {
const redlock = await getClient(opts.type)
let lock
try {
// aquire lock
let name: string = `${tenancy.getTenantId()}_${opts.name}`
if (opts.nameSuffix) {
name = name + `_${opts.nameSuffix}`
}
lock = await redlock.lock(name, opts.ttl)
// perform locked task
return task()
} catch (e: any) {
// lock limit exceeded
if (e.name === "LockError") {
if (opts.type === LockType.TRY_ONCE) {
// don't throw for try-once locks, they will always error
// due to retry count (0) exceeded
return
} else {
throw e
}
} else {
throw e
}
} finally {
if (lock) {
await lock.unlock()
}
}
}

View file

@ -28,6 +28,7 @@ exports.Databases = {
LICENSES: "license",
GENERIC_CACHE: "data_cache",
WRITE_THROUGH: "writeThrough",
LOCKS: "locks",
}
/**

View file

@ -1,5 +1,11 @@
import { migrations, redis } from "@budibase/backend-core"
import { Migration, MigrationOptions, MigrationName } from "@budibase/types"
import { locks, migrations } from "@budibase/backend-core"
import {
Migration,
MigrationOptions,
MigrationName,
LockType,
LockName,
} from "@budibase/types"
import env from "../environment"
// migration functions
@ -86,33 +92,14 @@ export const migrate = async (options?: MigrationOptions) => {
}
const migrateWithLock = async (options?: MigrationOptions) => {
// get a new lock client
const redlock = await redis.clients.getMigrationsRedlock()
// lock for 15 minutes
const ttl = 1000 * 60 * 15
let migrationLock
// acquire lock
try {
migrationLock = await redlock.lock("migrations", ttl)
} catch (e: any) {
if (e.name === "LockError") {
return
} else {
throw e
await locks.doWithLock(
{
type: LockType.TRY_ONCE,
name: LockName.MIGRATIONS,
ttl: 1000 * 60 * 15, // auto expire the migration lock after 15 minutes
},
async () => {
await migrations.runMigrations(MIGRATIONS, options)
}
}
// run migrations
try {
await migrations.runMigrations(MIGRATIONS, options)
} finally {
// release lock
try {
await migrationLock.unlock()
} catch (e) {
console.error("unable to release migration lock")
}
}
)
}

View file

@ -7,3 +7,4 @@ export * from "./datasources"
export * from "./search"
export * from "./koa"
export * from "./auth"
export * from "./locks"

View file

@ -0,0 +1,31 @@
export enum LockType {
/**
* If this lock is already held the attempted operation will not be performed.
* No retries will take place and no error will be thrown.
*/
TRY_ONCE = "try_once",
}
export enum LockName {
MIGRATIONS = "migrations",
TRIGGER_QUOTA = "trigger_quota",
}
export interface LockOptions {
/**
* The lock type determines which client to use
*/
type: LockType
/**
* The name for the lock
*/
name: LockName
/**
* The ttl to auto-expire the lock if not unlocked manually
*/
ttl: number
/**
* The suffix to add to the lock name for additional uniqueness
*/
nameSuffix?: string
}

View file

@ -1,5 +1,11 @@
import { migrations, redis } from "@budibase/backend-core"
import { Migration, MigrationOptions, MigrationName } from "@budibase/types"
import { migrations, locks } from "@budibase/backend-core"
import {
Migration,
MigrationOptions,
MigrationName,
LockType,
LockName,
} from "@budibase/types"
import env from "../environment"
// migration functions
@ -42,33 +48,14 @@ export const migrate = async (options?: MigrationOptions) => {
}
const migrateWithLock = async (options?: MigrationOptions) => {
// get a new lock client
const redlock = await redis.clients.getMigrationsRedlock()
// lock for 15 minutes
const ttl = 1000 * 60 * 15
let migrationLock
// acquire lock
try {
migrationLock = await redlock.lock("migrations", ttl)
} catch (e: any) {
if (e.name === "LockError") {
return
} else {
throw e
await locks.doWithLock(
{
type: LockType.TRY_ONCE,
name: LockName.MIGRATIONS,
ttl: 1000 * 60 * 15, // auto expire the migration lock after 15 minutes
},
async () => {
await migrations.runMigrations(MIGRATIONS, options)
}
}
// run migrations
try {
await migrations.runMigrations(MIGRATIONS, options)
} finally {
// release lock
try {
await migrationLock.unlock()
} catch (e) {
console.error("unable to release migration lock")
}
}
)
}