This repository has been archived by the owner on Dec 6, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathread-stream.js
72 lines (62 loc) · 1.71 KB
/
read-stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
var IteratorStream = require('level-iterator-stream');
var inherits = require('util').inherits;
var EncodingError = require('level-errors').EncodingError;
/**
 * Wrap an iterator so that every entry it yields is passed through
 * `makeData` before being handed to the consumer.
 *
 * @param {Object} it - Underlying iterator exposing `next(cb)` and `end(cb)`.
 * @param {Object} options - Read options; only `keys` and `values` are consulted.
 * @param {Function} makeData - Called as `makeData(key, value)` for each entry.
 * @returns {{next: Function, end: Function}} Object with the same
 *   `next`/`end` shape that delegates to `it`.
 */
function wrapIterator(it, options, makeData) {
  function next(callback) {
    it.next(function onEntry(err, key, value) {
      if (err) {
        return callback(err);
      }

      // Entries with neither key nor value are forwarded untouched and never
      // reach makeData (presumably the end-of-iteration signal).
      var isBlank = key === undefined && value === undefined;
      if (isBlank) {
        return callback(err, key, value);
      }

      var transformed;
      try {
        transformed = makeData(key, value);
      } catch (failure) {
        // Whatever makeData throws is reported as an EncodingError.
        return callback(new EncodingError(failure));
      }

      var keysEnabled = options.keys !== false;
      var valuesEnabled = options.values !== false;

      if (keysEnabled && !valuesEnabled) {
        // Keys only: the makeData result takes the key slot.
        return callback(err, transformed, value);
      }
      if (!keysEnabled && valuesEnabled) {
        // Values only: the makeData result takes the value slot.
        return callback(err, key, transformed);
      }
      // Otherwise the makeData result is read as a { key, value } pair.
      return callback(err, transformed.key, transformed.value);
    });
  }

  function end(callback) {
    return it.end(callback);
  }

  return {
    next: next,
    end: end
  };
}
/**
 * Stream constructor that starts without an iterator; one is attached
 * later through `setIterator`.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} options - Passed to the IteratorStream constructor.
 * @param {Function} makeData - Stored as `_makeData` and later given to
 *   `wrapIterator` when an iterator is attached.
 */
function ReadStream (options, makeData) {
  var calledAsConstructor = this instanceof ReadStream;

  if (calledAsConstructor) {
    // No iterator exists yet, so the parent is initialised with null.
    IteratorStream.call(this, null, options);
    // Becomes true when _read is called before an iterator is attached.
    this._waiting = false;
    this._makeData = makeData;
    return;
  }

  return new ReadStream(options, makeData);
}
// Wire up prototype inheritance: ReadStream extends IteratorStream (util.inherits).
inherits(ReadStream, IteratorStream)
/**
 * Attach the underlying iterator once it is available.
 *
 * The iterator is wrapped with `wrapIterator` using the stream's `_options`
 * and `_makeData`. If the stream is already destroyed, the wrapped iterator
 * is ended immediately. If a read was requested before the iterator existed,
 * that read is started now.
 *
 * @param {Object} it - Iterator exposing `next(cb)` and `end(cb)`.
 */
ReadStream.prototype.setIterator = function (it) {
  var wrapped = wrapIterator(it, this._options, this._makeData);
  this._iterator = wrapped;

  if (this.destroyed) {
    // Stream is already gone: end the iterator and ignore the outcome.
    wrapped.end(function () {});
    return;
  }

  if (!this._waiting) {
    return;
  }

  // A read arrived before the iterator did; run it now.
  this._waiting = false;
  this._read();
};
/**
 * Read hook. Delegates to IteratorStream's `_read` when an iterator is
 * attached; otherwise records that a read is pending so `setIterator`
 * can start it later.
 */
ReadStream.prototype._read = function () {
  if (this._iterator) {
    IteratorStream.prototype._read.call(this);
    return;
  }

  // No iterator yet: remember the request instead of reading.
  this._waiting = true;
};
// The ReadStream constructor is this module's export.
module.exports = ReadStream