'use strict';

Object.defineProperty(exports, "__esModule", { value: true });

// Exports are assigned before any dependency is required (the function
// declarations below are hoisted), so keep these two lines ahead of the
// require calls.
exports.zip = zip;
exports.zipArray = zipArray;

// Dependencies are loaded in the same order as before; each one is passed
// straight through the matching interop helper.
var _Stream2 = _interopRequireDefault(require('../Stream'));
var transform = _interopRequireWildcard(require('./transform'));
var core = _interopRequireWildcard(require('../source/core'));
var _Pipe2 = _interopRequireDefault(require('../sink/Pipe'));
var _IndexSink2 = _interopRequireDefault(require('../sink/IndexSink'));
var dispose = _interopRequireWildcard(require('../disposable/dispose'));
var base = _interopRequireWildcard(require('@most/prelude'));
var _invoke2 = _interopRequireDefault(require('../invoke'));
var _Queue2 = _interopRequireDefault(require('../Queue'));

/**
 * Normalise a required module to a namespace-like object.
 * A module flagged with `__esModule` is returned untouched; anything else is
 * shallow-copied (own enumerable properties only) into a fresh object that
 * also exposes the original value under `default`.
 * @param {*} obj value returned by `require`
 * @returns {Object} `obj` itself, or the copy described above
 */
function _interopRequireWildcard(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  var namespace = {};
  if (obj != null) {
    for (var prop in obj) {
      if (Object.prototype.hasOwnProperty.call(obj, prop)) {
        namespace[prop] = obj[prop];
      }
    }
  }
  namespace.default = obj;
  return namespace;
}

/**
 * Normalise a required module so its main export is reachable as `.default`.
 * @param {*} obj value returned by `require`
 * @returns {Object} `obj` when it is flagged `__esModule`, else `{ default: obj }`
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }
  return { default: obj };
}

/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */

var map = base.map;
var tail = base.tail;

/**
 * Variadic form of zipArray: every argument after `f` is taken as a stream
 * to zip. Values are combined by index with `f`, and the resulting stream
 * ends when any of the input streams ends.
 * @param {function} f function to combine values
 * @returns {Stream} new stream with items at corresponding indices combined
 *  using f
 */
function zip(f /*, ...streams */) {
  var streams = tail(arguments);
  return zipArray(f, streams);
}

/**
 * Zip an array of streams: values at corresponding indices are combined by
 * applying f. The resulting stream ends when any of the input streams ends.
 * An empty array yields `core.empty()`; a single stream is simply mapped
 * with f.
 * @param {function} f function to combine values
 * @param {[Stream]} streams streams to zip using f
 * @returns {Stream} new stream with items at corresponding indices combined
 *  using f
 */
function zipArray(f, streams) {
  var count = streams.length;

  if (count === 0) {
    return core.empty();
  }
  if (count === 1) {
    return transform.map(f, streams[0]);
  }

  var sources = map(getSource, streams);
  return new _Stream2.default(new Zip(f, sources));
}

/**
 * @param {Stream} stream
 * @returns {*} the stream's `source` property
 */
function getSource(stream) {
  return stream.source;
}

/**
 * Source that zips the events of several sources.
 * @param {function} f function to combine values
 * @param {Array} sources sources to run and zip
 * @constructor
 */
function Zip(f, sources) {
  this.f = f;
  this.sources = sources;
}

/**
 * Run every source against its own IndexSink (constructed with the source's
 * position and the shared ZipSink) and give each source its own Queue.
 * @param {Object} sink sink receiving the zipped events
 * @param {Object} scheduler passed through to each source's `run`
 * @returns {*} result of `dispose.all` applied to the per-source disposables
 */
Zip.prototype.run = function (sink, scheduler) {
  var sources = this.sources;
  var count = sources.length;
  var disposables = new Array(count);
  var sinks = new Array(count);
  var buffers = new Array(count);
  var zipSink = new ZipSink(this.f, buffers, sinks, sink);

  // Per-source setup stays interleaved: the queue and the index sink for a
  // source are created immediately before that source is run.
  var index = 0;
  while (index < count) {
    var indexSink = new _IndexSink2.default(index, zipSink);
    buffers[index] = new _Queue2.default();
    sinks[index] = indexSink;
    disposables[index] = sources[index].run(indexSink, scheduler);
    index += 1;
  }

  return dispose.all(disposables);
};

/**
 * Sink shared by all zipped sources. Incoming values are queued per source
 * and combined with f once every queue holds at least one value.
 * @param {function} f function to combine values
 * @param {Array} buffers one queue per source
 * @param {Array} sinks one index sink per source
 * @param {Object} sink downstream sink
 * @constructor
 */
function ZipSink(f, buffers, sinks, sink) {
  this.f = f;
  this.sinks = sinks;
  this.sink = sink;
  this.buffers = buffers;
}

/**
 * Queue a value from one source and, if that completes a tuple, emit it.
 * @param {*} t event time, forwarded unchanged
 * @param {{index: number, value: *}} indexedValue value tagged with the
 *  position of the source that produced it
 */
ZipSink.prototype.event = function (t, indexedValue) { // eslint-disable-line complexity
  var buffers = this.buffers;
  var buffer = buffers[indexedValue.index];

  buffer.push(indexedValue.value);

  // A tuple can only have become complete if this queue just went from
  // empty to holding one value; with more than one queued, this source was
  // already waiting on the others.
  if (buffer.length() !== 1 || !ready(buffers)) {
    return;
  }

  emitZipped(this.f, t, buffers, this.sink);

  // Emitting may have drained the queue of a source that is no longer
  // active; in that case no further tuple can be formed.
  if (ended(buffers, this.sinks)) {
    this.sink.end(t, void 0);
  }
};

/**
 * Handle the end of one source. The downstream sink is ended right away only
 * when that source has nothing left queued.
 * @param {*} t end time, forwarded unchanged
 * @param {{index: number, value: *}} indexedValue end value tagged with the
 *  position of the source that ended
 */
ZipSink.prototype.end = function (t, indexedValue) {
  if (!this.buffers[indexedValue.index].isEmpty()) {
    return;
  }
  this.sink.end(t, indexedValue.value);
};

// Error handling is borrowed from Pipe.
ZipSink.prototype.error = _Pipe2.default.prototype.error;

/**
 * Remove the first value from every queue, combine them with f and send the
 * result downstream.
 * @param {function} f function to combine values
 * @param {*} t event time
 * @param {Array} buffers one queue per source
 * @param {Object} sink downstream sink
 */
function emitZipped(f, t, buffers, sink) {
  var invoke = _invoke2.default;
  sink.event(t, invoke(f, map(head, buffers)));
}

/**
 * @param {Object} buffer queue to take from
 * @returns {*} whatever `buffer.shift()` returns
 */
function head(buffer) {
  return buffer.shift();
}

/**
 * @param {Array} buffers one queue per source
 * @param {Array} sinks one index sink per source
 * @returns {boolean} true if some source has an empty queue and its index
 *  sink's `active` flag is falsy
 */
function ended(buffers, sinks) {
  var total = buffers.length;
  var index = 0;
  while (index < total) {
    // `active` is only consulted for sources whose queue is empty.
    if (buffers[index].isEmpty() && !sinks[index].active) {
      return true;
    }
    index += 1;
  }
  return false;
}

/**
 * @param {Array} buffers one queue per source
 * @returns {boolean} true if no queue is empty
 */
function ready(buffers) {
  var total = buffers.length;
  var index = 0;
  while (index < total) {
    if (buffers[index].isEmpty()) {
      return false;
    }
    index += 1;
  }
  return true;
}