143 lines
3.8 KiB
JavaScript
143 lines
3.8 KiB
JavaScript
|
var Stream = require('stream');
|
||
|
var Promise = require('bluebird');
|
||
|
var util = require('util');
|
||
|
var Buffer = require('./Buffer');
|
||
|
var strFunction = 'function';
|
||
|
|
||
|
// Backwards compatibility for node versions < 8
|
||
|
if (!Stream.Writable || !Stream.Writable.prototype.destroy)
|
||
|
Stream = require('readable-stream');
|
||
|
|
||
|
function PullStream() {
|
||
|
if (!(this instanceof PullStream))
|
||
|
return new PullStream();
|
||
|
|
||
|
Stream.Duplex.call(this,{decodeStrings:false, objectMode:true});
|
||
|
this.buffer = Buffer.from('');
|
||
|
var self = this;
|
||
|
self.on('finish',function() {
|
||
|
self.finished = true;
|
||
|
self.emit('chunk',false);
|
||
|
});
|
||
|
}
|
||
|
|
||
|
util.inherits(PullStream,Stream.Duplex);
|
||
|
|
||
|
PullStream.prototype._write = function(chunk,e,cb) {
|
||
|
this.buffer = Buffer.concat([this.buffer,chunk]);
|
||
|
this.cb = cb;
|
||
|
this.emit('chunk');
|
||
|
};
|
||
|
|
||
|
|
||
|
// The `eof` parameter is interpreted as `file_length` if the type is number
|
||
|
// otherwise (i.e. buffer) it is interpreted as a pattern signaling end of stream
|
||
|
PullStream.prototype.stream = function(eof,includeEof) {
|
||
|
var p = Stream.PassThrough();
|
||
|
var done,self= this;
|
||
|
|
||
|
function cb() {
|
||
|
if (typeof self.cb === strFunction) {
|
||
|
var callback = self.cb;
|
||
|
self.cb = undefined;
|
||
|
return callback();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function pull() {
|
||
|
var packet;
|
||
|
if (self.buffer && self.buffer.length) {
|
||
|
if (typeof eof === 'number') {
|
||
|
packet = self.buffer.slice(0,eof);
|
||
|
self.buffer = self.buffer.slice(eof);
|
||
|
eof -= packet.length;
|
||
|
done = !eof;
|
||
|
} else {
|
||
|
var match = self.buffer.indexOf(eof);
|
||
|
if (match !== -1) {
|
||
|
if (includeEof) match = match + eof.length;
|
||
|
packet = self.buffer.slice(0,match);
|
||
|
self.buffer = self.buffer.slice(match);
|
||
|
done = true;
|
||
|
} else {
|
||
|
var len = self.buffer.length - eof.length;
|
||
|
if (len <= 0) {
|
||
|
cb();
|
||
|
} else {
|
||
|
packet = self.buffer.slice(0,len);
|
||
|
self.buffer = self.buffer.slice(len);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
if (packet) p.write(packet,function() {
|
||
|
if (self.buffer.length === 0 || (eof.length && self.buffer.length <= eof.length)) cb();
|
||
|
});
|
||
|
}
|
||
|
|
||
|
if (!done) {
|
||
|
if (self.finished && !this.__ended) {
|
||
|
self.removeListener('chunk',pull);
|
||
|
self.emit('error', new Error('FILE_ENDED'));
|
||
|
this.__ended = true;
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
} else {
|
||
|
self.removeListener('chunk',pull);
|
||
|
p.end();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
self.on('chunk',pull);
|
||
|
pull();
|
||
|
return p;
|
||
|
};
|
||
|
|
||
|
PullStream.prototype.pull = function(eof,includeEof) {
|
||
|
if (eof === 0) return Promise.resolve('');
|
||
|
|
||
|
// If we already have the required data in buffer
|
||
|
// we can resolve the request immediately
|
||
|
if (!isNaN(eof) && this.buffer.length > eof) {
|
||
|
var data = this.buffer.slice(0,eof);
|
||
|
this.buffer = this.buffer.slice(eof);
|
||
|
return Promise.resolve(data);
|
||
|
}
|
||
|
|
||
|
// Otherwise we stream until we have it
|
||
|
var buffer = Buffer.from(''),
|
||
|
self = this;
|
||
|
|
||
|
var concatStream = Stream.Transform();
|
||
|
concatStream._transform = function(d,e,cb) {
|
||
|
buffer = Buffer.concat([buffer,d]);
|
||
|
cb();
|
||
|
};
|
||
|
|
||
|
var rejectHandler;
|
||
|
var pullStreamRejectHandler;
|
||
|
return new Promise(function(resolve,reject) {
|
||
|
rejectHandler = reject;
|
||
|
pullStreamRejectHandler = function(e) {
|
||
|
self.__emittedError = e;
|
||
|
reject(e);
|
||
|
}
|
||
|
if (self.finished)
|
||
|
return reject(new Error('FILE_ENDED'));
|
||
|
self.once('error',pullStreamRejectHandler); // reject any errors from pullstream itself
|
||
|
self.stream(eof,includeEof)
|
||
|
.on('error',reject)
|
||
|
.pipe(concatStream)
|
||
|
.on('finish',function() {resolve(buffer);})
|
||
|
.on('error',reject);
|
||
|
})
|
||
|
.finally(function() {
|
||
|
self.removeListener('error',rejectHandler);
|
||
|
self.removeListener('error',pullStreamRejectHandler);
|
||
|
});
|
||
|
};
|
||
|
|
||
|
PullStream.prototype._read = function(){};
|
||
|
|
||
|
module.exports = PullStream;
|