From ce105d8f4e590d68008427ce3702e10ae3398a62 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Thu, 19 Sep 2024 16:51:00 +0100 Subject: [PATCH] Revert unnecessary change. --- .../server/src/integrations/googlesheets.ts | 4 +- packages/shared-core/src/utils.ts | 53 +++++++++++-------- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/packages/server/src/integrations/googlesheets.ts b/packages/server/src/integrations/googlesheets.ts index ea06ca8d69..d5667c276b 100644 --- a/packages/server/src/integrations/googlesheets.ts +++ b/packages/server/src/integrations/googlesheets.ts @@ -339,7 +339,7 @@ export class GoogleSheetsIntegration implements DatasourcePlus { const tables: Record = {} let errors: Record = {} - await utils.parallelForEach( + await utils.parallelForeach( sheets, async sheet => { try { @@ -367,7 +367,7 @@ export class GoogleSheetsIntegration implements DatasourcePlus { throw err } }, - { maxConcurrency: 2 } + 10 ) for (const sheet of sheets) { diff --git a/packages/shared-core/src/utils.ts b/packages/shared-core/src/utils.ts index 9ead522192..b69a059745 100644 --- a/packages/shared-core/src/utils.ts +++ b/packages/shared-core/src/utils.ts @@ -7,34 +7,43 @@ export function unreachable( throw new Error(message) } -interface PromiseWithId { - promise: Promise - id: number -} - -export async function parallelForEach( +export async function parallelForeach( items: T[], task: (item: T) => Promise, - opts?: { maxConcurrency?: number } + maxConcurrency: number ): Promise { - const { maxConcurrency = 10 } = opts || {} - let next = 0 - let inProgress: PromiseWithId[] = [] - while (next < items.length) { - if (inProgress.length === maxConcurrency) { - const finished = await Promise.race(inProgress.map(t => t.promise)) - inProgress = inProgress.filter(task => task.id !== finished) - } + const promises: Promise[] = [] + let index = 0 - const promise = async (next: number) => { - await task(items[next]) - return next + const processItem = async (item: T) => { + try { + await task(item) + } finally { + processNext() } - - inProgress.push({ promise: promise(next), id: next }) - next++ } - await Promise.all(inProgress.map(t => t.promise)) + + const processNext = () => { + if (index >= items.length) { + // No more items to process + return + } + + const item = items[index] + index++ + + const promise = processItem(item) + promises.push(promise) + + if (promises.length >= maxConcurrency) { + Promise.race(promises).then(processNext) + } else { + processNext() + } + } + processNext() + + await Promise.all(promises) } export function filterValueToLabel() {