import ruit from 'ruit'
// Names of the methods already installed on the erre API,
// checked by erre.install to avoid registering the same plugin name twice
const API_METHODS = new Set()
// Unique value a listener can return to get removed from its collection
const UNSUBSCRIBE_SYMBOL = Symbol()
// Name of the API method returning the unsubscribe symbol
const UNSUBSCRIBE_METHOD = 'off'
// Name of the API method aliasing ruit.cancel
const CANCEL_METHOD = 'cancel'

/**
 * Factory function to create the stream generator
 * @private
 * @param {Set} modifiers - stream input modifiers
 * @returns {Generator} the stream generator, already advanced to its first `yield`
 */
function createStream(modifiers) {
  const stream = (function *stream() {
    while (true) {
      // get the initial stream value (whatever is sent via `next(input)`)
      const input = yield

      // run the input sequence; the Set is spread on every iteration,
      // so modifiers added after the stream creation are taken into account
      yield ruit(input, ...modifiers)
    }
  })()

  // start the stream: run it up to the first `yield` so it can receive an input
  stream.next()

  return stream
}

/**
 * Dispatch a value to several listeners
 * @private
 * @param {Set} callbacks - callbacks collection
 * @param {*} value - anything
 * @returns {Set} the callbacks received
 */
function dispatch(callbacks, value) {
  callbacks.forEach(f => {
    // unsubscribe the callback if it returns the unsubscribe symbol
    // (the value provided by the `off` helper installed on the erre API)
    if (f(value) === UNSUBSCRIBE_SYMBOL) callbacks.delete(f)
  })

  return callbacks
}

/**
 * Throw a panic error
 * @param {string} message - error message
 * @throws {Error} always, carrying the message received
 * @returns {never} this function never returns normally
 */
function panic(message) {
  throw new Error(message)
}

/**
 * Install an erre plugin adding it to the API
 * @param {string} name - plugin name
 * @param {Function} fn - new erre API method
 * @returns {Function} return the erre function
 * @throws {Error} if the name is not a non-empty string, if fn is not a function
 * or if the name is already in use on the erre function
 */
erre.install = function(name, fn) {
  if (!name || typeof name !== 'string')
    panic('Please provide a name (as string) for your erre plugin')
  if (typeof fn !== 'function')
    panic('Please provide a function for your erre plugin')

  // besides the plugins already installed, refuse any own property of the erre
  // function (for instance `install` itself) that would otherwise get overridden
  if (API_METHODS.has(name) || Object.prototype.hasOwnProperty.call(erre, name))
    panic(`The ${name} is already part of the erre API, please provide a different name`)

  erre[name] = fn
  API_METHODS.add(name)

  return erre
}

// alias for ruit cancel to stop a stream chain
erre.install(CANCEL_METHOD, ruit.cancel)

// unsubscribe helper: a listener returning this value gets removed by dispatch
erre.install(UNSUBSCRIBE_METHOD, () => UNSUBSCRIBE_SYMBOL)

/**
 * Stream construction function
 * @param {...Function} fns - stream modifiers
 * @returns {Object} erre instance
 */
export default function erre(...fns) {
  const
    [success, error, end, modifiers] = [new Set(), new Set(), new Set(), new Set(fns)],
    generator = createStream(modifiers),
    // object having the generator as prototype, its `next` method gets overridden below
    stream = Object.create(generator),
    // collection helpers returning the stream to allow chained calls
    addToCollection = (collection) => (fn) => collection.add(fn) && stream,
    deleteFromCollection = (collection) => (fn) => collection.delete(fn) ? stream
      : panic('Couldn\'t remove handler passed by reference')

  return Object.assign(stream, {
    // subscribe listeners to the stream events
    on: Object.freeze({
      value: addToCollection(success),
      error: addToCollection(error),
      end: addToCollection(end)
    }),
    // unsubscribe listeners by reference, unknown listeners cause a panic error
    off: Object.freeze({
      value: deleteFromCollection(success),
      error: deleteFromCollection(error),
      end: deleteFromCollection(end)
    }),
    // add a new modifier to the stream
    connect: addToCollection(modifiers),
    push(input) {
      const { value, done } = stream.next(input)

      // dispatch the stream events, nothing is dispatched once the generator is done
      if (!done) {
        value.then(
          res => dispatch(success, res),
          err => dispatch(error, err)
        )
      }

      return stream
    },
    end() {
      // kill the stream
      generator.return()
      // dispatch the end event
      dispatch(end)
      // clean up all the collections
      ;[success, error, end, modifiers].forEach(el => el.clear())

      return stream
    },
    fork() {
      // create a new stream using the modifiers currently connected
      return erre(...modifiers)
    },
    next(input) {
      // get the input and run eventually the promise
      const result = generator.next(input)

      // pause to the next iteration
      generator.next()

      return result
    }
  })
}