diff --git a/packages/backend-core/src/sql/sql.ts b/packages/backend-core/src/sql/sql.ts
index 9bc2092b83..c4f0d7015f 100644
--- a/packages/backend-core/src/sql/sql.ts
+++ b/packages/backend-core/src/sql/sql.ts
@@ -571,6 +571,27 @@ class InternalBuilder {
     return query.insert(parsedBody)
   }
 
+  bulkUpsert(knex: Knex, json: QueryJson): Knex.QueryBuilder {
+    const { endpoint, body } = json
+    let query = this.knexWithAlias(knex, endpoint)
+    if (!Array.isArray(body)) {
+      return query
+    }
+    const parsedBody = body.map(row => parseBody(row))
+    if (
+      this.client === SqlClient.POSTGRES ||
+      this.client === SqlClient.SQL_LITE ||
+      this.client === SqlClient.MY_SQL
+    ) {
+      const primary = json.meta.table.primary
+      if (!primary) {
+        throw new Error("Primary key is required for upsert")
+      }
+      return query.insert(parsedBody).onConflict(primary).merge()
+    }
+    return query.upsert(parsedBody)
+  }
+
   read(knex: Knex, json: QueryJson, limit: number): Knex.QueryBuilder {
     let { endpoint, resource, filters, paginate, relationships, tableAliases } =
       json
@@ -708,6 +729,9 @@ class SqlQueryBuilder extends SqlTableQueryBuilder {
       case Operation.BULK_CREATE:
         query = builder.bulkCreate(client, json)
         break
+      case Operation.BULK_UPSERT:
+        query = builder.bulkUpsert(client, json)
+        break
       case Operation.CREATE_TABLE:
       case Operation.UPDATE_TABLE:
       case Operation.DELETE_TABLE:
diff --git a/packages/server/src/api/controllers/table/external.ts b/packages/server/src/api/controllers/table/external.ts
index bd674d7d38..f1b186c233 100644
--- a/packages/server/src/api/controllers/table/external.ts
+++ b/packages/server/src/api/controllers/table/external.ts
@@ -98,7 +98,7 @@ export async function bulkImport(
     table = processed.table
   }
 
-  await handleRequest(Operation.BULK_CREATE, table._id!, {
+  await handleRequest(Operation.BULK_UPSERT, table._id!, {
     rows: parsedRows,
   })
   await events.rows.imported(table, parsedRows.length)
diff --git a/packages/server/src/api/routes/tests/row.spec.ts b/packages/server/src/api/routes/tests/row.spec.ts
index ba935fc6ae..09b1f0fe01 100644
--- a/packages/server/src/api/routes/tests/row.spec.ts
+++ b/packages/server/src/api/routes/tests/row.spec.ts
@@ -26,6 +26,7 @@ import {
   Table,
   TableSourceType,
   UpdatedRowEventEmitter,
+  TableSchema,
 } from "@budibase/types"
 import { generator, mocks } from "@budibase/backend-core/tests"
 import _, { merge } from "lodash"
@@ -59,9 +60,9 @@ async function waitForEvent(
 describe.each([
   ["internal", undefined],
   [DatabaseName.POSTGRES, getDatasource(DatabaseName.POSTGRES)],
-  // [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)],
-  // [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)],
-  // [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)],
+  [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)],
+  [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)],
+  [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)],
 ])("/rows (%s)", (providerType, dsProvider) => {
   const isInternal = dsProvider === undefined
   const config = setup.getConfig()
@@ -88,6 +89,23 @@ describe.each([
     // the table name they're writing to.
     ...overrides: Partial<Omit<SaveTableRequest, "name">>[]
   ): SaveTableRequest {
+    const defaultSchema: TableSchema = {
+      id: {
+        type: FieldType.AUTO,
+        name: "id",
+        autocolumn: true,
+        constraints: {
+          presence: true,
+        },
+      },
+    }
+
+    for (const override of overrides) {
+      if (override.primary) {
+        delete defaultSchema.id
+      }
+    }
+
     const req: SaveTableRequest = {
       name: uuid.v4().substring(0, 10),
       type: "table",
@@ -96,16 +114,7 @@ describe.each([
         : TableSourceType.INTERNAL,
       sourceId: datasource ? datasource._id! : INTERNAL_TABLE_SOURCE_ID,
       primary: ["id"],
-      schema: {
-        id: {
-          type: FieldType.AUTO,
-          name: "id",
-          autocolumn: true,
-          constraints: {
-            presence: true,
-          },
-        },
-      },
+      schema: defaultSchema,
     }
     return merge(req, ...overrides)
   }
@@ -929,7 +938,7 @@ describe.each([
     })
   })
 
-  describe.only("bulkImport", () => {
+  describe("bulkImport", () => {
     isInternal &&
       it("should update Auto ID field after bulk import", async () => {
         const table = await config.api.table.save(
@@ -1004,9 +1013,10 @@ describe.each([
       await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage)
     })
 
-    it.only("should be able to update existing rows with bulkImport", async () => {
+    it("should be able to update existing rows with bulkImport", async () => {
       const table = await config.api.table.save(
         saveTableRequest({
+          primary: ["userId"],
           schema: {
             userId: {
               type: FieldType.NUMBER,
diff --git a/packages/types/src/sdk/datasources.ts b/packages/types/src/sdk/datasources.ts
index 3a788cbac6..da10bf26c6 100644
--- a/packages/types/src/sdk/datasources.ts
+++ b/packages/types/src/sdk/datasources.ts
@@ -9,6 +9,7 @@ export enum Operation {
   UPDATE = "UPDATE",
   DELETE = "DELETE",
   BULK_CREATE = "BULK_CREATE",
+  BULK_UPSERT = "BULK_UPSERT",
   CREATE_TABLE = "CREATE_TABLE",
   UPDATE_TABLE = "UPDATE_TABLE",
   DELETE_TABLE = "DELETE_TABLE",
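For context on the builder change above: on Postgres, SQLite and MySQL the new `bulkUpsert` path relies on Knex's standard conflict-merge upsert. A minimal standalone sketch of that pattern follows; the `users` table, `id` key and connection details are illustrative assumptions, not taken from this change.

```ts
import knex from "knex"

// Hypothetical connection and table, purely to illustrate the pattern.
const db = knex({ client: "pg", connection: process.env.DATABASE_URL })

async function upsertUsers() {
  const rows = [
    { id: 1, name: "Alice" },
    { id: 2, name: "Bob" },
  ]
  // Insert the batch; on a primary-key conflict, update the existing rows
  // instead of failing. This is the same insert().onConflict().merge()
  // chain that bulkUpsert builds for the clients listed above.
  await db("users").insert(rows).onConflict("id").merge()
}
```

Calling `merge()` with no arguments updates every column of the conflicting row with the incoming values, which matches the bulk-import use case of overwriting existing rows wholesale; clients not covered by the conflict-merge branch fall through to the builder's `upsert` call, as shown in the diff.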