145 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
		
		
			
		
	
	
			145 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
|  | 'use strict' | ||
|  | /* | ||
|  |  * merge2 | ||
|  |  * https://github.com/teambition/merge2
 | ||
|  |  * | ||
|  |  * Copyright (c) 2014-2020 Teambition | ||
|  |  * Licensed under the MIT license. | ||
|  |  */ | ||
|  | const Stream = require('stream') | ||
|  | const PassThrough = Stream.PassThrough | ||
|  | const slice = Array.prototype.slice | ||
|  | 
 | ||
|  | module.exports = merge2 | ||
|  | 
 | ||
|  | function merge2 () { | ||
|  |   const streamsQueue = [] | ||
|  |   const args = slice.call(arguments) | ||
|  |   let merging = false | ||
|  |   let options = args[args.length - 1] | ||
|  | 
 | ||
|  |   if (options && !Array.isArray(options) && options.pipe == null) { | ||
|  |     args.pop() | ||
|  |   } else { | ||
|  |     options = {} | ||
|  |   } | ||
|  | 
 | ||
|  |   const doEnd = options.end !== false | ||
|  |   const doPipeError = options.pipeError === true | ||
|  |   if (options.objectMode == null) { | ||
|  |     options.objectMode = true | ||
|  |   } | ||
|  |   if (options.highWaterMark == null) { | ||
|  |     options.highWaterMark = 64 * 1024 | ||
|  |   } | ||
|  |   const mergedStream = PassThrough(options) | ||
|  | 
 | ||
|  |   function addStream () { | ||
|  |     for (let i = 0, len = arguments.length; i < len; i++) { | ||
|  |       streamsQueue.push(pauseStreams(arguments[i], options)) | ||
|  |     } | ||
|  |     mergeStream() | ||
|  |     return this | ||
|  |   } | ||
|  | 
 | ||
|  |   function mergeStream () { | ||
|  |     if (merging) { | ||
|  |       return | ||
|  |     } | ||
|  |     merging = true | ||
|  | 
 | ||
|  |     let streams = streamsQueue.shift() | ||
|  |     if (!streams) { | ||
|  |       process.nextTick(endStream) | ||
|  |       return | ||
|  |     } | ||
|  |     if (!Array.isArray(streams)) { | ||
|  |       streams = [streams] | ||
|  |     } | ||
|  | 
 | ||
|  |     let pipesCount = streams.length + 1 | ||
|  | 
 | ||
|  |     function next () { | ||
|  |       if (--pipesCount > 0) { | ||
|  |         return | ||
|  |       } | ||
|  |       merging = false | ||
|  |       mergeStream() | ||
|  |     } | ||
|  | 
 | ||
|  |     function pipe (stream) { | ||
|  |       function onend () { | ||
|  |         stream.removeListener('merge2UnpipeEnd', onend) | ||
|  |         stream.removeListener('end', onend) | ||
|  |         if (doPipeError) { | ||
|  |           stream.removeListener('error', onerror) | ||
|  |         } | ||
|  |         next() | ||
|  |       } | ||
|  |       function onerror (err) { | ||
|  |         mergedStream.emit('error', err) | ||
|  |       } | ||
|  |       // skip ended stream
 | ||
|  |       if (stream._readableState.endEmitted) { | ||
|  |         return next() | ||
|  |       } | ||
|  | 
 | ||
|  |       stream.on('merge2UnpipeEnd', onend) | ||
|  |       stream.on('end', onend) | ||
|  | 
 | ||
|  |       if (doPipeError) { | ||
|  |         stream.on('error', onerror) | ||
|  |       } | ||
|  | 
 | ||
|  |       stream.pipe(mergedStream, { end: false }) | ||
|  |       // compatible for old stream
 | ||
|  |       stream.resume() | ||
|  |     } | ||
|  | 
 | ||
|  |     for (let i = 0; i < streams.length; i++) { | ||
|  |       pipe(streams[i]) | ||
|  |     } | ||
|  | 
 | ||
|  |     next() | ||
|  |   } | ||
|  | 
 | ||
|  |   function endStream () { | ||
|  |     merging = false | ||
|  |     // emit 'queueDrain' when all streams merged.
 | ||
|  |     mergedStream.emit('queueDrain') | ||
|  |     if (doEnd) { | ||
|  |       mergedStream.end() | ||
|  |     } | ||
|  |   } | ||
|  | 
 | ||
|  |   mergedStream.setMaxListeners(0) | ||
|  |   mergedStream.add = addStream | ||
|  |   mergedStream.on('unpipe', function (stream) { | ||
|  |     stream.emit('merge2UnpipeEnd') | ||
|  |   }) | ||
|  | 
 | ||
|  |   if (args.length) { | ||
|  |     addStream.apply(null, args) | ||
|  |   } | ||
|  |   return mergedStream | ||
|  | } | ||
|  | 
 | ||
|  | // check and pause streams for pipe.
 | ||
|  | function pauseStreams (streams, options) { | ||
|  |   if (!Array.isArray(streams)) { | ||
|  |     // Backwards-compat with old-style streams
 | ||
|  |     if (!streams._readableState && streams.pipe) { | ||
|  |       streams = streams.pipe(PassThrough(options)) | ||
|  |     } | ||
|  |     if (!streams._readableState || !streams.pause || !streams.pipe) { | ||
|  |       throw new Error('Only readable stream can be merged.') | ||
|  |     } | ||
|  |     streams.pause() | ||
|  |   } else { | ||
|  |     for (let i = 0, len = streams.length; i < len; i++) { | ||
|  |       streams[i] = pauseStreams(streams[i], options) | ||
|  |     } | ||
|  |   } | ||
|  |   return streams | ||
|  | } |