Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow stopping iteration via pMap.stop #36

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 81 additions & 6 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
declare const stop: unique symbol;

export type StopSymbol = typeof stop;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see StopSymbol used anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your idea above of not exposing the existence of { [stop] } in the returned object, perhaps this is indeed not needed. The reason I first added this was to have a way to expose it to the user, since for example in find-up it is also exposed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I undo this, then?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I undo this, then?

Yes

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about passing options param for exposing stop in the returned object?


export interface Options {
/**
Number of concurrently pending promises returned by `mapper`.
Expand All @@ -16,6 +20,46 @@ export interface Options {
readonly stopOnError?: boolean;
}

export interface OngoingMappingsStopOptions {
/**
Whether or not to remove all holes from the result array (caused by pending mappings).

@default false
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on making this true by default?

Copy link
Contributor Author

@papb papb Jun 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine by me. I don't have any strong opinion. Personally, I will probably set this option explicitly whenever I use .stop, regardless of what is the default. Do you want me to change it to true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Richienb do you have any opinion on what would be the best default value here?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets change it to true then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets change it to true then.

Agree, to match its behaviour with pMapSkip which could have left holes but didn't.

*/
readonly collapse?: boolean;

/**
Value to use as immediate result for pending mappings, replacing holes from the result array.

This option is ignored if `collapse` is set to `true`.

@default undefined
*/
readonly fillWith?: unknown;
}

export interface StopOptions<NewElement> {
/**
Value to provide as result for this iteration.

@default undefined
*/
readonly value?: NewElement;

/**
Options to configure what `pMap` must do with any concurrent ongoing mappings at the moment `stop` is called.
*/
readonly ongoingMappings?: OngoingMappingsStopOptions;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there's any way we could flatten the options object:

  • value
  • collpasePending
  • fillPending

Needs better names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to group them. In the future, if support for canceling promises and AbortController is added, they might probably need options that could also be added to this group.

Do you also want better names for the way I've currently done it? Or you said "better names" only for your suggestion itself?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe something like this:

  • value Value to resolve with
  • defaultValue Value to set when the promise hasn't been run
    What if the consumer wants it to be an empty value? Maybe this could be in the form of passing a pMap.stop.empty symbol
  • stopRunning Whether currently running promises should be allowed to finish
  • removeIncomplete Whether empty items in the resulting array where there should have been values if all the promises had run should be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think the non-flattened way is better, because value is about the current iteration, while everything else is about configuring what happens to the rest of the mapping itself. Maybe instead of ongoingMappings, one of:

  • rest
  • restConfig
  • restOptions
  • whatToDoWithOngoingMappings
  • mappingsStillRunning
  • inProgressBehavior
  • inProgressConfig

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the consumer wants it to be an empty value? Maybe this could be in the form of passing a pMap.stop.empty symbol

I think this particular behavior could be left for another PR. I've never seen good uses for holes in arrays, they're an obscure feature of the language, if I'm not mistaken TypeScript for example pretends they don't exist. Sindre also said it here. Although if you found a good use case for them, I'd be interested in knowing more.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, we can stick with the nested option. I'm not really sold on most of these names though. The problem is that it applies to any mappings, not just running ones, but also pending ones. For example, if the concurrency option is used, not all remaining mappings are executing when it's stopped. I think the only one that is usable is rest, so I guess we can go with that.

}

type BaseStopValueWrapper<NewElement> = {
[stop]: Required<StopOptions<NewElement>>;
};

export type StopValueWrapper<NewElement> = NewElement extends any ? BaseStopValueWrapper<NewElement> : never;

type MaybeWrappedInStop<NewElement> = NewElement | StopValueWrapper<NewElement>;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why these 3 types? Not immediately clear why this complication is needed. We don't need to expose anything about stop to the user in the return value. For the user, it can just be an opaque object.

Copy link
Contributor Author

@papb papb Jun 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why these 3 types? Not immediately clear why this complication is needed.

To be honest that was the simplest way I could make it work. All attempts to simplify this further caused a test in test-d.ts to fail. In particular, I observed that TypeScript does not realize automatically that { x: A } | { x: B } is the same as { x: A | B }, so I had to write this extends any ? hack.

For the user, it can just be an opaque object.

Nice idea. Do you have any favorite way of declaring an opaque object in TypeScript? Recenly I've seen Record<never, never>, does this sound good?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to expose anything about stop to the user in the return value. For the user, it can just be an opaque object.

Actually, on second thought, I think we have to, because as soon as the user uses return pMap.stop() inside mapper, TypeScript will infer the return type of mapper differently, and we need TS to infer it to something that we can unwrap later in order to provide a correct return type for the pMap call itself. This was quite complicated to achieve to be honest. I don't see how it would be possible if we simplify the result of .stop to a constant type such as Record<never, never>.

Let's merge it this way, currently, and you can open an issue to improve typings? Then if someone finds a way to do it without breaking the current TS tests I added, it can be done later. What do you think?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just do what was done here: c9c0882 (Exclude the symbol from the return type)

// @Richienb

Copy link
Contributor

@Richienb Richienb Jun 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a stop value is recieved, it could set a variable which causes future resolutions of other items to avoid adding new values or creating new calls. Then, the array can be processed and resolved.


/**
Function which is called for every item in `input`. Expected to return a `Promise` or value.

Expand All @@ -25,7 +69,7 @@ Function which is called for every item in `input`. Expected to return a `Promis
export type Mapper<Element = any, NewElement = unknown> = (
element: Element,
index: number
) => NewElement | Promise<NewElement>;
) => MaybeWrappedInStop<NewElement> | Promise<MaybeWrappedInStop<NewElement>>;

/**
@param input - Iterated over concurrently in the `mapper` function.
Expand Down Expand Up @@ -54,8 +98,39 @@ console.log(result);
//=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/']
```
*/
export default function pMap<Element, NewElement>(
input: Iterable<Element>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<NewElement[]>;
declare const pMap: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of defining a constant do this:

export function stop(...);

export default function pMap(...);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I tried to do it this way on 941ed8b but TypeScript tests failed, so I reverted it via 31be4be, what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests failed because the way the function is imported changed:

- import pMap from 'p-map';
- console.log(pMap, pMap.stop);
+ import pMap, {stop} from 'p-map';
+ console.log(pMap, stop);

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per c9c0882, the symbol should be a named export called pMapStop.

<Element, NewElement>(
input: Iterable<Element>,
mapper: Mapper<Element, NewElement>,
options?: Options
): Promise<NewElement[]>;

/**
Creates a special value that should be returned from `mapper` in order to immediately stop iteration.

@example
```
import pMap from 'p-map';
import got from 'got';

const numbers = Array.from({length: 2000}).map((_, index) => index + 1);
//=> [1, 2, ..., 1999, 2000]

const mapper = async number => {
if (number !== 404) {
const {transcript} = await got(`https://xkcd.com/${number}/info.0.json`).json();
if (/unicorn/.test(transcript)) {
console.log('Found a XKCD comic with an unicorn:', number);
return pMap.stop();
}
}
};

await pMap(numbers, mapper, {concurrency: 50});
//=> Found a XKCD comic with an unicorn: 948
```
*/
stop: <NewElement>(options?: StopOptions<NewElement>) => StopValueWrapper<NewElement>;
};

export default pMap;
43 changes: 34 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import AggregateError from 'aggregate-error';

const stopSymbol = Symbol('pMap.stop');

export default async function pMap(
iterable,
mapper,
Expand All @@ -20,13 +22,13 @@ export default async function pMap(
const result = [];
const errors = [];
const iterator = iterable[Symbol.iterator]();
let isRejected = false;
const pendingIndices = new Set();
let isFinished = false;
let isIterableDone = false;
let resolvingCount = 0;
let currentIndex = 0;

const next = () => {
if (isRejected) {
if (isFinished) {
return;
}

Expand All @@ -37,7 +39,7 @@ export default async function pMap(
if (nextItem.done) {
isIterableDone = true;

if (resolvingCount === 0) {
if (pendingIndices.size === 0) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
Expand All @@ -48,21 +50,42 @@ export default async function pMap(
return;
}

resolvingCount++;
pendingIndices.add(index);

(async () => {
try {
const element = await nextItem.value;
result[index] = await mapper(element, index);
resolvingCount--;
next();

if (isFinished) {
return;
}

pendingIndices.delete(index);

if (result[index] && result[index][stopSymbol]) {
isFinished = true;
const stopConfig = result[index][stopSymbol];
result[index] = stopConfig.value;
if (stopConfig.ongoingMappings.collapse) {
resolve(result.flat(0));
} else {
for (const pendingIndex of pendingIndices) {
result[pendingIndex] = stopConfig.ongoingMappings.fillWith;
}

resolve(result);
}
} else {
next();
}
} catch (error) {
if (stopOnError) {
isRejected = true;
isFinished = true;
reject(error);
} else {
errors.push(error);
resolvingCount--;
pendingIndices.delete(index);
next();
}
}
Expand All @@ -78,3 +101,5 @@ export default async function pMap(
}
});
}

pMap.stop = ({value, ongoingMappings = {}} = {}) => ({[stopSymbol]: {value, ongoingMappings}});
28 changes: 27 additions & 1 deletion index.test-d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {expectType, expectAssignable} from 'tsd';
import {expectType, expectAssignable, expectNotAssignable} from 'tsd';
import pMap, {Options, Mapper} from './index.js';

const sites = [
Expand All @@ -18,13 +18,26 @@ const asyncSyncMapper = async (site: string, index: number): Promise<string> =>
index > 1 ? site : Promise.resolve(site);
const multiResultTypeMapper = async (site: string, index: number): Promise<string | number> =>
index > 1 ? site.length : site;
const mapperStoppingEarly1 = async (site: string, index: number) =>
index > 1 ? pMap.stop({value: site.length}) : site;
const mapperStoppingEarly2 = async (site: string, index: number) =>
index > 1 ? pMap.stop({value: index > 2 ? site.length : site}) : site;
const mapperStoppingEarly3 = async (site: string, index: number) =>
index > 1 ? pMap.stop({value: index > 2 ? site.length : (index > 3 ? Date.now() : site)}) : true;

expectAssignable<Mapper>(asyncMapper);
expectAssignable<Mapper<string, string>>(asyncMapper);
expectAssignable<Mapper>(asyncSyncMapper);
expectAssignable<Mapper<string, string>>(asyncSyncMapper);
expectAssignable<Mapper<string, string | Promise<string>>>(asyncSyncMapper);
expectAssignable<Mapper>(multiResultTypeMapper);
expectAssignable<Mapper<string, string | number>>(multiResultTypeMapper);
expectAssignable<Mapper>(mapperStoppingEarly1);
expectAssignable<Mapper<string, string | number>>(mapperStoppingEarly1);
expectAssignable<Mapper>(mapperStoppingEarly2);
expectAssignable<Mapper<string, string | number>>(mapperStoppingEarly2);
expectAssignable<Mapper>(mapperStoppingEarly3);
expectAssignable<Mapper<string, string | number | boolean | Date>>(mapperStoppingEarly3);

expectAssignable<Options>({});
expectAssignable<Options>({concurrency: 0});
Expand All @@ -40,3 +53,16 @@ expectType<Promise<string[]>>(pMap(sites, (site: string) => site));
expectType<Promise<number[]>>(pMap(sites, (site: string) => site.length));

expectType<Promise<number[]>>(pMap(numbers, (number: number) => number * 2));

expectType<Promise<number[]>>(pMap(numbers, (number: number) => number * 2));

pMap.stop();
pMap.stop({});
pMap.stop({value: 123});
pMap.stop({ongoingMappings: {}});
pMap.stop({ongoingMappings: {collapse: true}});
pMap.stop({ongoingMappings: {fillWith: 'hello'}});
pMap.stop({value: Date.now(), ongoingMappings: {collapse: false, fillWith: 'hello'}});

const shouldBeUnusableDirectly = pMap.stop({value: 123});
expectNotAssignable<number>(shouldBeUnusableDirectly);
67 changes: 66 additions & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

Useful when you need to run promise-returning & async functions multiple times with different inputs concurrently.

This is different from `Promise.all()` in that you can control the concurrency and also decide whether or not to stop iterating when there's an error.
This is different from `Promise.all()` in that you can:

* Control the concurrency
* Decide whether or not to stop iterating when there's an error
* Stop iterating at any point (like `break` in `for` loops)

## Install

Expand Down Expand Up @@ -35,6 +39,29 @@ console.log(result);
//=> ['https://sindresorhus.com/', 'https://avajs.dev/', 'https://github.com/']
```

### Breaking from iteration

```js
import pMap from 'p-map';
import got from 'got';

const numbers = Array.from({length: 2000}).map((_, index) => index + 1);
//=> [1, 2, ..., 1999, 2000]

const mapper = async number => {
if (number !== 404) {
const {transcript} = await got(`https://xkcd.com/${number}/info.0.json`).json();
if (/unicorn/.test(transcript)) {
console.log('Found a XKCD comic with an unicorn:', number);
return pMap.stop();
}
}
};

await pMap(numbers, mapper, {concurrency: 50});
//=> Found a XKCD comic with an unicorn: 948
```

## API

### pMap(input, mapper, options?)
Expand Down Expand Up @@ -72,6 +99,44 @@ Default: `true`

When set to `false`, instead of stopping when a promise rejects, it will wait for all the promises to settle and then reject with an [aggregated error](https://github.com/sindresorhus/aggregate-error) containing all the errors from the rejected promises.

### pMap.stop(options?)

Creates a special value that should be returned from `mapper` in order to immediately stop iteration.

#### options

Type: `object`

##### value

Type: `any`\
Default: `undefined`

Value to provide as result for this iteration.

##### ongoingMappings

Type: `object`

Options to configure what `pMap` must do with any concurrent ongoing mappings at the moment `stop` is called.

###### collapse

Type: `boolean`\
Default: `false`

Whether or not to remove all holes from the result array (caused by pending mappings).

###### fillWith

Type: `any`\
Default: `undefined`

Value to use as immediate result for pending mappings, replacing holes from the result array.

This option is ignored if `collapse` is set to `true`.


## p-map for enterprise

Available as part of the Tidelift Subscription.
Expand Down
Loading