'use strict'; var STREAM = require('stream'), UTIL = require('util'), StringDecoder = require('string_decoder').StringDecoder; function MemoryReadableStream(data, options) { if (!(this instanceof MemoryReadableStream)) return new MemoryReadableStream(data, options); MemoryReadableStream.super_.call(this, options); this.init(data, options); } UTIL.inherits(MemoryReadableStream, STREAM.Readable); function MemoryWritableStream(data, options) { if (!(this instanceof MemoryWritableStream)) return new MemoryWritableStream(data, options); MemoryWritableStream.super_.call(this, options); this.init(data, options); } UTIL.inherits(MemoryWritableStream, STREAM.Writable); function MemoryDuplexStream(data, options) { if (!(this instanceof MemoryDuplexStream)) return new MemoryDuplexStream(data, options); MemoryDuplexStream.super_.call(this, options); this.init(data, options); } UTIL.inherits(MemoryDuplexStream, STREAM.Duplex); MemoryReadableStream.prototype.init = MemoryWritableStream.prototype.init = MemoryDuplexStream.prototype.init = function init (data, options) { var self = this; this.queue = []; if (data) { if (!Array.isArray(data)) { data = [ data ]; } data.forEach(function (chunk) { if (!(chunk instanceof Buffer)) { chunk = new Buffer(chunk); } self.queue.push(chunk); }); } options = options || {}; this.maxbufsize = options.hasOwnProperty('maxbufsize') ? options.maxbufsize : null; this.bufoverflow = options.hasOwnProperty('bufoverflow') ? options.bufoverflow : null; this.frequence = options.hasOwnProperty('frequence') ? options.frequence : null; }; function MemoryStream (data, options) { if (!(this instanceof MemoryStream)) return new MemoryStream(data, options); options = options || {}; var readable = options.hasOwnProperty('readable') ? options.readable : true, writable = options.hasOwnProperty('writable') ? options.writable : true; if (readable && writable) { return new MemoryDuplexStream(data, options); } else if (readable) { return new MemoryReadableStream(data, options); } else if (writable) { return new MemoryWritableStream(data, options); } else { throw new Error("Unknown stream type Readable, Writable or Duplex "); } } MemoryStream.createReadStream = function (data, options) { options = options || {}; options.readable = true; options.writable = false; return new MemoryStream(data, options); }; MemoryStream.createWriteStream = function (data, options) { options = options || {}; options.readable = false; options.writable = true; return new MemoryStream(data, options); }; MemoryReadableStream.prototype._read = MemoryDuplexStream.prototype._read = function _read (n) { var self = this, frequence = self.frequence || 0, wait_data = this instanceof STREAM.Duplex && ! this._writableState.finished ? true : false; if ( ! this.queue.length && ! wait_data) { this.push(null);// finish stream } else if (this.queue.length) { setTimeout(function () { if (self.queue.length) { var chunk = self.queue.shift(); if (chunk && ! self._readableState.ended) { if ( ! self.push(chunk) ) { self.queue.unshift(chunk); } } } }, frequence); } }; MemoryWritableStream.prototype._write = MemoryDuplexStream.prototype._write = function _write (chunk, encoding, cb) { var decoder = null; try { decoder = this.decodeStrings && encoding ? new StringDecoder(encoding) : null; } catch (err){ return cb(err); } var decoded_chunk = decoder ? decoder.write(chunk) : chunk, queue_size = this._getQueueSize(), chunk_size = decoded_chunk.length; if (this.maxbufsize && (queue_size + chunk_size) > this.maxbufsize ) { if (this.bufoverflow) { return cb("Buffer overflowed (" + this.bufoverflow + "/" + queue_size + ")"); } else { return cb(); } } if (this instanceof STREAM.Duplex) { while (this.queue.length) { this.push(this.queue.shift()); } this.push(decoded_chunk); } else { this.queue.push(decoded_chunk); } cb(); }; MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) { var self = this; return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () { self.push(null);//finish readble stream too if (cb) cb(); }); }; MemoryReadableStream.prototype._getQueueSize = MemoryWritableStream.prototype._getQueueSize = MemoryDuplexStream.prototype._getQueueSize = function () { var queuesize = 0, i; for (i = 0; i < this.queue.length; i++) { queuesize += Array.isArray(this.queue[i]) ? this.queue[i][0].length : this.queue[i].length; } return queuesize; }; MemoryWritableStream.prototype.toString = MemoryDuplexStream.prototype.toString = MemoryReadableStream.prototype.toString = MemoryWritableStream.prototype.getAll = MemoryDuplexStream.prototype.getAll = MemoryReadableStream.prototype.getAll = function () { var self = this, ret = ''; this.queue.forEach(function (data) { ret += data; }); return ret; }; MemoryWritableStream.prototype.toBuffer = MemoryDuplexStream.prototype.toBuffer = MemoryReadableStream.prototype.toBuffer = function () { var buffer = new Buffer(this._getQueueSize()), currentOffset = 0; this.queue.forEach(function (data) { var data_buffer = data instanceof Buffer ? data : new Buffer(data); data_buffer.copy(buffer, currentOffset); currentOffset += data.length; }); return buffer; }; module.exports = MemoryStream;