'use strict';

Object.defineProperty(exports, "__esModule", {
  value: true
});
exports.combine = combine;
exports.combineArray = combineArray;

// Dependencies are loaded in a fixed order. The `.default` member of each
// default-style import is looked up at the point of use, not here.
var streamModule = _interopRequireDefault(require('../Stream'));
var transform = _interopRequireWildcard(require('./transform'));
var core = _interopRequireWildcard(require('../source/core'));
var pipeModule = _interopRequireDefault(require('../sink/Pipe'));
var indexSinkModule = _interopRequireDefault(require('../sink/IndexSink'));
var dispose = _interopRequireWildcard(require('../disposable/dispose'));
var base = _interopRequireWildcard(require('@most/prelude'));
var invokeModule = _interopRequireDefault(require('../invoke'));

/**
 * Normalize a required module into a namespace-like object.
 * @param {*} obj result of a require() call
 * @returns {Object} obj itself when it is flagged __esModule; otherwise a new
 * object holding obj's own enumerable properties plus `default` set to obj.
 */
function _interopRequireWildcard(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  var namespace = {};
  if (obj != null) {
    for (var name in obj) {
      if (Object.prototype.hasOwnProperty.call(obj, name)) {
        namespace[name] = obj[name];
      }
    }
  }
  namespace.default = obj;
  return namespace;
}

/**
 * Normalize a required module so its default export is reachable at `.default`.
 * @param {*} obj result of a require() call
 * @returns {*} obj itself when it is truthy and flagged __esModule; otherwise
 * the wrapper `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }
  return { default: obj };
}

/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */

var map = base.map;
var tail = base.tail;

/**
 * Combine latest events from all input streams (variadic form).
 * Forwards f and every argument after it to combineArray.
 * @param {function(...events):*} f function to combine most recent events
 * @param {...Stream} streams input streams, passed as the arguments after f
 * @returns {Stream} stream containing the result of applying f to the most
 * recent event of each input stream, whenever a new event arrives on any stream.
 */
function combine(f /*, ...streams */) {
  var streams = tail(arguments);
  return combineArray(f, streams);
}

/**
 * Combine latest events from all input streams (array form).
 * @param {function(...events):*} f function to combine most recent events
 * @param {[Stream]} streams input streams whose most recent events are combined
 * @returns {Stream} for no streams, whatever core.empty() produces; for one
 * stream, that stream mapped through f; otherwise a stream containing the
 * result of applying f to the most recent event of each input stream,
 * whenever a new event arrives on any stream.
 */
function combineArray(f, streams) {
  var count = streams.length;

  if (count === 0) {
    return core.empty();
  }
  if (count === 1) {
    return transform.map(f, streams[0]);
  }
  return new streamModule.default(combineSources(f, streams));
}

/**
 * Build the Combine source for a list of streams.
 * @param {function(...events):*} f combining function
 * @param {[Stream]} streams input streams
 * @returns {Combine} source wrapping the `source` of every input stream
 */
function combineSources(f, streams) {
  var sources = map(getSource, streams);
  return new Combine(f, sources);
}

/**
 * @param {Stream} stream
 * @returns {*} the stream's `source` property
 */
function getSource(stream) {
  return stream.source;
}

/**
 * Source that runs several sources and feeds them all into one CombineSink.
 * @param {function(...events):*} f combining function
 * @param {Array} sources sources to run
 * @constructor
 */
function Combine(f, sources) {
  this.f = f;
  this.sources = sources;
}

/**
 * Run every source in index order, each through its own IndexSink so the
 * shared CombineSink can tell which input an event came from.
 * @param {Object} sink downstream sink
 * @param {Object} scheduler passed through to each source's run()
 * @returns {*} result of dispose.all over the per-source disposables
 */
Combine.prototype.run = function (sink, scheduler) {
  var count = this.sources.length;
  var disposables = new Array(count);
  var sinks = new Array(count);
  // The arrays are shared with the CombineSink and filled in as each source starts.
  var mergeSink = new CombineSink(disposables, sinks, sink, this.f);

  for (var i = 0; i < count; ++i) {
    var indexSink = new indexSinkModule.default(i, mergeSink);
    sinks[i] = indexSink;
    disposables[i] = this.sources[i].run(indexSink, scheduler);
  }

  return dispose.all(disposables);
};

/**
 * Sink that remembers the latest value from each input and emits f applied
 * to those values once every input has produced at least one value.
 * @param {Array} disposables per-input disposables, indexed like sinks
 * @param {Array} sinks per-input sinks; its length fixes the number of inputs
 * @param {Object} sink downstream sink
 * @param {function(...events):*} f combining function
 * @constructor
 */
function CombineSink(disposables, sinks, sink, f) {
  var count = sinks.length;

  this.sink = sink;
  this.disposables = disposables;
  this.sinks = sinks;
  this.f = f;

  // Number of inputs that have not yet produced a first value.
  this.awaiting = count;
  this.values = new Array(count);
  this.hasValue = new Array(count);

  var slot = 0;
  while (slot < count) {
    this.hasValue[slot] = false;
    slot += 1;
  }

  // Number of inputs that have not yet ended.
  this.activeCount = count;
}

CombineSink.prototype.error = pipeModule.default.prototype.error;

/**
 * Record the newest value for one input and, when no input is still awaiting
 * its first value, emit the combined result downstream.
 * @param {*} t event time, forwarded unchanged
 * @param {{index:number, value:*}} indexedValue originating input index and value
 */
CombineSink.prototype.event = function (t, indexedValue) {
  var index = indexedValue.index;
  var remaining = this._updateReady(index);

  this.values[index] = indexedValue.value;

  if (remaining !== 0) {
    return;
  }

  var invoke = invokeModule.default;
  this.sink.event(t, invoke(this.f, this.values));
};

/**
 * Mark the given input as having produced a value, the first time it does.
 * @param {number} index input index
 * @returns {number} how many inputs are still awaiting their first value
 */
CombineSink.prototype._updateReady = function (index) {
  if (this.awaiting > 0 && !this.hasValue[index]) {
    this.hasValue[index] = true;
    this.awaiting -= 1;
  }
  return this.awaiting;
};

/**
 * Handle the end of one input: pass its disposable to dispose.tryDispose, and
 * end the downstream sink once every input has ended.
 * @param {*} t end time, forwarded unchanged
 * @param {{index:number, value:*}} indexedValue originating input index and end value
 */
CombineSink.prototype.end = function (t, indexedValue) {
  dispose.tryDispose(t, this.disposables[indexedValue.index], this.sink);

  this.activeCount -= 1;
  if (this.activeCount === 0) {
    this.sink.end(t, indexedValue.value);
  }
};