
bulkImport upsert working everywhere except mssql

Sam Rose 2024-06-18 17:43:25 +01:00
parent 335c174643
commit 5ac8a7d514
4 changed files with 51 additions and 16 deletions

View file

@@ -571,6 +571,27 @@ class InternalBuilder {
     return query.insert(parsedBody)
   }
 
+  bulkUpsert(knex: Knex, json: QueryJson): Knex.QueryBuilder {
+    const { endpoint, body } = json
+    let query = this.knexWithAlias(knex, endpoint)
+    if (!Array.isArray(body)) {
+      return query
+    }
+    const parsedBody = body.map(row => parseBody(row))
+    if (
+      this.client === SqlClient.POSTGRES ||
+      this.client === SqlClient.SQL_LITE ||
+      this.client === SqlClient.MY_SQL
+    ) {
+      const primary = json.meta.table.primary
+      if (!primary) {
+        throw new Error("Primary key is required for upsert")
+      }
+      return query.insert(parsedBody).onConflict(primary).merge()
+    }
+    return query.upsert(parsedBody)
+  }
+
   read(knex: Knex, json: QueryJson, limit: number): Knex.QueryBuilder {
     let { endpoint, resource, filters, paginate, relationships, tableAliases } =
       json
@@ -708,6 +729,9 @@ class SqlQueryBuilder extends SqlTableQueryBuilder {
       case Operation.BULK_CREATE:
         query = builder.bulkCreate(client, json)
         break
+      case Operation.BULK_UPSERT:
+        query = builder.bulkUpsert(client, json)
+        break
       case Operation.CREATE_TABLE:
       case Operation.UPDATE_TABLE:
       case Operation.DELETE_TABLE:
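
For the three clients on the allow-list, knex compiles the insert(...).onConflict(...).merge() chain into the dialect's native upsert statement: INSERT ... ON CONFLICT ... DO UPDATE on Postgres and SQLite, INSERT ... ON DUPLICATE KEY UPDATE on MySQL. A minimal sketch of the SQL this produces, assuming a standalone knex instance and a hypothetical "people" table keyed on "id" (neither appears in this diff):

import knex from "knex"

// Query building only; no driver or connection is needed for toString().
const pg = knex({ client: "pg" })

const sql = pg("people")
  .insert([{ id: 1, name: "Sam" }])
  .onConflict(["id"])
  .merge() // with no arguments, merge() updates every inserted column
  .toString()

// Approximately:
// insert into "people" ("id", "name") values (1, 'Sam')
//   on conflict ("id") do update set "id" = excluded."id", "name" = excluded."name"
console.log(sql)

The fallthrough query.upsert(parsedBody) relies on knex's native upsert, which knex implements only for the CockroachDB dialect, which presumably is why the MSSQL path is still broken per the commit message.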

View file

@@ -98,7 +98,7 @@ export async function bulkImport(
     table = processed.table
   }
 
-  await handleRequest(Operation.BULK_CREATE, table._id!, {
+  await handleRequest(Operation.BULK_UPSERT, table._id!, {
     rows: parsedRows,
   })
   await events.rows.imported(table, parsedRows.length)
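
Switching the operation from BULK_CREATE to BULK_UPSERT means that re-importing rows whose primary key already exists now updates them in place instead of creating duplicates, which is what the re-enabled test further down asserts. A hedged sketch of the behaviour, using an illustrative stand-in rather than the real test API:

// Hypothetical stand-in for the real import call; shown only to
// illustrate the new upsert semantics, not an actual Budibase signature.
declare function bulkImport(
  tableId: string,
  rows: Record<string, unknown>[]
): Promise<void>

async function demo(tableId: string) {
  await bulkImport(tableId, [{ userId: 1, name: "first" }]) // inserts
  await bulkImport(tableId, [{ userId: 1, name: "second" }]) // same key: updates, no duplicate
}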

View file

@@ -26,6 +26,7 @@ import {
   Table,
   TableSourceType,
   UpdatedRowEventEmitter,
+  TableSchema,
 } from "@budibase/types"
 import { generator, mocks } from "@budibase/backend-core/tests"
 import _, { merge } from "lodash"
@@ -59,9 +60,9 @@ async function waitForEvent(
 describe.each([
   ["internal", undefined],
   [DatabaseName.POSTGRES, getDatasource(DatabaseName.POSTGRES)],
-  // [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)],
-  // [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)],
-  // [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)],
+  [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)],
+  [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)],
+  [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)],
 ])("/rows (%s)", (providerType, dsProvider) => {
   const isInternal = dsProvider === undefined
   const config = setup.getConfig()
@@ -88,6 +89,23 @@ describe.each([
   // the table name they're writing to.
   ...overrides: Partial<Omit<SaveTableRequest, "name">>[]
 ): SaveTableRequest {
+  const defaultSchema: TableSchema = {
+    id: {
+      type: FieldType.AUTO,
+      name: "id",
+      autocolumn: true,
+      constraints: {
+        presence: true,
+      },
+    },
+  }
+
+  for (const override of overrides) {
+    if (override.primary) {
+      delete defaultSchema.id
+    }
+  }
+
   const req: SaveTableRequest = {
     name: uuid.v4().substring(0, 10),
     type: "table",
@@ -96,16 +114,7 @@ describe.each([
       : TableSourceType.INTERNAL,
     sourceId: datasource ? datasource._id! : INTERNAL_TABLE_SOURCE_ID,
     primary: ["id"],
-    schema: {
-      id: {
-        type: FieldType.AUTO,
-        name: "id",
-        autocolumn: true,
-        constraints: {
-          presence: true,
-        },
-      },
-    },
+    schema: defaultSchema,
   }
   return merge(req, ...overrides)
 }
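
The reworked helper keeps the auto "id" column by default but drops it whenever an override supplies its own primary key, so tests like the one below can key a table on userId without also carrying the default autocolumn. A sketch of the two call shapes, assuming the helper exactly as defined above:

// Default: the auto "id" column and primary key are kept.
const plain = saveTableRequest()
// plain.primary -> ["id"]; plain.schema.id is the AUTO column

// Overriding primary removes the default "id" column before merging.
const keyed = saveTableRequest({
  primary: ["userId"],
  schema: {
    userId: {
      type: FieldType.NUMBER,
      name: "userId",
      constraints: { presence: true },
    },
  },
})
// keyed.primary -> ["userId"]; keyed.schema has no "id" entry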
@@ -929,7 +938,7 @@ describe.each([
     })
   })
 
-  describe.only("bulkImport", () => {
+  describe("bulkImport", () => {
     isInternal &&
       it("should update Auto ID field after bulk import", async () => {
         const table = await config.api.table.save(
@@ -1004,9 +1013,10 @@ describe.each([
       await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage)
     })
 
-    it.only("should be able to update existing rows with bulkImport", async () => {
+    it("should be able to update existing rows with bulkImport", async () => {
       const table = await config.api.table.save(
         saveTableRequest({
+          primary: ["userId"],
           schema: {
             userId: {
               type: FieldType.NUMBER,

View file

@@ -9,6 +9,7 @@ export enum Operation {
   UPDATE = "UPDATE",
   DELETE = "DELETE",
   BULK_CREATE = "BULK_CREATE",
+  BULK_UPSERT = "BULK_UPSERT",
   CREATE_TABLE = "CREATE_TABLE",
   UPDATE_TABLE = "UPDATE_TABLE",
   DELETE_TABLE = "DELETE_TABLE",