This package brings goodies of functional programming (map, filter, reduce) to node streams.
npm install --save object-stream-tools
Converts existing array to stream of objects. Useful if you want to inject/merge those object to the existing stream.
const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
.on('data', data => {
console.log(data)
})
.pipe(somewhereWritable)
Prints
[{foo: 'bar'}, {web: 'scale'}]
Its very useful if you want to get unique elements / set of values
const jsonStream = require('JSONStream')
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.streamToSet())
.on('data', uniqueSet => {
// here one get array of unique elements
const uniqueArray = Array.from(uniqueSet.values()).sort()
})
If you just want to remove some objects from stream, you probably want to use filter function.
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.filter(e => e.value > 6))
// here you will get filtered objects
.pipe(jsonStream.stringify())
.pipe(process.stdout)
Map is useful when you want to modify existing objects in the stream.
Reduce is useful if you want to get single object/value based on whole stream, but you dont want to load whole stream to memory.
Example: sum / average value of huge stream
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
// pick required property you want to reduce over
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue => {
// here you will get reduced value
})
Here is example with buffered/string input output:
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue =>
// here you will get reduced value
})
.pipe(jsonStream.stringify())
.pipe(process.stdout)
Please note that if you do not pass initial value reduce function will start in (prev, curr, i) mode. Objects/Array/Reduce
Very handy when you want to aggregate streams using reduce or derrivated calls. Keep in mind .promise() will not work if you use only ost.map or ost.reduce features - as they do not aggregate.
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.streamToArray())
.promise()
.then(data => {
// here you will get your aggregated data - array of values.
})
Find is super handy if we want to quickly check if vale/objects exists in the stream. Think about it as a grep on the steroids.
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.find(e => e.value > 6))
.then(foundObj => {
// here you will get found first object that matches condition
// or undefined when there were none that matches
})