Writable Streams, piping and backpressure


1 | What is a Writable Stream?


A Writable Stream is an abstraction for a destination to which data is written.


The purpose of a Writable Stream is to be written into.


There is always a 2-step process to follow when working with Writable Streams:


  1. Implement the Writable Stream. There are several ways to do this.
  2. Write data to the Writable Stream. There are also several ways to do this.

Just like Readable Streams, Writable Streams are Event Emitters. They emit events, but these events are rarely used when dealing with Writable Streams, unless we want to explicitly implement a mechanism to handle the backpressure of the internal buffer of a Writable Stream (see the last section of this article).


2 | How to implement a Writable Stream?


There are 3 different ways of implementing a Writable Stream (in our order of preference and recommendation):


  1. By using the method fs.createWriteStream() to write the data into a file.
  2. By using the constructor function stream.Writable() and implementing a method _write().
  3. By using a subclass of stream.Writable and implementing a method _write().

In this book, we will only explore the first way, which is by far the most useful one in practice.
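

For reference only, here is a minimal sketch of the second way, using the stream.Writable() constructor; in this hypothetical example, we simply log each incoming chunk:


// A minimal sketch of way 2 (for reference only)
const stream = require('stream');

const destination = new stream.Writable({
  // The write option becomes the _write() implementation of the Stream
  write(chunk, encoding, callback) {
    console.log('Received chunk:', chunk.toString());
    callback(); // Signal that the chunk has been processed
  }
});

destination.write('hello');
destination.end('world');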


3 | How to write occasional data to a Writable Stream?


If you want to write occasional pieces of data to a Writable Stream, you can call the method stream.Writable.prototype.write() to write the data, and then call the method stream.Writable.prototype.end() to signal the end of the writing process.


Implementation: fs.createWriteStream()


// Example 1

// Implementation of the Writable Stream
const fs = require('fs');
const destination = fs.createWriteStream('mj.txt');

// Writing into the Writable Stream
destination.write('This is it!\n');
destination.write('This is the final curtain call!');
destination.end();

Note that calling the method stream.Writable.prototype.write() does not automatically write data on new lines: you have to add the '\n' character yourself, as in the example above.


4 | Piping Readable and Writable Streams


The main advantage of piping is to limit the buffering of data to acceptable levels such that sources and destinations of differing speeds will not overwhelm the available memory.


Whenever you can, you should pipe Readable Streams into Writable Streams. There are 2 methods to pipe in Node.js:


  1. The recommended method stream.pipeline().
  2. The method stream.Readable.prototype.pipe().

The method stream.pipeline() was introduced in Node.js to improve on the existing stream.Readable.prototype.pipe(), by forwarding errors, by properly cleaning up the Streams and by providing a callback function to call when the pipeline is complete. In this book, we will only be using the method stream.pipeline().
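

For comparison only, a sketch of the older pipe() pattern might look like this (the file names are illustrative); note that pipe() does not forward errors from the source to the destination, so each Stream needs its own 'error' listener:


// For comparison only: the older pipe() pattern
const fs = require('fs');

const source = fs.createReadStream('source.txt');
const destination = fs.createWriteStream('destination.txt');

// pipe() handles backpressure, but errors are not forwarded,
// so each Stream needs its own 'error' listener
source.on('error', (err) => console.error('Read failed:', err));
destination.on('error', (err) => console.error('Write failed:', err));

source.pipe(destination);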


The method stream.pipeline() can take several arguments:


  • The mandatory first argument is a Readable Stream.
  • The optional following arguments are Transform Streams.
  • The mandatory second-to-last argument is a Writable Stream.
  • The mandatory last argument is an error-first callback function that gets called when the pipeline is fully done.

// Example 2a
  
// Implementation of the Readable Stream
const fs = require('fs');
const source = fs.createReadStream('./poem.txt');
source.setEncoding('utf-8');
  
// Implementation of the Writable Stream
const destination = fs.createWriteStream('poem-copy.txt');
  
// Piping
const stream = require('stream');
  
stream.pipeline(
  source,
  destination,
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Successful copy!')
    }
  }
);

In the previous example, we implement a Readable Stream by using the method fs.createReadStream(). We also set the encoding directly on the Readable Stream with source.setEncoding('utf-8').


We implement a Writable Stream by using the method fs.createWriteStream(). This method will automatically create a file named poem-copy.txt if it does not already exist; otherwise, the existing file will be overwritten.


We require the Node.js core module 'stream', and we assign it to the variable stream. We can call the method stream.pipeline() with 3 arguments:


  1. The 1st argument is the Readable Stream, referenced by the variable source.
  2. The 2nd argument is the Writable Stream, referenced by the variable destination.
  3. The 3rd argument is a callback function that displays whether the pipeline operation was successful or not.
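
As a side note, recent versions of Node.js (15 and later) also expose a promise-based version of pipeline() in the 'stream/promises' module. A sketch of the same copy, assuming such a version, could look like this:


// A sketch using the promise-based pipeline() from 'stream/promises'
// (requires Node.js 15 or later)
const fs = require('fs');
const { pipeline } = require('stream/promises');

async function copyPoem() {
  await pipeline(
    fs.createReadStream('./poem.txt'),
    fs.createWriteStream('poem-copy.txt')
  );
  console.log('Successful copy!');
}

copyPoem().catch((err) => console.error('failed', err));
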

// Example 2b
  
// Creation of an iterable object: array of integers
let array = [];
  
for (let i = 0; i <= 100; i += 1) {
  array.push(String(i) + '\n');
}
  
// Implementation of the Readable Stream
const stream = require('stream');
const source = stream.Readable.from(array);
  
// Implementation of the Writable Stream
const fs = require('fs');
const destination = fs.createWriteStream('destination.txt');
  
// Piping
stream.pipeline(
  source,
  destination,
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Successful copy!')
    }
  }
);

This example is similar to Example 2a, except that the Readable Stream is built from an array by calling the method stream.Readable.from().
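

The optional Transform Stream arguments of stream.pipeline() can be illustrated with the built-in zlib module, whose method zlib.createGzip() returns a Transform Stream. A minimal sketch (the file names are illustrative):


// A sketch of pipeline() with a Transform Stream in the middle:
// read a file, gzip it on the fly, write the compressed result
const fs = require('fs');
const zlib = require('zlib');
const stream = require('stream');

stream.pipeline(
  fs.createReadStream('./poem.txt'),   // Readable Stream
  zlib.createGzip(),                   // Transform Stream
  fs.createWriteStream('poem.txt.gz'), // Writable Stream
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('Successful compression!');
    }
  }
);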


5 | Backpressure


What is backpressure?


Backpressure is a generic computer science concept, not specific to Node.js and its Streams.


Backpressure is the data handling issue caused by software not being able to deliver output data as fast as the input data is coming in.


In Node.js, incoming data that cannot be processed right away is temporarily accumulated in a buffer. If buffering is not handled properly (i.e. if backpressure is not handled properly), the process ends up monopolizing a large part of the system's memory until completion. This results in a few things:


  • Slowing down all other current processes.
  • A very overworked garbage collector.
  • Memory exhaustion.

As you may have guessed, this behavior is far from optimal and should be systematically avoided. Node.js provides automatic mechanisms to handle backpressure effectively and to prevent buffers from ending up using all the memory of the system. But this only happens when we pipe Streams together with one of the 2 appropriate functions: stream.pipeline() or stream.Readable.prototype.pipe().


If we do not pipe Streams together, we have to make sure that our code handles backpressure properly. Let us see how it can be done in the next section.


stream.Writable.prototype.write() and the 'drain' event


It is possible to manually implement a backpressure mechanism in Node.js thanks to the return value of the method stream.Writable.prototype.write() and thanks to the 'drain' event emitted by a Writable Stream.


When the method stream.Writable.prototype.write() is called to write data into a Writable Stream, it returns true as long as the internal buffer of the Writable Stream can accept incoming data. However, if the internal buffer is full, the method returns false. When this happens, we have to pause the reading of the Readable Stream, so that the Writable Stream can empty its internal buffer. When the Writable Stream is ready to accept data once again, it emits the 'drain' event, and data can start being written to the Writable Stream again.


Let us see the important events a Writable Stream can emit:


  1. 'error': emitted if an error occurred while writing or piping data. The listener callback is passed a single Error argument when called. The Writable Stream is closed when the 'error' event is emitted, unless the autoDestroy option was set to false when creating the Writable Stream. After 'error', no further events other than 'close' should be emitted (including 'error' events).
  2. 'drain': if a call to stream.Writable.prototype.write() returns false, the 'drain' event will be emitted when it is appropriate to resume writing data to the Writable Stream.
  3. 'finish': emitted after the stream.Writable.prototype.end() method has been called, and all data has been flushed to the underlying system.
  4. 'close': emitted when the Writable Stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur. A Writable Stream will always emit the 'close' event if it is created with the emitClose option.
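

For instance, a minimal sketch of listening for the 'finish' event (the file name is illustrative):


// A sketch: observing 'finish' after end() has been called
const fs = require('fs');
const destination = fs.createWriteStream('events-demo.txt');

destination.on('finish', () => {
  console.log('All data has been flushed to the underlying system');
});

destination.write('some data\n');
destination.end();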

In the following code examples, we suppose that we have a big video file called video.mp4.


First, let us see the unoptimized version of the code, which does not handle backpressure. This should not be reproduced.


// Example 3a - Unoptimized code

const fs = require('fs');
const source = fs.createReadStream('./video.mp4');
const destination = fs.createWriteStream('./video-copy.mp4');

source.on('data', (chunk) => {
  destination.write(chunk);
});

source.on('error', (err) => {
  console.log("Error with Readable Stream: ", err);
});

destination.on('error', (err) => {
  console.log("Error with Writable Stream: ", err);
});

source.on('end', () => {
  destination.end();
});

destination.on('close', () => {
  process.stdout.write('Successful copy');
});

In the previous code, we use the standard events emitted by the Readable Stream and the Writable Stream. But we do not handle backpressure, so if the file is really huge, this will cause memory issues.


Now, let us see the same code with the handling of backpressure.


// Example 3b - Optimized code

const fs = require('fs');
const source = fs.createReadStream('./video.mp4');
const destination = fs.createWriteStream('./video-copy.mp4');

// We pause the Readable Stream if need be
source.on('data', (chunk) => {
  const isWritable = destination.write(chunk);

  if (!isWritable) {
    console.log('Backpressure, pause Readable Stream');
    source.pause();
  }
});

source.on('error', (err) => {
  console.log("Error with Readable Stream: ", err);
});

destination.on('error', (err) => {
  console.log("Error with Writable Stream: ", err);
});

source.on('end', () => {
  destination.end();
});

// We switch the Readable Stream back to flowing mode when need be
destination.on('drain', () => {
  console.log('Buffer drained, flow Readable Stream');
  source.resume();
});

destination.on('close', () => {
  process.stdout.write('Successful copy');
});
