forked from caolan/async
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcargo.js
70 lines (62 loc) · 1.98 KB
/
cargo.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
'use strict';
var timing = require( './timing' );
var utils = require( './utils' );
module.exports = function cargo( worker, payload ) {
var working = false,
tasks = [];
var _cargo = {
tasks: tasks,
payload: payload,
saturated: null,
empty: null,
drain: null,
drained: true,
push: function( data, callback ) {
if ( !utils.isArray( data ) ) {
data = [ data ];
}
utils.each( data, function( task ) {
tasks.push( {
data: task,
callback: typeof callback === 'function' ? callback : null
} );
_cargo.drained = false;
if ( _cargo.saturated && tasks.length === payload ) {
_cargo.saturated();
}
} );
timing.setImmediate( _cargo.process );
},
process: function process() {
if ( working ) return;
if ( tasks.length === 0 ) {
if ( _cargo.drain && !_cargo.drained ) _cargo.drain();
_cargo.drained = true;
return;
}
var ts = typeof payload === 'number' ? tasks.splice( 0, payload ) : tasks.splice( 0, tasks.length );
var ds = utils.map( ts, function( task ) {
return task.data;
} );
if ( _cargo.empty ) _cargo.empty();
working = true;
worker( ds, function() {
working = false;
var args = arguments;
utils.each( ts, function( data ) {
if ( data.callback ) {
data.callback.apply( null, args );
}
} );
process();
} );
},
length: function() {
return tasks.length;
},
running: function() {
return working;
}
};
return _cargo;
};