From 4ceed6c340461a7565b1cf5f429fc135ef87a259 Mon Sep 17 00:00:00 2001 From: Michael Drury Date: Thu, 16 Dec 2021 22:43:14 +0000 Subject: [PATCH] Adding in dynamic variable processing - finding which variables need to be used and then calling them from within the query thread. --- .../server/src/api/controllers/query/index.js | 2 + packages/server/src/threads/query.js | 260 ++++++++---------- packages/server/src/threads/utils.js | 83 +++++- 3 files changed, 200 insertions(+), 145 deletions(-) diff --git a/packages/server/src/api/controllers/query/index.js b/packages/server/src/api/controllers/query/index.js index ddb49b7b3d..d0992d037a 100644 --- a/packages/server/src/api/controllers/query/index.js +++ b/packages/server/src/api/controllers/query/index.js @@ -109,6 +109,7 @@ exports.preview = async function (ctx) { try { const { rows, keys, info, extra } = await Runner.run({ + appId: ctx.appId, datasource, queryVerb, fields, @@ -136,6 +137,7 @@ async function execute(ctx, opts = { rowsOnly: false }) { // call the relevant CRUD method on the integration class try { const { rows, extra } = await Runner.run({ + appId: ctx.appId, datasource, queryVerb: query.queryVerb, fields: query.fields, diff --git a/packages/server/src/threads/query.js b/packages/server/src/threads/query.js index f0ca114e94..b86c1a49fd 100644 --- a/packages/server/src/threads/query.js +++ b/packages/server/src/threads/query.js @@ -1,152 +1,134 @@ -require("./utils").threadSetup() +const threadUtils = require("./utils") +threadUtils.threadSetup() const ScriptRunner = require("../utilities/scriptRunner") const { integrations } = require("../integrations") const { processStringSync } = require("@budibase/string-templates") +const CouchDB = require("../db") -async function getDynamicVariable() {} +class QueryRunner { + constructor(input, flags = { noRecursiveQuery: false }) { + this.appId = input.appId + this.datasource = input.datasource + this.queryVerb = input.queryVerb + this.fields = input.fields + this.parameters = input.parameters + this.transformer = input.transformer + this.noRecursiveQuery = flags.noRecursiveQuery + } -async function addDatasourceVariables(datasource, parameters) { - if (!datasource || !datasource.config) { + async execute() { + let { datasource, fields, queryVerb, transformer } = this + // pre-query, make sure datasource variables are added to parameters + const parameters = await this.addDatasourceVariables() + const query = threadUtils.enrichQueryFields(fields, parameters) + const Integration = integrations[datasource.source] + if (!Integration) { + throw "Integration type does not exist." + } + const integration = new Integration(datasource.config) + + let output = threadUtils.formatResponse(await integration[queryVerb](query)) + let rows = output, + info = undefined, + extra = undefined + if (threadUtils.hasExtraData(output)) { + rows = output.data + info = output.info + extra = output.extra + } + + // transform as required + if (transformer) { + const runner = new ScriptRunner(transformer, { data: rows }) + rows = runner.execute() + } + + // needs to an array for next step + if (!Array.isArray(rows)) { + rows = [rows] + } + + // map into JSON if just raw primitive here + if (rows.find(row => typeof row !== "object")) { + rows = rows.map(value => ({ value })) + } + + // get all the potential fields in the schema + let keys = rows.flatMap(Object.keys) + + if (integration.end) { + integration.end() + } + + return { rows, keys, info, extra } + } + + async runAnotherQuery(queryId, parameters) { + const db = new CouchDB(this.appId) + const query = await db.get(queryId) + const datasource = await db.get(query.datasourceId) + return new QueryRunner( + { + appId: this.appId, + datasource, + queryVerb: query.queryVerb, + fields: query.fields, + parameters, + transformer: query.transformer, + }, + { noRecursiveQuery: true } + ).execute() + } + + async getDynamicVariable(variable) { + let { parameters } = this + const queryId = variable.queryId, + name = variable.name + let value = await threadUtils.checkCacheForDynamicVariable(queryId, name) + if (!value) { + value = await this.runAnotherQuery(queryId, parameters) + await threadUtils.storeDynamicVariable(queryId, name, value) + } + return value + } + + async addDatasourceVariables() { + let { datasource, parameters, fields } = this + if (!datasource || !datasource.config) { + return parameters + } + const staticVars = datasource.config.staticVariables || {} + const dynamicVars = datasource.config.dynamicVariables || [] + for (let [key, value] of Object.entries(staticVars)) { + if (!parameters[key]) { + parameters[key] = value + } + } + if (!this.noRecursiveQuery) { + // need to see if this uses any variables + const stringFields = JSON.stringify(fields) + const foundVars = dynamicVars.filter(variable => { + // look for {{ variable }} but allow spaces between handlebars + const regex = new RegExp(`{{[ ]*${variable.name}[ ]*}}`) + return regex.test(stringFields) + }) + const dynamics = foundVars.map(dynVar => this.getDynamicVariable(dynVar)) + const responses = await Promise.all(dynamics) + for (let i = 0; i < foundVars.length; i++) { + const variable = foundVars[i] + parameters[variable.name] = processStringSync(variable.value, { + data: responses[i].rows, + info: responses[i].extra, + }) + } + } return parameters } - const staticVars = datasource.config.staticVariables || {} - const dynamicVars = datasource.config.dynamicVariables || [] - for (let [key, value] of Object.entries(staticVars)) { - if (!parameters[key]) { - parameters[key] = value - } - } - const dynamics = dynamicVars.map(dynVar => - getDynamicVariable(datasource, dynVar, parameters) - ) - for (let i = 0; i < dynamicVars.length; i++) { - const variable = dynamicVars[i] - parameters[variable.name] = dynamics[i] - } - return parameters -} - -function enrichQueryFields(fields, parameters = {}) { - const enrichedQuery = {} - - // enrich the fields with dynamic parameters - for (let key of Object.keys(fields)) { - if (fields[key] == null) { - continue - } - if (typeof fields[key] === "object") { - // enrich nested fields object - enrichedQuery[key] = enrichQueryFields(fields[key], parameters) - } else if (typeof fields[key] === "string") { - // enrich string value as normal - enrichedQuery[key] = processStringSync(fields[key], parameters, { - noHelpers: true, - }) - } else { - enrichedQuery[key] = fields[key] - } - } - - if ( - enrichedQuery.json || - enrichedQuery.customData || - enrichedQuery.requestBody - ) { - try { - enrichedQuery.json = JSON.parse( - enrichedQuery.json || - enrichedQuery.customData || - enrichedQuery.requestBody - ) - } catch (err) { - // no json found, ignore - } - delete enrichedQuery.customData - } - - return enrichedQuery -} - -function formatResponse(resp) { - if (typeof resp === "string") { - try { - resp = JSON.parse(resp) - } catch (err) { - resp = { response: resp } - } - } - return resp -} - -function hasExtraData(response) { - return ( - typeof response === "object" && - !Array.isArray(response) && - response.data != null && - response.info != null - ) -} - -async function runAndTransform( - datasource, - queryVerb, - fields, - parameters, - transformer -) { - // pre-query, make sure datasource variables are added to parameters - parameters = await addDatasourceVariables(datasource, parameters) - const query = enrichQueryFields(fields, parameters) - const Integration = integrations[datasource.source] - if (!Integration) { - throw "Integration type does not exist." - } - const integration = new Integration(datasource.config) - - let output = formatResponse(await integration[queryVerb](query)) - let rows = output, - info = undefined, - extra = undefined - if (hasExtraData(output)) { - rows = output.data - info = output.info - extra = output.extra - } - - // transform as required - if (transformer) { - const runner = new ScriptRunner(transformer, { data: rows }) - rows = runner.execute() - } - - // needs to an array for next step - if (!Array.isArray(rows)) { - rows = [rows] - } - - // map into JSON if just raw primitive here - if (rows.find(row => typeof row !== "object")) { - rows = rows.map(value => ({ value })) - } - - // get all the potential fields in the schema - let keys = rows.flatMap(Object.keys) - - if (integration.end) { - integration.end() - } - - return { rows, keys, info, extra } } module.exports = (input, callback) => { - runAndTransform( - input.datasource, - input.queryVerb, - input.fields, - input.parameters, - input.transformer - ) + const Runner = new QueryRunner(input) + Runner.execute() .then(response => { callback(null, response) }) diff --git a/packages/server/src/threads/utils.js b/packages/server/src/threads/utils.js index 824d7e3f16..34ae4f0477 100644 --- a/packages/server/src/threads/utils.js +++ b/packages/server/src/threads/utils.js @@ -3,20 +3,23 @@ const CouchDB = require("../db") const { init } = require("@budibase/auth") const redis = require("@budibase/auth/redis") const { SEPARATOR } = require("@budibase/auth/db") +const { processStringSync } = require("@budibase/string-templates") +const VARIABLE_TTL_SECONDS = 3600 let client -async function startup() { +async function getClient() { if (!client) { client = await new redis.Client(redis.utils.Databases.QUERY_VARS).init() } + return client } process.on("exit", async () => { if (client) await client.finish() }) -exports.threadSetup = async () => { +exports.threadSetup = () => { // don't run this if not threading if (env.isTest() || env.DISABLE_THREADING) { return @@ -24,9 +27,6 @@ exports.threadSetup = async () => { // when thread starts, make sure it is recorded env.setInThread() init(CouchDB) - startup().catch(err => { - console.error("Redis connection failed for thread - " + err) - }) } function makeVariableKey(queryId, variable) { @@ -34,5 +34,76 @@ function makeVariableKey(queryId, variable) { } exports.checkCacheForDynamicVariable = async (queryId, variable) => { - await client.get(makeVariableKey(queryId, variable)) + const cache = await getClient() + return cache.get(makeVariableKey(queryId, variable)) +} + +exports.storeDynamicVariable = async (queryId, variable, value) => { + const cache = await getClient() + await cache.store( + makeVariableKey(queryId, variable), + value, + VARIABLE_TTL_SECONDS + ) +} + +exports.formatResponse = resp => { + if (typeof resp === "string") { + try { + resp = JSON.parse(resp) + } catch (err) { + resp = { response: resp } + } + } + return resp +} + +exports.hasExtraData = response => { + return ( + typeof response === "object" && + !Array.isArray(response) && + response.data != null && + response.info != null + ) +} + +exports.enrichQueryFields = (fields, parameters = {}) => { + const enrichedQuery = {} + + // enrich the fields with dynamic parameters + for (let key of Object.keys(fields)) { + if (fields[key] == null) { + continue + } + if (typeof fields[key] === "object") { + // enrich nested fields object + enrichedQuery[key] = this.enrichQueryFields(fields[key], parameters) + } else if (typeof fields[key] === "string") { + // enrich string value as normal + enrichedQuery[key] = processStringSync(fields[key], parameters, { + noHelpers: true, + }) + } else { + enrichedQuery[key] = fields[key] + } + } + + if ( + enrichedQuery.json || + enrichedQuery.customData || + enrichedQuery.requestBody + ) { + try { + enrichedQuery.json = JSON.parse( + enrichedQuery.json || + enrichedQuery.customData || + enrichedQuery.requestBody + ) + } catch (err) { + // no json found, ignore + } + delete enrichedQuery.customData + } + + return enrichedQuery }