1
0
Fork 0
mirror of synced 2024-06-26 18:10:51 +12:00
budibase/packages/backend-core/src/redis/index.ts

279 lines
7 KiB
TypeScript

import RedisWrapper from "../redis"
const env = require("../environment")
// ioredis mock is all in memory
const Redis = env.isTest() ? require("ioredis-mock") : require("ioredis")
const {
addDbPrefix,
removeDbPrefix,
getRedisOptions,
SEPARATOR,
SelectableDatabases,
} = require("./utils")
const RETRY_PERIOD_MS = 2000
const STARTUP_TIMEOUT_MS = 5000
const CLUSTERED = false
const DEFAULT_SELECT_DB = SelectableDatabases.DEFAULT
// for testing just generate the client once
let CLOSED = false
let CLIENTS: { [key: number]: any } = {}
// if in test always connected
let CONNECTED = env.isTest()
function pickClient(selectDb: number): any {
return CLIENTS[selectDb]
}
function connectionError(
selectDb: number,
timeout: NodeJS.Timeout,
err: Error | string
) {
// manually shut down, ignore errors
if (CLOSED) {
return
}
pickClient(selectDb).disconnect()
CLOSED = true
// always clear this on error
clearTimeout(timeout)
CONNECTED = false
console.error("Redis connection failed - " + err)
setTimeout(() => {
init()
}, RETRY_PERIOD_MS)
}
/**
* Inits the system, will error if unable to connect to redis cluster (may take up to 10 seconds) otherwise
* will return the ioredis client which will be ready to use.
*/
function init(selectDb = DEFAULT_SELECT_DB) {
let timeout: NodeJS.Timeout
CLOSED = false
let client = pickClient(selectDb)
// already connected, ignore
if (client && CONNECTED) {
return
}
// testing uses a single in memory client
if (env.isTest()) {
CLIENTS[selectDb] = new Redis(getRedisOptions())
}
// start the timer - only allowed 5 seconds to connect
timeout = setTimeout(() => {
if (!CONNECTED) {
connectionError(
selectDb,
timeout,
"Did not successfully connect in timeout"
)
}
}, STARTUP_TIMEOUT_MS)
// disconnect any lingering client
if (client) {
client.disconnect()
}
const { redisProtocolUrl, opts, host, port } = getRedisOptions(CLUSTERED)
if (CLUSTERED) {
client = new Redis.Cluster([{ host, port }], opts)
} else if (redisProtocolUrl) {
client = new Redis(redisProtocolUrl)
} else {
client = new Redis(opts)
}
// attach handlers
client.on("end", (err: Error) => {
connectionError(selectDb, timeout, err)
})
client.on("error", (err: Error) => {
connectionError(selectDb, timeout, err)
})
client.on("connect", () => {
clearTimeout(timeout)
CONNECTED = true
})
CLIENTS[selectDb] = client
}
function waitForConnection(selectDb: number = DEFAULT_SELECT_DB) {
return new Promise(resolve => {
if (pickClient(selectDb) == null) {
init()
} else if (CONNECTED) {
resolve("")
return
}
// check if the connection is ready
const interval = setInterval(() => {
if (CONNECTED) {
clearInterval(interval)
resolve("")
}
}, 500)
})
}
/**
* Utility function, takes a redis stream and converts it to a promisified response -
* this can only be done with redis streams because they will have an end.
* @param stream A redis stream, specifically as this type of stream will have an end.
* @param client The client to use for further lookups.
* @return {Promise<object>} The final output of the stream
*/
function promisifyStream(stream: any, client: RedisWrapper) {
return new Promise((resolve, reject) => {
const outputKeys = new Set()
stream.on("data", (keys: string[]) => {
keys.forEach(key => {
outputKeys.add(key)
})
})
stream.on("error", (err: Error) => {
reject(err)
})
stream.on("end", async () => {
const keysArray: string[] = Array.from(outputKeys) as string[]
try {
let getPromises = []
for (let key of keysArray) {
getPromises.push(client.get(key))
}
const jsonArray = await Promise.all(getPromises)
resolve(
keysArray.map(key => ({
key: removeDbPrefix(key),
value: JSON.parse(jsonArray.shift()),
}))
)
} catch (err) {
reject(err)
}
})
})
}
export = class RedisWrapper {
_db: string
_select: number
constructor(db: string, selectDb: number | null = null) {
this._db = db
this._select = selectDb || DEFAULT_SELECT_DB
}
getClient() {
return pickClient(this._select)
}
async init() {
CLOSED = false
init(this._select)
await waitForConnection(this._select)
return this
}
async finish() {
CLOSED = true
this.getClient().disconnect()
}
async scan(key = ""): Promise<any> {
const db = this._db
key = `${db}${SEPARATOR}${key}`
let stream
if (CLUSTERED) {
let node = this.getClient().nodes("master")
stream = node[0].scanStream({ match: key + "*", count: 100 })
} else {
stream = this.getClient().scanStream({ match: key + "*", count: 100 })
}
return promisifyStream(stream, this.getClient())
}
async keys(pattern: string) {
const db = this._db
return this.getClient().keys(addDbPrefix(db, pattern))
}
async get(key: string) {
const db = this._db
let response = await this.getClient().get(addDbPrefix(db, key))
// overwrite the prefixed key
if (response != null && response.key) {
response.key = key
}
// if its not an object just return the response
try {
return JSON.parse(response)
} catch (err) {
return response
}
}
async bulkGet(keys: string[]) {
const db = this._db
if (keys.length === 0) {
return {}
}
const prefixedKeys = keys.map(key => addDbPrefix(db, key))
let response = await this.getClient().mget(prefixedKeys)
if (Array.isArray(response)) {
let final: any = {}
let count = 0
for (let result of response) {
if (result) {
let parsed
try {
parsed = JSON.parse(result)
} catch (err) {
parsed = result
}
final[keys[count]] = parsed
}
count++
}
return final
} else {
throw new Error(`Invalid response: ${response}`)
}
}
async store(key: string, value: any, expirySeconds: number | null = null) {
const db = this._db
if (typeof value === "object") {
value = JSON.stringify(value)
}
const prefixedKey = addDbPrefix(db, key)
await this.getClient().set(prefixedKey, value)
if (expirySeconds) {
await this.getClient().expire(prefixedKey, expirySeconds)
}
}
async getTTL(key: string) {
const db = this._db
const prefixedKey = addDbPrefix(db, key)
return this.getClient().ttl(prefixedKey)
}
async setExpiry(key: string, expirySeconds: number | null) {
const db = this._db
const prefixedKey = addDbPrefix(db, key)
await this.getClient().expire(prefixedKey, expirySeconds)
}
async delete(key: string) {
const db = this._db
await this.getClient().del(addDbPrefix(db, key))
}
async clear() {
let items = await this.scan()
await Promise.all(items.map((obj: any) => this.delete(obj.key)))
}
}