Follow me if you want to grow your full-stack JavaScript skills with my screencasts and courses.

node streams

Resources

Excellent article on node streams: https://github.com/substack/stream-handbook

A common use of streams

The following code has an issue, the file is too big to fit into a buffer and when we hit the url, it gives us an error.

app.js
  
  var http         = require('http');
  var fs           = require('fs');

  var server = http.createServer(function (req, res) {
      fs.readFile('./video.mov', function (err, data) {
          res.end(data);
      });
  });

  server.listen(8000);

  

Node streams to the rescue! If we refactor this code to open a read stream to the file, and pipe it to the response stream it starts buffering immediately and doesn't attempt to load the whole file into memory, it handles a chunk at a time:

app.js
  

 var http         = require('http');
 var fs           = require('fs');

 var server = http.createServer(function (req, res) {
    var stream = fs.createReadStream('./video.mov');
    stream.pipe(res);
 });

 server.listen(8000);

  

Creating our own read stream

Coding up your own stream is not only terribly useful, but also a lot of fun. We're gonna code up a read stream that reads a bunch of words from a file, and then return a random word on demand.

Usually we would like to work with streams when reading from a file, but in this scenario I wanted to be able to pluck a random word from the file, and so had to load the whole contents of the file into memory. It's not a problem in this scenario, because the file isn't too big, but usually we would want to rely on streams as much as possible.

app.js
  
  var fs           = require('fs');
  var stream = require('stream');


  function randomWordStream(maxWords){
      var rs = stream.Readable();
      var wordContents = fs.readFileSync('./words', 'utf8');
      var words = wordContents.split("\n");
      var currentWord = 0;

      rs._read = function () {
          if (currentWord >= maxWords) {
              return rs.push(null);
          }

          currentWord ++;
          setTimeout(function () {

              var randomIndex = Math.floor((Math.random() * words.length));
              rs.push(words[randomIndex] + '\n');
          }, 100);
      };

      return rs;
  }

  var rws = new randomWordStream();
  rws.pipe(process.stdout);
  process.on('exit', function () {
      console.log('\n');
  });

  process.stdout.on('error', process.exit);
  

Running this starts spitting out random words:

output
  
  oarsman
  esthesiography
  undecreased
  unopined
  Columbid
  aseismatic
  unbarbarous
  gyroscope
  lupulic
  bipod
  [Finished in 1.3s]

  

If we didn't specify a maxWords argument, this read stream would be happy to go on forever returning random words from our array.

A write stream

We also want to see what's involved in creating a write stream. Our example write stream doesn't do a lot more than process.stdout in the previous example, but it shows you what's involved

We called next in a timeout, to illustrate that it can do async stuff, for example write to a DB.

app.js
  

  ...

  function wordWriteStream(){
      var ws =  stream.Writable();
      ws._write = function (chunk, enc, next) {
          console.log(chunk.toString());
          setTimeout(function(){
              next();
          }, 1000);
      };

      return ws;
  }

  new randomWordStream().pipe(new wordWriteStream());

  

Running that yields the following:

output
  
geomantically

rufescent

procteurynter

sporangiole

[Cancelled]
  

A transform stream

Transform streams act as read streams (providing data), and write streams (doing something with the data). Here's an example of a transform stream that uppercase each word, notice how it is piped inbetween the read stream and the write stream. You can have a whole bunch of transform streams piped inbetween the read stream and write stream, doing all sorts of stuff.

app.js
  

  ...

  function upperCaseStream(){
      var ws = stream.Transform();

      ws._transform = function(chunk, enc, next){
          var upper = chunk.toString().toUpperCase();
          this.push(upper);
          setTimeout(function(){
              next();
          }, 100);
      }

      return ws;
  }

  var rws = new randomWordStream();
  var ucs = new upperCaseStream();
  var wws = new wordWriteStream();

  rws.pipe(ucs).pipe(wws);
  

Running that yields the following:

output
  
  COUPONLESS

  GOPURA

  KIYAS

  MONKISH

  DESTRUCTIONISM

  GIGANTOCYTE

  HYDROPHILISM

  FORAMINIFERA

  FRATICELLIAN

  OVERLOOK

  FLAGGERY

  CLOTWEED

  QUEET

  LETCHY

  ACTINOSTEREOSCOPY

  FRINGELET

  BILIATION

  [Cancelled]
  

Wrap up

Node streams are extremely elegant and fun to work with. Hopefully I've shown you how easy it is to code up your own and pipe them together.