mapAsyncWithRateLimit
/**
* In mapAsync, we map every item with an async function and return a Promise
* that resolves to all mapped results.
*
* In real systems, mapping a large array directly can trigger too many parallel
* API calls and cause rate limiting. This helper keeps the execution concurrent
* but bounded by a maximum number of ongoing tasks.
*
* - iterable: input items to process.
* - callbackFn: async mapper called for each item.
* - size: optional concurrency limit (maximum ongoing tasks).
* If omitted, concurrency is unlimited.
*
* The function returns a Promise that resolves when all tasks are complete.
*
* @param {Array<unknown>} iterable
* @param {(value: unknown) => Promise<unknown>} callbackFn
* @param {number} [size=Infinity]
*
* @return {Promise<Array<unknown>>}
*/
export default function mapAsyncLimit(iterable, callbackFn, size) {
return new Promise((resolve, reject) => {
const results = new Array(iterable.length);
let nextIndex = 0;
let completed = 0;
let settled = false;
// If size is omitted, run with max parallelism.
const limit = size == null ? iterable.length : Math.max(1, Math.floor(size));
// Fast path: nothing to process.
if (iterable.length === 0) {
resolve(results);
return;
}
const processNext = () => {
if (settled) {
return;
}
// Pick one item for this worker.
if (nextIndex >= iterable.length) {
return;
}
const currentIndex = nextIndex;
const value = iterable[currentIndex];
nextIndex++;
callbackFn(value)
.then((result) => {
// Keep output aligned with input order.
results[currentIndex] = result;
completed++;
if (completed === iterable.length) {
settled = true;
resolve(results);
return;
}
// Continue this worker with the next queued item.
processNext();
})
.catch((error) => {
// Fail-fast on the first rejected task.
if (!settled) {
settled = true;
reject(error);
}
});
};
// Start up to size concurrent workers.
for (let i = 0; i < Math.min(limit, iterable.length); i++) {
processNext();
}
});
}
// Test
// import mapAsyncLimit from './map-async-limit';
// const asyncIdentity = (x: number) => Promise.resolve(x);
// describe('mapAsyncLimit', () => {
// test('returns promise', () => {
// const p = mapAsyncLimit([], asyncIdentity);
// expect(p).toBeInstanceOf(Promise);
// });
// test('empty input array', async () => {
// expect.assertions(1);
// const res = await mapAsyncLimit([], asyncIdentity);
// expect(res).toEqual([]);
// });
// test('resolved', async () => {
// expect.assertions(1);
// let ongoing = 0;
// const limit = 2;
// const res = await mapAsyncLimit(
// [1, 2, 3, 4, 5],
// (x: number) => {
// ongoing++;
// return new Promise((resolve, reject) => {
// setTimeout(() => {
// if (ongoing > limit) {
// reject('Concurrency limit exceeded');
// }
// resolve(x * 2);
// ongoing--;
// }, 10);
// });
// },
// limit,
// );
// expect(res).toEqual([2, 4, 6, 8, 10]);
// });
// });