Thu, 27. June 2013
One of the biggest changes in Node.js 0.10 was surely the complete rewrite of the streams module. In this post, I'd like to show you how to use streams 2 and how to write your own stream implementation.
First of all: the new streams implementation is backward compatible. If you
start using it the old way, it will behave the old way (minus some issues).
There is no need to migrate anything when switching to Node 0.10. The new
implementation has some advantages over the old one, such as built-in
"backpressure" handling, so we don't have to deal with pause and resume
logic anymore. If you know your code only runs on Node 0.10 or newer,
using streams 2 will make streaming more robust and simpler.
Consider a simple http client that pulls down a website. With old streams the code would look like this:
var http = require('http');
http.get('http://maxantoni.de/blog/feed.rss', function (res) {
  res.on('data', function (data) {
    process.stdout.write(data);
  });
});
The same thing using streams 2:
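var http = require('http');
http.get('http://maxantoni.de/blog/feed.rss', function (res) {
  res.on('readable', function () {
    var chunk;
    // read() returns null once the internal buffer is empty
    while ((chunk = res.read()) !== null) {
      process.stdout.write(chunk);
    }
  });
});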
The main difference here is that with old streams, you could miss 'data' events
if you registered the listener too late. With streams 2, data stays in the
stream's internal buffer until you actively consume it using the read function,
so you never lose anything. Note that read returns null when nothing is
buffered, so check the result before using it.
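To see the buffering in action, here is a minimal sketch that is not part of the http example above. It assumes a bare stream.Readable with a no-op _read as a stand-in source, and readAll is a hypothetical helper made up for this illustration. The data is pushed before anybody reads from the stream:

```javascript
var stream = require('stream');

// Stand-in source: data is pushed by hand, so _read has nothing to do.
var source = new stream.Readable();
source._read = function () {};

// Data arrives before any consumer exists.
source.push('first ');
source.push('second');
source.push(null); // end

// Hypothetical helper: drain the internal buffer until read() returns null.
function readAll(readable) {
  var chunks = [];
  var chunk;
  while ((chunk = readable.read()) !== null) {
    chunks.push(chunk);
  }
  return Buffer.concat(chunks).toString();
}

var received = readAll(source);
console.log(received); // first second
```

Both chunks are still there when read is finally called, because nothing is emitted until the consumer asks for it.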
Writable streams did not change from a user's perspective. Just call write
with a string or a buffer. Pipes also work the same way as they did before. The
above example could be shortened to this:
var http = require('http');
http.get('http://maxantoni.de/blog/feed.rss', function (res) {
  res.pipe(process.stdout);
});
Here is a simple implementation of a readable stream that produces ISO timestamps on every read with a slight delay:
var stream = require('stream');
function TimestampStream(count, options) {
  stream.Readable.call(this, options);
  this.count = count; // number of timestamps left to produce
}
TimestampStream.prototype = Object.create(stream.Readable.prototype, {
  constructor : { value : TimestampStream }
});
// Called by the stream machinery whenever the consumer wants more data
TimestampStream.prototype._read = function () {
  if (this.count-- === 0) {
    this.push(null); // end
  } else {
    var s = this;
    // Deliver the timestamp asynchronously to simulate a slow source
    setTimeout(function () {
      s.push(new Date().toISOString() + '\n');
    }, 10);
  }
};
Print 20 timestamps:
new TimestampStream(20).pipe(process.stdout);
2013-06-27T20:56:33.944Z
2013-06-27T20:56:33.959Z
2013-06-27T20:56:33.972Z
2013-06-27T20:56:33.983Z
...
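The pull-based nature of _read can be made visible with a small sketch. This is an illustration under my own assumptions, not the TimestampStream from above: counter is a hypothetical source that pushes synchronously, and reads and remaining are made-up bookkeeping variables.

```javascript
var stream = require('stream');

var reads = 0;     // how often _read was called
var remaining = 3; // chunks left to produce

// Hypothetical source that pushes synchronously, unlike TimestampStream.
var counter = new stream.Readable();
counter._read = function () {
  reads++;
  if (remaining === 0) {
    this.push(null); // end
  } else {
    this.push(String(remaining--));
  }
};

// Nobody has asked for data yet, so _read has not been called.
var readsBeforeConsuming = reads;

// Each read() on an empty buffer triggers _read to fetch more.
var chunks = [];
var chunk;
while ((chunk = counter.read()) !== null) {
  chunks.push(chunk.toString());
}
var result = chunks.join('');

console.log(readsBeforeConsuming, result); // 0 321
```

The stream does no work until a consumer calls read, and it stops producing once null has been pushed.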
Here is a writable stream implementation that prints only one character every 25 milliseconds:
var stream = require('stream');
function writeForMovie(str, then) {
  if (str.length === 0) {
    return then();
  }
  setTimeout(function () {
    process.stdout.write(str.charAt(0));
    writeForMovie(str.substring(1), then);
  }, 25);
}
function HollywoodOut(options) {
  stream.Writable.call(this, options);
}
HollywoodOut.prototype = Object.create(stream.Writable.prototype, {
  constructor : { value : HollywoodOut }
});
HollywoodOut.prototype._write = function (data, encoding, cb) {
  var str = data.toString();
  writeForMovie(str, cb);
};
Now let's combine the readable and the writable streams:
new TimestampStream(20).pipe(new HollywoodOut());
2013-06-27T21:02:21.243Z
2013-06-27T21:02:21.255Z
2013-06-27T21:02:21.267Z
2013-06-27T21:02:21.278Z
...
As you can see from the timestamps, all 20 lines get produced as before.
The output is buffered by the writable stream and then slowly written out.
To configure the buffer size, use the highWaterMark option:
var out = new HollywoodOut({ highWaterMark : 30 });
new TimestampStream(20).pipe(out);
2013-06-27T21:04:21.147Z
2013-06-27T21:04:21.168Z
2013-06-27T21:04:21.182Z
2013-06-27T21:04:22.501Z
2013-06-27T21:04:22.513Z
2013-06-27T21:04:23.838Z
...
This setup leads to "backpressure" being produced by the writable stream.
The pipe function handles backpressure automatically for us.
Note how the timestamps show when read was called.
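For the curious, the signal that pipe reacts to can be observed directly. The following is a rough sketch under my own assumptions: slow is a hypothetical sink that acknowledges every chunk after 5 milliseconds, and the 30 byte highWaterMark mirrors the example above. Once more bytes are buffered than the highWaterMark allows, write returns false, and a 'drain' event is emitted when the buffer has been flushed.

```javascript
var stream = require('stream');

// Hypothetical slow sink: acknowledges each chunk after 5 ms.
var slow = new stream.Writable({ highWaterMark : 30 });
slow._write = function (chunk, encoding, cb) {
  setTimeout(cb, 5);
};

var line = '2013-06-27T21:04:21.147Z\n'; // 25 bytes

var first = slow.write(line);  // 25 bytes pending, below the mark
var second = slow.write(line); // 50 bytes pending, above the mark

console.log(first, second); // true false

// A well-behaved producer stops writing here and waits for 'drain'.
slow.once('drain', function () {
  console.log('drained, safe to write again');
});
```

Roughly speaking, pipe stops reading from the source when write returns false and continues once 'drain' fires, which is why the timestamps in the output above arrive in bursts.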
Happy streaming!