Readable Streams


1 | What is a Readable Stream?


A Readable Stream is an abstraction for a source from which data is consumed.


The purpose of a Readable Stream is to be consumed. When a Readable Stream is consumed, it is possible to process its data, and more specifically to read its data, chunk by chunk. However, the important concept to remember is that a Readable Stream will not generate data until a mechanism for consuming that data is provided. If the consuming mechanism is disabled or taken away, the Readable Stream will attempt to stop generating the data.


There is always a 2-step process to follow when working with Readable Streams:


  1. Implement the Readable Stream. There can be several ways to do it.
  2. Consume the Readable Stream. There are also several ways to do it.

2 | How to consume a Readable Stream?


The way a Readable Stream is consumed depends on its reading mode. Readable Streams have two reading modes:


  1. The paused mode.
  2. The flowing mode.

It is possible to know if a Stream is in paused mode or not by calling the stream.Readable.prototype.isPaused() method. This method returns true if the Readable Stream is in paused mode or false if it is not.


The paused mode


By default, all Readable Streams start in paused mode when they are implemented.


The way to consume a Readable Stream in paused mode is to register a 'readable' event handler and to call the stream.Readable.prototype.read() method inside that handler to get the data of the Readable Stream each time a 'readable' event is emitted.
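
For instance, here is a minimal sketch of a Readable Stream consumed in paused mode (it assumes a file named 'poem.txt' exists in the working directory, like the one used in the examples later in this chapter):

// A minimal sketch: consumption of a Readable Stream in paused mode.
// It assumes a file named 'poem.txt' exists in the working directory.
const fs = require('fs');

const poem = fs.createReadStream('./poem.txt');
poem.setEncoding('utf-8');

poem.on('readable', () => {
  let chunk;
  // stream.Readable.prototype.read() returns null once the internal buffer has been drained.
  while ((chunk = poem.read()) !== null) {
    console.log(chunk);
  }
});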


Since Readable Streams, like all Streams, are Event Emitters, they emit specific events we can handle. Let us have a look at some of the important events that a Readable Stream can emit in paused mode:


  1. 'readable': The 'readable' event is emitted when there is data available to be read from the Stream. The 'readable' event will also be emitted once the end of the Stream data has been reached but before the 'end' event is emitted. Effectively, the 'readable' event indicates that the Stream has new information: either new data is available or the end of the Stream has been reached. In the former case, stream.Readable.prototype.read() will return the available data. In the latter case, stream.Readable.prototype.read() will return null.
  2. 'error': The 'error' event may be emitted by a Readable Stream at any time. Typically, this may occur if the underlying Stream is unable to generate data due to an underlying internal failure, or when a Stream implementation attempts to push an invalid chunk of data. The listener callback will be passed a single Error object.
  3. 'end': The 'end' event is emitted when there is no more data to be consumed from the Stream. The 'end' event will not be emitted unless the data is completely consumed. This can be accomplished in paused mode by calling stream.Readable.prototype.read() repeatedly until all data has been consumed.
  4. 'close': The 'close' event is emitted when the Stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur. A Readable Stream will always emit the 'close' event if it is created with the emitClose option.
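
Continuing the sketch above, the 'end' and 'error' events could be handled like this:

// Continuing the sketch above: handling the 'end' and 'error' events.
poem.on('end', () => console.log('There is no more data to be consumed.'));
poem.on('error', err => console.error('An error occurred:', err.message));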

The flowing mode


A Readable Stream can be switched to the flowing mode in 3 different ways:


  1. By registering a 'data' event handler.
  2. By calling the stream.Readable.prototype.resume() method.
  3. By piping the Readable Stream to a Writable Stream or a Transform Stream.

When a Readable Stream is in flowing mode, it is either:


  • automatically and implicitly consumed, when it is piped to another stream (case #3).
  • explicitly consumed by registering the 'data' event handler (case #1).

Regarding case #2, when we switch a Readable Stream into flowing mode by calling the stream.Readable.prototype.resume() method, we must also register a 'data' event handler if we want to be able to read the data. Otherwise, the data of the Readable Stream will be lost and will not be read. That said, the stream.Readable.prototype.resume() method is usually used to fully consume the data from a Readable Stream without actually processing it. This comes in handy when we want to drain a Readable Stream even though we are not interested in its data, for example to let the Stream run to completion and emit its 'end' event.
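
As a minimal sketch of this draining pattern (again assuming a 'poem.txt' file in the working directory):

// A minimal sketch: draining a Readable Stream with resume().
// It assumes a file named 'poem.txt' exists in the working directory.
const fs = require('fs');

const poem = fs.createReadStream('./poem.txt');

// No 'data' handler is registered: the data is consumed and discarded.
poem.resume();

// The 'end' event still fires once all the data has been consumed.
poem.on('end', () => console.log('The Readable Stream has been fully drained.'));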


Now, let us have a look at the important events a Readable Stream can emit when it is in flowing mode:


  1. 'data': The 'data' event is emitted whenever the Readable Stream is releasing a chunk of data to a consumer. The chunk of data is by default a Buffer. It can be a String if a String format has been specified for the Readable Stream using the stream.Readable.prototype.setEncoding() method. It can also be any other object if the Readable Stream is operating in object mode (objectMode: true).
  2. 'error': The 'error' event may be emitted by a Readable Stream at any time. Typically, this may occur if the underlying Stream is unable to generate data due to an underlying internal failure, or when a Stream implementation attempts to push an invalid chunk of data. The listener callback will be passed a single Error object.
  3. 'end': The 'end' event is emitted when there is no more data to be consumed from the Stream. The 'end' event will not be emitted unless the data is completely consumed.
  4. 'close': The 'close' event is emitted when the Stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur. A Readable Stream will always emit the 'close' event if it is created with the emitClose option.
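
As a brief illustration (still assuming a 'poem.txt' file in the working directory), here is a Readable Stream consumed in flowing mode with these events handled:

// A brief illustration: consumption of a Readable Stream in flowing mode.
// It assumes a file named 'poem.txt' exists in the working directory.
const fs = require('fs');

const poem = fs.createReadStream('./poem.txt');
poem.setEncoding('utf-8');

// Registering the 'data' event handler switches the Stream into flowing mode.
poem.on('data', chunk => console.log(chunk));
poem.on('end', () => console.log('There is no more data to be consumed.'));
poem.on('error', err => console.error('An error occurred:', err.message));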

The flowing mode over the paused mode


It is possible to switch back and forth between the 2 modes:


  • by calling the method stream.Readable.prototype.resume() to switch from paused mode to flowing mode
  • by calling the method stream.Readable.prototype.pause() to switch from flowing mode to paused mode.
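
As an illustration only (again assuming a 'poem.txt' file in the working directory), switching back and forth could look like this:

// An illustration only: switching between flowing mode and paused mode.
// It assumes a file named 'poem.txt' exists in the working directory.
const fs = require('fs');

const poem = fs.createReadStream('./poem.txt');
poem.setEncoding('utf-8');

poem.on('data', chunk => {
  console.log(chunk);
  // Switch back to paused mode...
  poem.pause();
  // ...then switch to flowing mode again one second later.
  setTimeout(() => poem.resume(), 1000);
});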

However, it is absolutely not recommended to do so.


Use the flowing mode whenever you can, because it is easier to work with. In flowing mode:


  • There is no need to call the stream.Readable.prototype.read() method to get the data of the Stream.
  • There is no need to check if the data is equal to null.
  • The chunks of data of the Readable Stream are released the same way they are pushed by the implementation of the stream.Readable.prototype._read() method.
  • Each chunk of data is automatically displayed on a new line when it is passed to the console.log method.

All of this makes the flowing mode simpler and more straightforward.


3 | Chunks


Chunks of Buffers by default


By default, chunks of data of Readable Streams are chunks of Buffers.


Chunks of Strings


It is possible to convert the chunks of Buffers of a Readable Stream to chunks of Strings when the Readable Stream is implemented (at the level of the Readable Stream) or when it is consumed (at the level of each chunk).


Upon implementation of the Readable Stream, this can be done by setting the String format 'utf-8' at the level of the Readable Stream. There are 2 ways to do so:


  1. by calling the method stream.Readable.prototype.setEncoding() on the Readable Stream with the argument 'utf-8'.
  2. by setting the property encoding: 'utf-8' of the object options passed as an argument to the constructor function stream.Readable().

Upon consumption of the Readable Stream, this can be done by converting each chunk of Buffer to a chunk of String, by calling the method Buffer.prototype.toString() on each chunk.


Chunks of any values (except null)


Node.js also allows Readable Streams to use chunks of any JavaScript values (except the value null, which serves a special purpose within Streams). In order to do so, Readable Streams must be created with the option objectMode set to true. This is possible by using the method stream.Readable.from() or by using the constructor function stream.Readable().


Do not worry. We will see in detail how to set the format of the chunks of Readable Streams in the implementation examples below.


4 | The stream.Readable.prototype._read() method


A Readable Stream uses an internal buffer to temporarily store the data that needs to be consumed. When a Readable Stream is consumed, Node.js fills this internal buffer with chunks of data and empties it as chunks of data are consumed.


Node.js handles this process by calling 2 methods:


  • stream.Readable.prototype._read(), to fill the internal buffer chunk by chunk.
  • stream.Readable.prototype.read(), to empty the internal buffer chunk by chunk.

The names of these methods are very similar: only an underscore (_) differentiates them.


The stream.Readable.prototype._read() method (with the underscore) must not be called directly in our code. The method is prefixed with an underscore because it is internal to the class that defines it. It is supposed to be private (hence the underscore). It should be implemented by child classes or instances of stream.Readable, and called by the internal stream.Readable class methods only.


All Readable Stream implementations must provide an implementation of the stream.Readable.prototype._read() method to fetch data from the underlying resource. In order to work properly, this method must internally call another method to push each chunk of data into the internal buffer: the method stream.Readable.prototype.push(). Once every piece of data has been pushed, the method stream.Readable.prototype.push() must be called one last time with the argument null to signal that this is the end of the Readable Stream and that there is no more data left to be pushed. It is important to understand this because we may have to define our own stream.Readable.prototype._read() method to implement a Readable Stream.


On the other hand, the method stream.Readable.prototype.read() (without the underscore) is responsible for emptying the internal buffer of the Readable Stream. When a Readable Stream is in paused mode, we must call this method explicitly to consume the data, whereas when a Readable Stream is in flowing mode, there is no need to call this method explicitly to consume the data.


It is important to note that there are some methods in Node.js that allow us to implement Readable Streams without requiring the explicit implementation of the method stream.Readable.prototype._read(). In this case, Node.js handles this internally and implicitly. This is the case when we implement a Readable Stream by using the method fs.createReadStream() or stream.Readable.from().


5 | How to implement a Readable Stream?


There are 5 different ways of implementing a Readable Stream:


  1. By using the method fs.createReadStream().
  2. By using the method stream.Readable.from().
  3. By using the constructor function stream.Readable(), the method stream.Readable.prototype._read() and the objectMode: true option.
  4. By using the constructor function stream.Readable() and the method stream.Readable.prototype._read().
  5. By using a subclass of stream.Readable and the method stream.Readable.prototype._read().

Ways #1 and #2 do not require us to explicitly implement the method stream.Readable.prototype._read(), as Node.js handles this implicitly. We recommend using these 2 ways whenever you can.


Ways #2 and #3 are equivalent and interchangeable. The same goes for ways #4 and #5.


Way #1 implements a Readable Stream from the content of a file. Ways #2 and #3 implement Readable Streams from iterable objects.


Implementation 1: fs.createReadStream()


Let us consider that we have the poem.txt file in our working directory (This is the poem The Road Not Taken by Robert Frost):


Two roads diverged in a yellow wood,
And sorry I could not travel both
And be one traveler, long I stood
And looked down one as far as I could
To where it bent in the undergrowth;

Then took the other, as just as fair,
And having perhaps the better claim,
Because it was grassy and wanted wear;
Though as for that the passing there
Had worn them really about the same,

And both that morning equally lay
In leaves no step had trodden black.
Oh, I kept the first for another day!
Yet knowing how way leads on to way,
I doubted if I should ever come back.

I shall be telling this with a sigh
Somewhere ages and ages hence:
Two roads diverged in a wood, and I—
I took the one less traveled by,
And that has made all the difference.

// Example 1a

// Implementation of the Readable Stream
const fs = require('fs');
const poem = fs.createReadStream('./poem.txt');
poem.setEncoding('utf-8');

// Consumption of the Readable Stream in flowing mode (Recommended)
poem.on('data', chunk => console.log(chunk));

We assign all of the properties (especially the methods) of the Node.js core module fs to the variable named fs.


We create a Readable Stream from the file named 'poem.txt' by calling the method fs.createReadStream(). The argument of the method is the path to the file (in our case, it is './poem.txt'). We assign the created Readable Stream to the variable poem. And this is it. The variable poem references a Readable Stream whose data is the content of the file 'poem.txt'.


We set the String format of the chunks, so that they can be Strings, and not Buffers, at the level of the Readable Stream. We call the method stream.Readable.prototype.setEncoding() on the Readable Stream referenced by the variable poem and we pass the String 'utf-8' to the method.


We provide a mechanism to consume the data of the Readable Stream in flowing mode. We do this by registering an event handler for the 'data' event. The Readable Stream is an Emitter, and it emits the event 'data' each time a chunk of String is released from the Readable Stream.


By registering an event handler for the 'data' event, we kill 2 birds with 1 stone:


  1. We switch the Readable Stream from the paused mode to the flowing mode.
  2. We are able to read the data of the Readable Stream.

To register the event handler, we call the method EventEmitter.prototype.on() on the Readable Stream referenced by the variable poem. We pass 2 arguments to the method:


  1. The first argument is the name of the event we want to handle, the String 'data'.
  2. The second argument is the callback function (the listener function) to invoke each time the event 'data' is emitted.

Since each chunk of data is a String, we can display each chunk of String as it is consumed over time (though this goes very fast).


// Example 1b

// Implementation of the Readable Stream
const fs = require('fs');
const poem = fs.createReadStream('./poem.txt');

// Consumption of the Readable Stream in flowing mode (Recommended)
poem.on('data', chunk => console.log(chunk.toString()));

This example is similar to example 1a, except that we do not set the String format at the level of the Readable Stream. Therefore, in the event handler, we have to convert each chunk of Buffer to a chunk of String by calling the method Buffer.prototype.toString() on each chunk.


Implementation 2: stream.Readable.from()

// Example 2a

// Creation of an iterable object: array of integers
let array = [];

for (let i = 0; i <= 100; i += 1) {
  array.push(i);
}

// Implementation of the Readable Stream
const { Readable } = require('stream');
const numbers = Readable.from(array);

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk));

We create an iterable object, an array of integers from 0 to 100, referenced by the variable array.


We assign the Readable property/class of the Node.js core module stream to the variable Readable, by using the destructuring assignment syntax.


We call the method stream.Readable.from() and pass the argument array to it. This method accepts any iterable object as its argument. Also, by default, this method returns a Readable Stream with the objectMode set to true. So, by default, the Readable Stream will be able to consume chunks of objects (hence the name of the option objectMode). In our case, it will be chunks of integers.


We consume the Readable Stream by registering an event handler handling the 'data' event. This switches the Readable Stream referenced by numbers into flowing mode and allows us to read the data. Integers from 0 to 100 are displayed to the screen, according to the event handler we registered.
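
Note that stream.Readable.from() is not limited to arrays. Since it accepts any iterable, a generator function could also serve as the source, as in this minimal sketch (separate from the examples of this chapter):

// A minimal sketch: stream.Readable.from() also accepts the iterable
// returned by a generator function.
const { Readable } = require('stream');

function* generate() {
  yield 'first chunk';
  yield 'second chunk';
  yield 'third chunk';
}

const fromGenerator = Readable.from(generate());
fromGenerator.on('data', chunk => console.log(chunk));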


// Example 2b

// Creation of an iterable object: array of arrays
let array = [];

for (let i = 0; i <= 100; i += 1) {
  array.push([i]);
}

// Implementation of the Readable Stream
const { Readable } = require('stream');
const arrayOfArrays = Readable.from(array);

// Consumption of the Readable Stream in flowing mode (Recommended)
arrayOfArrays.on('data', chunk => console.log(chunk));

This example is similar to example 2a, except that the iterable object is an array of arrays. When the Readable Stream is consumed, arrays are printed to the screen.


The constructor function stream.Readable()


Custom Readable Streams that call the constructor function stream.Readable() must implement the method stream.Readable.prototype._read().


stream.Readable() takes an options object as its argument. This object lists the possible options for the Readable Stream. Here are the most useful:


  • encoding (String) or objectMode (Boolean). If encoding is specified, then Buffers will be decoded to Strings using the specified encoding. By default, the encoding value is null. If objectMode is set to true, the Readable Stream behaves as a stream of objects, meaning that stream.Readable.prototype.read(n) returns a single value instead of a Buffer of size n. By default, objectMode is set to false.
  • read (Function). Implementation for the method stream.Readable.prototype._read(). This is quite subtle. The read property is without an underscore, and serves as a way to implement the method with an underscore. Also note that this has nothing to do with the method stream.Readable.prototype.read().

The other options (for information):


  • highWaterMark (Number). The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default: 16384 (16KB), or 16 for objectMode streams.
  • emitClose (Boolean). Whether or not the Readable Stream should emit 'close' after it has been destroyed. Default: true.
  • destroy (Function). Implementation for the method stream.Readable.prototype._destroy().
  • construct (Function). Implementation for the method stream.Readable.prototype._construct().
  • autoDestroy (Boolean). Whether this Readable Stream should automatically call .destroy() on itself after ending. Default: true.
  • signal (AbortSignal). A signal representing possible cancellation.

Implementation 3: stream.Readable(), stream.Readable.prototype._read() and objectMode: true


// Example 3a

// Creation of an iterable object: array of integers
let array = [];

for (let i = 0; i <= 100; i += 1) {
  array.push(i);
}

// Implementation of the Readable Stream
const { Readable } = require('stream');

const numbers = new Readable({
  read() {
    for (const value of array) {
      this.push(value);
    }
    this.push(null);
  },
  objectMode: true
});

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk));

We create an iterable object, an array of integers from 0 to 100, referenced by the variable array.


We assign the Readable property/class of the Node.js core module stream to the variable Readable, by using the destructuring assignment syntax.


We create an instance of the stream.Readable class by calling the constructor function stream.Readable() with the new keyword. This instance is assigned to the variable numbers. We pass an object as an argument to the constructor function. In this object, we set 2 properties:


  • a read property which has a function as its value (ES6 shorthand syntax). Note that this read property in fact implements a custom stream.Readable.prototype._read() method. This read property has nothing to do with the method stream.Readable.prototype.read(). This is quite subtle. Just remember that the constructor function lets us define a custom stream.Readable.prototype._read() method by setting a property named read instead of _read. In this read method, we make sure to push the data we want (here, each element of the array) by calling the method stream.Readable.prototype.push(). We call the method stream.Readable.prototype.push() one last time with the argument null to signal the end of the Stream, in accordance with a standard implementation of a stream.Readable.prototype._read() method.
  • an objectMode property with a value of true. This ensures that the Readable Stream will automatically consume chunks of objects (in our case, chunks of integers) instead of chunks of Buffers.

Then, we consume the Readable Stream by registering an event handler for the event 'data'. The event handler has a double purpose:


  • It automatically switches the Readable Stream from paused mode to flowing mode.
  • We are able to get the data of the Readable Stream and, in our case, display it as it is consumed. Integers from 0 to 100 are displayed to the screen.

// Example 3b

// Creation of an iterable object: array of arrays
let array = [];

for (let i = 0; i <= 100; i += 1) {
  array.push([i]);
}

// Implementation of the Readable Stream
const { Readable } = require('stream');

const numbers = new Readable({
  read() {
    for (const value of array) {
      this.push(value);
    }
    this.push(null);
  },
  objectMode: true
});

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk));

This example is similar to example 3a, except that the iterable object is an array of arrays of integers instead of an array of integers. When the Readable Stream is consumed, arrays are printed to the screen.


Implementation 4: stream.Readable() and stream.Readable.prototype._read()


// Example 4a

// Implementation of the Readable Stream
const { Readable } = require('stream');

const numbers = new Readable({
  read() {
    for (let i = 0; i <= 100; i += 1) {
      this.push(String(i));
    }
    this.push(null);
  },
  encoding: 'utf-8'
});

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk));

In this example, we use the constructor function stream.Readable() with the new keyword. This function takes an options object as its argument. We set 2 properties in this object:


  • a read property which has a function as its value (ES6 shorthand syntax). Note that this read property in fact implements a custom stream.Readable.prototype._read() method. This read property has nothing to do with the method stream.Readable.prototype.read(). This is quite subtle. Just remember that the constructor function lets us define a custom stream.Readable.prototype._read() method by setting a property named read instead of _read. In this read method, we make sure to push the data we want by calling the method stream.Readable.prototype.push(). We make sure to call the method one last time with the argument null to signal the end of the Stream.
  • an encoding property with a value of 'utf-8'. This ensures that the Readable Stream will automatically consume chunks of Strings instead of chunks of Buffers.

// Example 4b

// Implementation of the Readable Stream
const { Readable } = require('stream');

const numbers = new Readable({
  read() {
    for (let i = 0; i <= 100; i += 1) {
      this.push(String(i));
    }
    this.push(null);
  }
});

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk.toString()));

This example is similar to example 4a, except that we set the format of the chunks at the level of the chunks and not at the level of the Readable Stream. Hence the invocation of Buffer.prototype.toString() on each chunk in the event handler.


Implementation 5: subclass of stream.Readable and stream.Readable.prototype._read()


// Example 5a

// Implementation of the Readable Stream
const { Readable } = require('stream');

class NumbersReadable extends Readable {
  _read() {
    for (let i = 0; i <= 100; i += 1) {
      this.push(String(i));
    }    
    this.push(null);
  }
}

const numbers = new NumbersReadable();
numbers.setEncoding('utf-8');

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk));

In this example, instead of directly creating an instance of the class stream.Readable, we define a subclass of stream.Readable that we call NumbersReadable. When we define this subclass, we make sure to implement a _read() method to push the data into the Readable Stream.


Then, we create an instance of the subclass NumbersReadable and assign it to the variable numbers. We make sure that the Readable Stream will consume chunks of Strings instead of chunks of Buffers by calling the method stream.Readable.prototype.setEncoding() with the argument 'utf-8'.


// Example 5b

// Implementation of the Readable Stream
const { Readable } = require('stream');

class NumbersReadable extends Readable {
  _read() {
    for (let i = 0; i <= 100; i += 1) {
      this.push(String(i));
    }    
    this.push(null);
  }
}

const numbers = new NumbersReadable();

// Consumption of the Readable Stream in flowing mode (Recommended)
numbers.on('data', chunk => console.log(chunk.toString()));

This example is similar to example 5a, except that we set the format of the chunks at the level of the chunks, and not at the level of the Readable Stream.


6 | Buffering and highWaterMark (optional bonus section)


This section explores how things work internally in Node.js for a Readable Stream in paused mode. Things work a bit differently for Readable Streams consumed in flowing mode. This section is optional and is here for information purposes only.


The amount of data potentially stored in the internal buffer depends on the highWaterMark option passed into the Readable Stream when it is created with a constructor function (more on this later in this chapter). For normal Streams, the highWaterMark option specifies a total number of bytes (by default, it is 16384 or 16KB). For Streams operating in object mode, the highWaterMark specifies a total number of objects (by default, it is 16).


Once the total size of the internal buffer reaches the threshold specified by highWaterMark, the Readable stream will temporarily stop reading data from the underlying resource until the data currently stored in the internal buffer can be consumed. In other words, the Readable Stream will stop calling the internal stream.Readable.prototype._read() method that is used to fill the internal buffer.


More specifically, here is how it works:


(Diagram: internal buffering of a Readable Stream)

When the total size of the internal buffer reaches the highWaterMark, stream.Readable.prototype.push() returns false and data stops being pushed into the internal buffer. The method stream.Readable.prototype.read() then gets called repeatedly until the internal buffer is emptied, at which point stream.Readable.prototype.push() returns true again and data starts being pushed into the internal buffer again.
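
As an illustrative sketch (the values chosen here are arbitrary), the effect of highWaterMark on stream.Readable.prototype.push() can be observed like this:

// An illustrative sketch: observing push() return false once the internal
// buffer holds highWaterMark objects (the values chosen here are arbitrary).
const { Readable } = require('stream');

const numbers = new Readable({
  objectMode: true,
  highWaterMark: 4, // for object mode Streams, a number of objects
  read() {
    for (let i = 0; i <= 10; i += 1) {
      const keepPushing = this.push(i);
      // keepPushing becomes false once the internal buffer is full.
      console.log(`pushed ${i}, keep pushing? ${keepPushing}`);
    }
    this.push(null);
  }
});

numbers.on('data', chunk => console.log(`consumed ${chunk}`));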

