273 lines
5.7 KiB
JavaScript
273 lines
5.7 KiB
JavaScript
'use strict';
|
|
const pTimeout = require('p-timeout');
|
|
|
|
// Property key used for the self-returning method on the iterator object built
// later in this file. Falls back to the string '@@asyncIterator' when
// `Symbol.asyncIterator` is not defined by the runtime.
const symbolAsyncIterator = Symbol.asyncIterator || '@@asyncIterator';
|
|
|
|
/**
 * Resolve the subscribe/unsubscribe methods of an emitter-like object.
 *
 * Subscribe candidates are probed in the order `on`, `addListener`,
 * `addEventListener`; unsubscribe candidates in the order `off`,
 * `removeListener`, `removeEventListener`. The first truthy member wins.
 *
 * @param {object} emitter - Object to inspect.
 * @returns {{addListener: Function, removeListener: Function}} The chosen methods, bound to `emitter`.
 * @throws {TypeError} When no subscribe or no unsubscribe method is found.
 */
const normalizeEmitter = emitter => {
	// Return the first truthy member among `names`, reading them in order and
	// stopping at the first hit.
	const pickMember = names => {
		for (const name of names) {
			const member = emitter[name];
			if (member) {
				return member;
			}
		}

		return undefined;
	};

	const subscribe = pickMember(['on', 'addListener', 'addEventListener']);
	const unsubscribe = pickMember(['off', 'removeListener', 'removeEventListener']);

	const isCompatible = Boolean(subscribe) && Boolean(unsubscribe);
	if (isCompatible) {
		// Bind so the methods keep their receiver when called standalone.
		return {
			addListener: subscribe.bind(emitter),
			removeListener: unsubscribe.bind(emitter)
		};
	}

	throw new TypeError('Emitter is not compatible');
};
|
|
|
|
/**
 * Coerce an event argument to an array of events.
 *
 * @param {*} event - A single event name or an array of event names.
 * @returns {Array} `event` itself when it is already an array, otherwise a one-element array holding it.
 */
const normalizeEvents = event => {
	if (Array.isArray(event)) {
		return event;
	}

	return [event];
};
|
|
|
|
/**
 * Collect events from `emitter` and resolve once `options.count` of them
 * have been accepted.
 *
 * @param {object} emitter - Emitter-like object accepted by `normalizeEmitter`.
 * @param {*} event - Event name, or array of event names, to collect.
 * @param {object} options
 * @param {number} options.count - How many accepted events to wait for; a non-negative integer or `Infinity`.
 * @param {Array} [options.rejectionEvents=['error']] - Events that reject the promise with their first argument.
 * @param {boolean} [options.multiArgs=false] - Store the full argument array of each event instead of only its first argument.
 * @param {boolean} [options.resolveImmediately=false] - Resolve right away with the (live) array that later events keep being pushed into.
 * @param {Function} [options.filter] - Called with each value; events for which it returns a falsy value are ignored.
 * @param {number} [options.timeout] - When a number, the result is raced against `pTimeout` with this delay.
 * @returns {Promise<Array>} Promise for the collected values. It carries a `cancel()` method that removes
 *   every listener this call attached (it does not settle the promise).
 *   An invalid `count` or an incompatible emitter surfaces as a rejection, not a synchronous throw.
 */
const multiple = (emitter, event, options) => {
	const settings = Object.assign({
		rejectionEvents: ['error'],
		multiArgs: false,
		resolveImmediately: false
	}, options);

	// Starts as a no-op so `.cancel()` is always callable, even when the
	// executor below throws before any listener has been attached.
	let cancel = () => {};

	const ret = new Promise((resolve, reject) => {
		const {count} = settings;
		const isValidCount = count >= 0 && (count === Infinity || Number.isInteger(count));
		if (!isValidCount) {
			throw new TypeError('The `count` option should be at least 0 or more');
		}

		// Allow multiple events
		const events = normalizeEvents(event);
		const items = [];
		const {addListener, removeListener} = normalizeEmitter(emitter);

		const onItem = (...args) => {
			const value = settings.multiArgs ? args : args[0];

			if (settings.filter && !settings.filter(value)) {
				return;
			}

			items.push(value);

			if (count === items.length) {
				cancel();
				resolve(items);
			}
		};

		const rejectHandler = error => {
			cancel();
			reject(error);
		};

		cancel = () => {
			for (const name of events) {
				removeListener(name, onItem);
			}

			for (const rejectionEvent of settings.rejectionEvents) {
				removeListener(rejectionEvent, rejectHandler);
			}
		};

		for (const name of events) {
			addListener(name, onItem);
		}

		for (const rejectionEvent of settings.rejectionEvents) {
			addListener(rejectionEvent, rejectHandler);
		}

		if (settings.resolveImmediately) {
			// Listeners stay attached on purpose: `items` keeps filling up.
			resolve(items);
		}
	});

	ret.cancel = cancel;

	if (typeof settings.timeout !== 'number') {
		return ret;
	}

	// When the timer wins, `ret` itself never settles, so nothing else would
	// detach the listeners. Remove them here and re-throw so the caller still
	// observes the original rejection. Calling `cancel` again after an
	// ordinary rejection is harmless.
	const timeout = pTimeout(ret, settings.timeout).then(undefined, error => {
		cancel();
		throw error;
	});
	timeout.cancel = cancel;
	return timeout;
};
|
|
|
|
module.exports = (emitter, event, options) => {
|
|
if (typeof options === 'function') {
|
|
options = {filter: options};
|
|
}
|
|
|
|
options = Object.assign({}, options, {
|
|
count: 1,
|
|
resolveImmediately: false
|
|
});
|
|
|
|
const arrayPromise = multiple(emitter, event, options);
|
|
|
|
const promise = arrayPromise.then(array => array[0]);
|
|
promise.cancel = arrayPromise.cancel;
|
|
|
|
return promise;
|
|
};
|
|
|
|
// Expose the `multiple` function as a property of the exported function.
module.exports.multiple = multiple;
|
|
|
|
/**
 * Create an async iterator over events emitted by `emitter`.
 *
 * Events that arrive while no `next()` call is waiting are buffered and
 * handed out in arrival order. Iteration ends when a resolution event fires,
 * when `limit` value events have been seen, or when `return()` is called.
 * A rejection event makes the next unanswered `next()` reject.
 *
 * @param {object} emitter - Emitter-like object accepted by `normalizeEmitter`.
 * @param {*} event - Event name, or array of event names, to iterate over.
 * @param {object|Function} [options] - A function is treated as `{filter: fn}`.
 * @param {Array} [options.rejectionEvents=['error']] - Events that make the iterator reject.
 * @param {Array} [options.resolutionEvents=[]] - Events that end the iteration; their value is delivered with `done: true`.
 * @param {number} [options.limit=Infinity] - Maximum number of value events; a non-negative integer or `Infinity`.
 * @param {boolean} [options.multiArgs=false] - Deliver the full argument array of each event instead of only its first argument.
 * @param {Function} [options.filter] - Applied to resolution events (see the review note in `resolveHandler`).
 * @returns {object} Object with `next()`, and, unless `limit` is 0, `return()`, that returns itself from its async-iterator method.
 * @throws {TypeError} Synchronously, when `limit` is invalid or the emitter is not compatible.
 */
module.exports.iterator = (emitter, event, options) => {
	if (typeof options === 'function') {
		options = {filter: options};
	}

	// Allow multiple events
	const events = normalizeEvents(event);

	options = Object.assign({
		rejectionEvents: ['error'],
		resolutionEvents: [],
		limit: Infinity,
		multiArgs: false
	}, options);

	const {limit} = options;
	const isValidLimit = limit >= 0 && (limit === Infinity || Number.isInteger(limit));
	if (!isValidLimit) {
		throw new TypeError('The `limit` option should be a non-negative integer or Infinity');
	}

	if (limit === 0) {
		// Return an empty async iterator to avoid any further cost.
		// Keyed with `symbolAsyncIterator`, like the main iterator below, so
		// both objects behave the same where `Symbol.asyncIterator` is missing.
		return {
			[symbolAsyncIterator]() {
				return this;
			},
			next() {
				return Promise.resolve({done: true, value: undefined});
			}
		};
	}

	const {addListener, removeListener} = normalizeEmitter(emitter);

	let done = false;
	let error;
	let hasPendingError = false;
	// `next()` calls waiting for an event, as `{resolve, reject}` pairs.
	const nextQueue = [];
	// Buffered iterator results, each already carrying its own `done` flag.
	// Value events are stored with `done: false` and a resolution event with
	// `done: true`, so a buffered value is never mistaken for the final result
	// just because the iterator has been cancelled in the meantime.
	const resultQueue = [];
	let eventCount = 0;

	const valueHandler = (...args) => {
		eventCount++;
		const isLimitReached = eventCount === limit;
		const result = {done: false, value: options.multiArgs ? args : args[0]};

		if (nextQueue.length > 0) {
			const {resolve} = nextQueue.shift();
			resolve(result);
		} else {
			resultQueue.push(result);
		}

		if (isLimitReached) {
			cancel();
		}
	};

	// Detach every listener and complete all `next()` calls still waiting.
	const cancel = () => {
		done = true;

		for (const name of events) {
			removeListener(name, valueHandler);
		}

		for (const rejectionEvent of options.rejectionEvents) {
			removeListener(rejectionEvent, rejectHandler);
		}

		for (const resolutionEvent of options.resolutionEvents) {
			removeListener(resolutionEvent, resolveHandler);
		}

		while (nextQueue.length > 0) {
			const {resolve} = nextQueue.shift();
			resolve({done: true, value: undefined});
		}
	};

	const rejectHandler = (...args) => {
		error = options.multiArgs ? args : args[0];

		if (nextQueue.length > 0) {
			const {reject} = nextQueue.shift();
			reject(error);
		} else {
			// Nobody is waiting: keep the error until buffered values are drained.
			hasPendingError = true;
		}

		cancel();
	};

	const resolveHandler = (...args) => {
		const value = options.multiArgs ? args : args[0];

		// NOTE(review): `options.filter` is consulted only here, for resolution
		// events; `valueHandler` never calls it. Confirm whether value events
		// are meant to be filtered as well.
		if (options.filter && !options.filter(value)) {
			return;
		}

		const result = {done: true, value};

		if (nextQueue.length > 0) {
			const {resolve} = nextQueue.shift();
			resolve(result);
		} else {
			resultQueue.push(result);
		}

		cancel();
	};

	for (const name of events) {
		addListener(name, valueHandler);
	}

	for (const rejectionEvent of options.rejectionEvents) {
		addListener(rejectionEvent, rejectHandler);
	}

	for (const resolutionEvent of options.resolutionEvents) {
		addListener(resolutionEvent, resolveHandler);
	}

	return {
		[symbolAsyncIterator]() {
			return this;
		},
		next() {
			// Buffered results first, then a stored error, then completion.
			if (resultQueue.length > 0) {
				return Promise.resolve(resultQueue.shift());
			}

			if (hasPendingError) {
				hasPendingError = false;
				return Promise.reject(error);
			}

			if (done) {
				return Promise.resolve({done: true, value: undefined});
			}

			return new Promise((resolve, reject) => nextQueue.push({resolve, reject}));
		},
		return(value) {
			cancel();
			return Promise.resolve({done, value});
		}
	};
};
|