-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(sdk): Disable concurrency on rewrap
- Adds new `concurrencyLimit` decrypt param, which sets a thread pool (kinda) - Defaults value to 1
- Loading branch information
1 parent
c6cdbef
commit ac611ce
Showing
5 changed files
with
206 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
type LabelledSuccess<T> = { lid: string; value: Promise<T> }; | ||
type LabelledFailure = { lid: string; e: any }; | ||
|
||
async function labelPromise<T>(label: string, promise: Promise<T>): Promise<LabelledSuccess<T>> { | ||
try { | ||
const value = await promise; | ||
return { lid: label, value: Promise.resolve(value) }; | ||
} catch (e) { | ||
throw { lid: label, e }; | ||
} | ||
} | ||
|
||
// Pooled variant of Promise.all; implements most of the logic of the real all, | ||
// but with a pool size of n. Rejects on first reject, or returns a list | ||
// of all successful responses. Operates with at most n 'active' promises at a time. | ||
// For tracking purposes, all promises must have a unique identifier. | ||
export async function allPool<T>(n: number, p: Record<string, Promise<T>>): Promise<Awaited<T>[]> { | ||
const pool: Record<string, Promise<LabelledSuccess<T>>> = {}; | ||
const resolved: Awaited<T>[] = []; | ||
for (const [id, job] of Object.entries(p)) { | ||
// while the size of jobs to do is greater than n, | ||
// let n jobs run and take the first one to finish out of the pool | ||
pool[id] = labelPromise(id, job); | ||
if (Object.keys(pool).length > n - 1) { | ||
const promises = Object.values(pool); | ||
try { | ||
const { lid, value } = await Promise.race(promises); | ||
resolved.push(await value); | ||
console.log(`succeeded on promise ${lid}`, value); | ||
delete pool[lid]; | ||
} catch (err) { | ||
const { lid, e } = err as LabelledFailure; | ||
console.warn(`failed on promise ${lid}`, err); | ||
throw e; | ||
} | ||
} | ||
} | ||
try { | ||
for (const labelled of await Promise.all(Object.values(pool))) { | ||
console.log(`real.all succeeded on promise ${labelled.lid}`, labelled); | ||
resolved.push(await labelled.value); | ||
} | ||
} catch (err) { | ||
if ('lid' in err && 'e' in err) { | ||
throw err.e; | ||
} else { | ||
throw err; | ||
} | ||
} | ||
return resolved; | ||
} | ||
|
||
// Pooled variant of promise.any; implements most of the logic of the real any, | ||
// but with a pool size of n, and returns the first successful promise, | ||
// operating with at most n 'active' promises at a time. | ||
export async function anyPool<T>(n: number, p: Record<string, Promise<T>>): Promise<Awaited<T>> { | ||
const pool: Record<string, Promise<LabelledSuccess<T>>> = {}; | ||
const rejections = []; | ||
for (const [id, job] of Object.entries(p)) { | ||
// while the size of jobs to do is greater than n, | ||
// let n jobs run and take the first one to finish out of the pool | ||
pool[id] = labelPromise(id, job); | ||
if (Object.keys(pool).length > n - 1) { | ||
const promises = Object.values(pool); | ||
try { | ||
const { lid, value } = await Promise.race(promises); | ||
console.log(`any succeeded on promise ${lid}`, value); | ||
return await value; | ||
} catch (error) { | ||
const { lid, e } = error; | ||
rejections.push(e); | ||
delete pool[lid]; | ||
console.log(`any failed on promise ${lid}`, e); | ||
} | ||
} | ||
} | ||
try { | ||
const { lid, value } = await Promise.any(Object.values(pool)); | ||
console.log(`real.any succeeded on promise ${lid}`); | ||
return await value; | ||
} catch (errors) { | ||
console.log(`real.any failed`, errors); | ||
if (errors instanceof AggregateError) { | ||
for (const error of errors.errors) { | ||
if ('lid' in error && 'e' in error) { | ||
rejections.push(error.e); | ||
} else { | ||
rejections.push(error); | ||
} | ||
} | ||
} else { | ||
rejections.push(errors); | ||
} | ||
} | ||
throw new AggregateError(rejections); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import { allPool, anyPool } from '../../../src/concurrency.js'; | ||
import { expect } from 'chai'; | ||
|
||
describe('concurrency', () => { | ||
for (const n of [1, 2, 3, 4]) { | ||
describe(`allPool(${n})`, () => { | ||
it(`should resolve all promises with a pool size of ${n}`, async () => { | ||
const promises = { | ||
a: Promise.resolve(1), | ||
b: Promise.resolve(2), | ||
c: Promise.resolve(3), | ||
}; | ||
const result = await allPool(n, promises); | ||
expect(result).to.have.members([1, 2, 3]); | ||
}); | ||
it(`should reject if any promise rejects, n=${n}`, async () => { | ||
const promises = { | ||
a: Promise.resolve(1), | ||
b: Promise.reject(new Error('failure')), | ||
c: Promise.resolve(3), | ||
}; | ||
try { | ||
await allPool(n, promises); | ||
} catch (e) { | ||
expect(e).to.contain({ message: 'failure' }); | ||
} | ||
}); | ||
}); | ||
describe(`anyPool(${n})`, () => { | ||
it('should resolve with the first resolved promise', async () => { | ||
const startTime = Date.now(); | ||
const promises = { | ||
a: new Promise((resolve) => setTimeout(() => resolve(1), 500)), | ||
b: new Promise((resolve) => setTimeout(() => resolve(2), 50)), | ||
c: new Promise((resolve) => setTimeout(() => resolve(3), 1500)), | ||
}; | ||
const result = await anyPool(n, promises); | ||
const endTime = Date.now(); | ||
const elapsed = endTime - startTime; | ||
if (n > 1) { | ||
expect(elapsed).to.be.lessThan(500); | ||
expect(result).to.equal(2); | ||
} else { | ||
expect(elapsed).to.be.greaterThan(50); | ||
expect(elapsed).to.be.lessThan(1000); | ||
expect(result).to.equal(1); | ||
} | ||
}); | ||
|
||
it('should reject if all promises reject', async () => { | ||
const promises = { | ||
a: Promise.reject(new Error('failure1')), | ||
b: Promise.reject(new Error('failure2')), | ||
c: Promise.reject(new Error('failure3')), | ||
}; | ||
try { | ||
await anyPool(n, promises); | ||
} catch (e) { | ||
expect(e).to.be.instanceOf(AggregateError); | ||
expect(e.errors).to.have.lengthOf(3); | ||
} | ||
}); | ||
}); | ||
} | ||
}); |