1
0
Fork 0
mirror of synced 2024-06-14 08:24:48 +12:00

Adding worker-farm back to have a mechanism to run queries within which we can timeout.

This commit is contained in:
mike12345567 2021-11-10 19:35:09 +00:00
parent 51b6687262
commit 2e61209291
17 changed files with 159 additions and 99 deletions

View file

@ -120,6 +120,7 @@
"uuid": "3.3.2",
"validate.js": "0.13.1",
"vm2": "^3.9.3",
"worker-farm": "^1.7.0",
"yargs": "13.2.4",
"zlib": "1.0.5"
},

View file

@ -1,10 +1,9 @@
const { processString } = require("@budibase/string-templates")
const CouchDB = require("../../db")
const { generateQueryID, getQueryParams } = require("../../db/utils")
const { integrations } = require("../../integrations")
const { BaseQueryVerbs } = require("../../constants")
const env = require("../../environment")
const ScriptRunner = require("../../utilities/scriptRunner")
const queryRunner = require("../../utilities/queryRunner")
// simple function to append "readable" to all read queries
function enrichQueries(input) {
@ -18,47 +17,6 @@ function enrichQueries(input) {
return wasArray ? queries : queries[0]
}
function formatResponse(resp) {
if (typeof resp === "string") {
try {
resp = JSON.parse(resp)
} catch (err) {
resp = { response: resp }
}
}
return resp
}
async function runAndTransform(
integration,
queryVerb,
enrichedQuery,
transformer
) {
let rows = formatResponse(await integration[queryVerb](enrichedQuery))
// transform as required
if (transformer) {
const runner = new ScriptRunner(transformer, { data: rows })
rows = runner.execute()
}
// needs to an array for next step
if (!Array.isArray(rows)) {
rows = [rows]
}
// map into JSON if just raw primitive here
if (rows.find(row => typeof row !== "object")) {
rows = rows.map(value => ({ value }))
}
// get all the potential fields in the schema
let keys = rows.flatMap(Object.keys)
return { rows, keys }
}
exports.fetch = async function (ctx) {
const db = new CouchDB(ctx.appId)
@ -143,18 +101,11 @@ exports.preview = async function (ctx) {
const datasource = await db.get(ctx.request.body.datasourceId)
const Integration = integrations[datasource.source]
if (!Integration) {
ctx.throw(400, "Integration type does not exist.")
}
const { fields, parameters, queryVerb, transformer } = ctx.request.body
const enrichedQuery = await enrichQueryFields(fields, parameters)
const integration = new Integration(datasource.config)
const { rows, keys } = await runAndTransform(
integration,
const { rows, keys } = await queryRunner(
datasource,
queryVerb,
enrichedQuery,
transformer
@ -164,10 +115,6 @@ exports.preview = async function (ctx) {
rows,
schemaFields: [...new Set(keys)],
}
// cleanup
if (integration.end) {
integration.end()
}
}
exports.execute = async function (ctx) {
@ -176,30 +123,19 @@ exports.execute = async function (ctx) {
const query = await db.get(ctx.params.queryId)
const datasource = await db.get(query.datasourceId)
const Integration = integrations[datasource.source]
if (!Integration) {
ctx.throw(400, "Integration type does not exist.")
}
const enrichedQuery = await enrichQueryFields(
query.fields,
ctx.request.body.parameters
)
const integration = new Integration(datasource.config)
// call the relevant CRUD method on the integration class
const { rows } = await runAndTransform(
integration,
const { rows } = await queryRunner(
datasource,
query.queryVerb,
enrichedQuery,
query.transformer
)
ctx.body = rows
// cleanup
if (integration.end) {
integration.end()
}
}
exports.destroy = async function (ctx) {

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module AirtableModule {
const Airtable = require("airtable")
@ -73,7 +74,7 @@ module AirtableModule {
},
}
class AirtableIntegration {
class AirtableIntegration implements IntegrationBase {
private config: AirtableConfig
private client: any

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module ArangoModule {
const { Database, aql } = require("arangojs")
@ -55,7 +56,7 @@ module ArangoModule {
},
}
class ArangoDBIntegration {
class ArangoDBIntegration implements IntegrationBase {
private config: ArangodbConfig
private client: any

View file

@ -0,0 +1,6 @@
export interface IntegrationBase {
create?(query: any): Promise<[any]>
read?(query: any): Promise<[any]>
update?(query: any): Promise<[any]>
delete?(query: any): Promise<[any]>
}

View file

@ -1,6 +1,7 @@
import { Table } from "../../definitions/common"
import { IntegrationBase } from "./IntegrationBase"
export interface DatasourcePlus {
export interface DatasourcePlus extends IntegrationBase {
tables: Record<string, Table>
schemaErrors: Record<string, string>

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module CouchDBModule {
const PouchDB = require("pouchdb")
@ -50,7 +51,7 @@ module CouchDBModule {
},
}
class CouchDBIntegration {
class CouchDBIntegration implements IntegrationBase {
private config: CouchDBConfig
private client: any

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module DynamoModule {
const AWS = require("aws-sdk")
@ -113,7 +114,7 @@ module DynamoModule {
},
}
class DynamoDBIntegration {
class DynamoDBIntegration implements IntegrationBase {
private config: DynamoDBConfig
private client: any

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module ElasticsearchModule {
const { Client } = require("@elastic/elasticsearch")
@ -74,7 +75,7 @@ module ElasticsearchModule {
},
}
class ElasticSearchIntegration {
class ElasticSearchIntegration implements IntegrationBase {
private config: ElasticsearchConfig
private client: any

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module MongoDBModule {
const { MongoClient } = require("mongodb")
@ -62,7 +63,7 @@ module MongoDBModule {
},
}
class MongoIntegration {
class MongoIntegration implements IntegrationBase {
private config: MongoDBConfig
private client: any

View file

@ -184,7 +184,7 @@ module MySQLModule {
return results.length ? results : [{ created: true }]
}
read(query: SqlQuery | string) {
async read(query: SqlQuery | string) {
return internalQuery(this.client, getSqlQuery(query))
}

View file

@ -3,6 +3,7 @@ import {
DatasourceFieldTypes,
QueryTypes,
} from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module RestModule {
const fetch = require("node-fetch")
@ -131,7 +132,7 @@ module RestModule {
},
}
class RestIntegration {
class RestIntegration implements IntegrationBase {
private config: RestConfig
private headers: {
[key: string]: string

View file

@ -1,4 +1,5 @@
import { Integration, QueryTypes } from "../definitions/datasource"
import { IntegrationBase } from "./base/IntegrationBase"
module S3Module {
const AWS = require("aws-sdk")
@ -42,7 +43,7 @@ module S3Module {
},
}
class S3Integration {
class S3Integration implements IntegrationBase {
private readonly config: S3Config
private client: any
private connectionPromise: Promise<any>

View file

@ -0,0 +1,31 @@
const workerFarm = require("worker-farm")
const MAX_WORKER_TIME_MS = 10000
const workers = workerFarm(
{
autoStart: true,
maxConcurrentWorkers: 1,
maxCallTime: MAX_WORKER_TIME_MS,
},
require.resolve("./runner")
)
function runService(data) {
return new Promise((resolve, reject) => {
workers(data, (err, response) => {
if (err) {
reject(err)
} else {
resolve(response)
}
})
})
}
module.exports = async (datasource, queryVerb, query, transformer) => {
return runService({
datasource,
queryVerb,
query,
transformer,
})
}

View file

@ -0,0 +1,63 @@
const ScriptRunner = require("../scriptRunner")
const { integrations } = require("../../integrations")
function formatResponse(resp) {
if (typeof resp === "string") {
try {
resp = JSON.parse(resp)
} catch (err) {
resp = { response: resp }
}
}
return resp
}
async function runAndTransform(datasource, queryVerb, query, transformer) {
const Integration = integrations[datasource.source]
if (!Integration) {
throw "Integration type does not exist."
}
const integration = new Integration(datasource.config)
let rows = formatResponse(await integration[queryVerb](query))
// transform as required
if (transformer) {
const runner = new ScriptRunner(transformer, { data: rows })
rows = runner.execute()
}
// needs to an array for next step
if (!Array.isArray(rows)) {
rows = [rows]
}
// map into JSON if just raw primitive here
if (rows.find(row => typeof row !== "object")) {
rows = rows.map(value => ({ value }))
}
// get all the potential fields in the schema
let keys = rows.flatMap(Object.keys)
if (integration.end) {
integration.end()
}
return { rows, keys }
}
module.exports = (input, callback) => {
runAndTransform(
input.datasource,
input.queryVerb,
input.query,
input.transformer
)
.then(response => {
callback(null, response)
})
.catch(err => {
callback(err)
})
}

View file

@ -2,7 +2,7 @@
"compilerOptions": {
"target": "es6",
"module": "commonjs",
"lib": ["es6"],
"lib": ["es2019"],
"allowJs": true,
"outDir": "dist",
"strict": true,

View file

@ -943,10 +943,10 @@
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
"@budibase/auth@^0.9.169-alpha.1":
version "0.9.169"
resolved "https://registry.yarnpkg.com/@budibase/auth/-/auth-0.9.169.tgz#fd2a8fc271782ba857259ace15118a4d53b3d161"
integrity sha512-Q087k/54Nzx6Oeg5uL7YD/9BB+qkBWIv7h4ct+cNQJFNK/aKKN8JLQft+z3mBN5omHTkdJYFmbgXWFxtX+rR3Q==
"@budibase/auth@^0.9.180-alpha.1":
version "0.9.183"
resolved "https://registry.yarnpkg.com/@budibase/auth/-/auth-0.9.183.tgz#da5a7e8b8ba9909d33399bbcd1b7164690ada257"
integrity sha512-BNlD4f7YfQejaq1wgMiIPzkNB+fu0HFpg9lyPYaD/mDWpa0F3HdMK3LxYewda9uRy9LJf6LtR3NJxVFvo0zXHA==
dependencies:
"@techpass/passport-openidconnect" "^0.3.0"
aws-sdk "^2.901.0"
@ -956,6 +956,7 @@
jsonwebtoken "^8.5.1"
koa-passport "^4.1.4"
lodash "^4.17.21"
lodash.isarguments "^3.1.0"
node-fetch "^2.6.1"
passport-google-auth "^1.0.2"
passport-google-oauth "^2.0.0"
@ -1015,10 +1016,10 @@
svelte-flatpickr "^3.1.0"
svelte-portal "^1.0.0"
"@budibase/bbui@^0.9.169":
version "0.9.169"
resolved "https://registry.yarnpkg.com/@budibase/bbui/-/bbui-0.9.169.tgz#e8dac59b9792a7edf03c4301a9069760e2ebd2f4"
integrity sha512-2hks6GEjcXbDUzC37WgJvgloiqTP5ZS7IuRjlHU9kStDr6dAnXuy8pO6JNJmKrTXt+rgtwhHHrVWzzcmNLIYxA==
"@budibase/bbui@^0.9.183":
version "0.9.183"
resolved "https://registry.yarnpkg.com/@budibase/bbui/-/bbui-0.9.183.tgz#7e2ad9a34ec5ae9f32bc9d263199217b324f1b8c"
integrity sha512-SFTb5rxfUB1rVYMASvtwVYb5XDhSdsQ1Fkr85Mn+ME284WQqBeJKRSz87jLVXJFQAnSpPEDUShOUTTFVByqpig==
dependencies:
"@adobe/spectrum-css-workflow-icons" "^1.2.1"
"@spectrum-css/actionbutton" "^1.0.1"
@ -1064,14 +1065,14 @@
svelte-flatpickr "^3.2.3"
svelte-portal "^1.0.0"
"@budibase/client@^0.9.169-alpha.1":
version "0.9.169"
resolved "https://registry.yarnpkg.com/@budibase/client/-/client-0.9.169.tgz#bec370b8f069b42f62483b281d6b9e2c7c8625f3"
integrity sha512-/bDnwv2iRysZrcrBQJEKzuxdwkwoJ2FalmQFhsfj+V/MWBN/wpQSDbJZQwf/YcI5bQk8f7xIn95O+DMH/m5izg==
"@budibase/client@^0.9.180-alpha.1":
version "0.9.183"
resolved "https://registry.yarnpkg.com/@budibase/client/-/client-0.9.183.tgz#cf86a2e0382d7e4a0898630f10f17d7640ce256d"
integrity sha512-1gw8EVIwouNJtYPgByX97EyeegAm35+jSd6irjU0PQEKldtvw2vLI9hmatvUdkUqLFUCT5PeXq37xfkp2JCYLQ==
dependencies:
"@budibase/bbui" "^0.9.169"
"@budibase/bbui" "^0.9.183"
"@budibase/standard-components" "^0.9.139"
"@budibase/string-templates" "^0.9.169"
"@budibase/string-templates" "^0.9.183"
regexparam "^1.3.0"
shortid "^2.2.15"
svelte-spa-router "^3.0.5"
@ -1121,16 +1122,17 @@
svelte-apexcharts "^1.0.2"
svelte-flatpickr "^3.1.0"
"@budibase/string-templates@^0.9.169", "@budibase/string-templates@^0.9.169-alpha.1":
version "0.9.169"
resolved "https://registry.yarnpkg.com/@budibase/string-templates/-/string-templates-0.9.169.tgz#3c0be97718f39a92ff6b2dbb8b470aaa7851005e"
integrity sha512-JUyg6XuUgFqnfdDSCAplo4cTtrqdSZ9NPrU3iGudZEQjO/Wk5sezWPznl3Yw/kFHKmPLjFHIveEa2+lODEAxIA==
"@budibase/string-templates@^0.9.180-alpha.1", "@budibase/string-templates@^0.9.183":
version "0.9.183"
resolved "https://registry.yarnpkg.com/@budibase/string-templates/-/string-templates-0.9.183.tgz#c75dc298d8ec69e1717721b46c3c99448b5ee0a1"
integrity sha512-S3Z81c2YGtG0hUXvOrDKn8Gj4iu1adxIDeNgHJAsesID3/SrI9KBhExx1HzIP14SLZlFEao5A12cVtpFBHC7LQ==
dependencies:
"@budibase/handlebars-helpers" "^0.11.7"
dayjs "^1.10.4"
handlebars "^4.7.6"
handlebars-utils "^1.0.6"
lodash "^4.17.20"
vm2 "^3.9.4"
"@cnakazawa/watch@^1.0.3":
version "1.0.4"
@ -4476,7 +4478,7 @@ ent@^2.2.0:
resolved "https://registry.yarnpkg.com/ent/-/ent-2.2.0.tgz#e964219325a21d05f44466a2f686ed6ce5f5dd1d"
integrity sha1-6WQhkyWiHQX0RGai9obtbOX13R0=
errno@~0.1.1:
errno@~0.1.1, errno@~0.1.7:
version "0.1.8"
resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.8.tgz#8bb3e9c7d463be4976ff888f76b4809ebc2e811f"
integrity sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A==
@ -11785,6 +11787,11 @@ vm2@^3.9.3:
resolved "https://registry.yarnpkg.com/vm2/-/vm2-3.9.4.tgz#2e118290fefe7bd8ea09ebe2f5faf53730dbddaa"
integrity sha512-sOdharrJ7KEePIpHekiWaY1DwgueuiBeX/ZBJUPgETsVlJsXuEx0K0/naATq2haFvJrvZnRiORQRubR0b7Ye6g==
vm2@^3.9.4:
version "3.9.5"
resolved "https://registry.yarnpkg.com/vm2/-/vm2-3.9.5.tgz#5288044860b4bbace443101fcd3bddb2a0aa2496"
integrity sha512-LuCAHZN75H9tdrAiLFf030oW7nJV5xwNMuk1ymOZwopmuK3d2H4L1Kv4+GFHgarKiLfXXLFU+7LDABHnwOkWng==
vuvuzela@1.0.3:
version "1.0.3"
resolved "https://registry.yarnpkg.com/vuvuzela/-/vuvuzela-1.0.3.tgz#3be145e58271c73ca55279dd851f12a682114b0b"
@ -11920,6 +11927,13 @@ wordwrap@^1.0.0:
resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb"
integrity sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=
worker-farm@^1.7.0:
version "1.7.0"
resolved "https://registry.yarnpkg.com/worker-farm/-/worker-farm-1.7.0.tgz#26a94c5391bbca926152002f69b84a4bf772e5a8"
integrity sha512-rvw3QTZc8lAxyVrqcSGVm5yP/IJ2UcB3U0graE3LCFoZ0Yn2x4EoVSqJKdB/T5M+FLcRPjz4TDacRf3OCfNUzw==
dependencies:
errno "~0.1.7"
wrap-ansi@^5.1.0:
version "5.1.0"
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-5.1.0.tgz#1fd1f67235d5b6d0fee781056001bfb694c03b09"