var util = require('util'); varĀ events = require('events').EventEmitter; var qjob = function(options) { if(false === (this instanceof qjob)) { return new qjob(options); } this.maxConcurrency = 10; this.jobsRunning = 0; this.jobsDone = 0; this.jobsTotal = 0; this.timeStart; this.jobId = 0; this.jobsList = []; this.paused = false; this.pausedId = null; this.lastPause = 0; this.interval = null; this.stopAdding = false; this.sleeping = false; this.aborting = false; if (options) { this.maxConcurrency = options.maxConcurrency || this.maxConcurrency; this.interval = options.interval || this.interval; } events.call(this); }; util.inherits(qjob, events); /* * helper to set max concurrency */ qjob.prototype.setConcurrency = function(max) { this.maxConcurrency = max; } /* * helper to set delay between rafales */ qjob.prototype.setInterval = function(delay) { this.interval = delay; } /* * add some jobs in the queue */ qjob.prototype.add = function(job,args) { var self = this; self.jobsList.push([job,args]); self.jobsTotal++; } /* * */ qjob.prototype.sleepDueToInterval = function() { var self = this; if (this.interval === null) { return; } if (this.sleeping) { return true; } if (this.stopAdding) { if (this.jobsRunning > 0) { //console.log('waiting for '+jobsRunning+' jobs to finish'); return true; } //console.log('waiting for '+rafaleDelay+' ms'); this.sleeping = true; self.emit('sleep'); setTimeout(function() { this.stopAdding = false; this.sleeping = false; self.emit('continu'); self.run(); }.bind(self),this.interval); return true; } if (this.jobsRunning + 1 == this.maxConcurrency) { //console.log('max concurrent jobs reached'); this.stopAdding = true; return true; } } /* * run the queue */ qjob.prototype.run = function() { var self = this; // first launch, let's emit start event if (this.jobsDone == 0) { self.emit('start'); this.timeStart = Date.now(); } if (self.sleepDueToInterval()) return; if (self.aborting) { this.jobsList = []; } // while queue is empty and number of job running // concurrently are less than max job running, // then launch the next job while (this.jobsList.length && this.jobsRunning < this.maxConcurrency) { // get the next job and // remove it from the queue var job = self.jobsList.shift(); // increment number of job running self.jobsRunning++; // fetch args for the job var args = job[1]; // add jobId in args args._jobId = this.jobId++; // emit jobStart event self.emit('jobStart',args); // run the job setTimeout(function() { this.j(this.args,self.next.bind(self,this.args)); }.bind({j:job[0],args:args}),1); } // all jobs done ? emit end event if (this.jobsList.length == 0 && this.jobsRunning == 0) { self.emit('end'); } } /* * a task has been terminated, * so 'next()' has been called */ qjob.prototype.next = function(args) { var self = this; // update counters this.jobsRunning--; this.jobsDone++; // emit 'jobEnd' event self.emit('jobEnd',args); // if queue has been set to pause // then do nothing if (this.paused) return; // else, execute run() function self.run(); } /* * You can 'pause' jobs. * it will not pause running jobs, but * it will stop launching pending jobs * until paused = false */ qjob.prototype.pause = function(status) { var self = this; this.paused = status; if (!this.paused && this.pausedId) { clearInterval(this.pausedId); self.emit('unpause'); this.run(); } if (this.paused && !this.pausedId) { self.lastPause = Date.now(); this.pausedId = setInterval(function() { var since = Date.now() - self.lastPause; self.emit('pause',since); },1000); return; } } qjob.prototype.stats = function() { var now = Date.now(); var o = {}; o._timeStart = this.timeStart || 'N/A'; o._timeElapsed = (now - this.timeStart) || 'N/A'; o._jobsTotal = this.jobsTotal; o._jobsRunning = this.jobsRunning; o._jobsDone = this.jobsDone; o._progress = Math.floor((this.jobsDone/this.jobsTotal)*100); o._concurrency = this.maxConcurrency; if (this.paused) { o._status = 'Paused'; return o; } if (o._timeElapsed == 'N/A') { o._status = 'Starting'; return o; } if (this.jobsTotal == this.jobsDone) { o._status = 'Finished'; return o; } o._status = 'Running'; return o; } qjob.prototype.abort = function() { this.aborting = true; } module.exports = qjob;