Implementing more manageable transform streams in Node.js

Published: Aug 3, 2025

Last updated: Aug 3, 2025

Over the past few months, I've been refining my approach to work requirements that can be processed efficiently with Node.js streams.

This has led to experimentation with both Node.js stream classes and generators.

Although each has its pros and cons, in retrospect I've settled on class-based streams as my preference for the work that I am doing.

Some of my personal reasons:

  1. They can be easier to reason about.
  2. They can easily have dependencies injected with inversion-of-control libraries like Awilix and Inversify.
  3. They don't require currying or partial application for the aforementioned dependency injection (see the sketch after this list).
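To illustrate points two and three, here is a minimal, hypothetical sketch (the `Logger` shape here is made up for illustration) comparing constructor injection into a class-based Transform with the currying you typically need to close a dependency over a generator-based transform:

```ts
import { Transform } from "stream";

// Hypothetical dependency shape, purely for illustration.
interface Logger {
  info(...args: unknown[]): void;
}

// Class-based stream: the dependency arrives via the constructor, so an
// IoC container can construct it like any other class.
class LoggingPassThrough extends Transform {
  constructor(private readonly logger: Logger) {
    super({ objectMode: true });
  }

  _transform(
    chunk: unknown,
    _encoding: BufferEncoding,
    callback: (error?: Error | null, data?: unknown) => void
  ) {
    this.logger.info("Processing chunk", chunk);
    callback(null, chunk);
  }
}

// Generator-based transform: the dependency has to be closed over, which
// usually means currying a factory function to inject it.
const makeLoggingTransform = (logger: Logger) =>
  async function* (source: AsyncIterable<unknown>) {
    for await (const chunk of source) {
      logger.info("Processing chunk", chunk);
      yield chunk;
    }
  };
```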

Streams that I work with

Normally you would expect stream chunks to be strings or Buffers, but in my case I am usually processing job records as plain JavaScript objects.

This has led me to make use of object mode and Transform streams more often than not in day-to-day work.
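For context, a hand-rolled object-mode Transform looks something like this (a minimal sketch; the record shape is made up for illustration):

```ts
import { Transform } from "stream";

// Every hand-rolled object-mode transform repeats the `objectMode` flag
// and the transform/callback plumbing shown here.
const doubleAmounts = new Transform({
  objectMode: true,
  transform(chunk: { id: string; amount: number }, _encoding, callback) {
    callback(null, { ...chunk, amount: chunk.amount * 2 });
  },
});
```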

Because of this, I've also opted to build out an abstract class in TypeScript. Its purpose is to encapsulate the objectMode requirement and require a single method that is used for processing each chunk of the stream.

An abstract class for object-mode transforms

The following is a general look at the abstracted stream.

```ts
import { Transform } from "stream";

abstract class ChunkStream<
  TInput = unknown,
  TResult = unknown,
> extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  abstract processChunk(chunk: TInput): TResult;

  async _transform(
    chunk: TInput,
    _encoding: BufferEncoding,
    callback: (error?: Error | null, data?: TResult) => void
  ): Promise<void> {
    try {
      const result = await this.processChunk(chunk);

      if (result !== undefined && result !== null) {
        // Handle empty arrays - only push if there's actual data
        if (Array.isArray(result) && result.length === 0) {
          callback(); // Don't push empty arrays
        } else {
          callback(null, result);
        }
      } else {
        callback();
      }
    } catch (error) {
      // Transform callbacks only expect an error of type `Error | null`.
      // Handling of this is entirely up to you if you wish to alter
      // this approach.
      if (error instanceof Error) {
        callback(error);
      } else {
        console.warn("Unhandled error which is not an instance of Error", error);
        throw error;
      }
    }
  }
}
```

As mentioned earlier, the processChunk method is abstract and requires a concrete implementation of that behaviour.

The typing probably leaves more to be desired (it could do with some constraints), but my current implementation expects to take the incoming object chunk and return a value that is passed along via the transform stream's callback.
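If you wanted to tighten things, one possible direction (an untested sketch on my part, not part of the original class) is to constrain the input type to object shapes and allow processChunk to be synchronous or asynchronous:

```ts
import { Transform } from "stream";

// A sketch of tighter generics: the `_transform` body would stay the
// same as in `ChunkStream` above.
abstract class ConstrainedChunkStream<
  TInput extends Record<string, unknown> = Record<string, unknown>,
  TResult = unknown,
> extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  // Allow concrete implementations to be sync or async.
  abstract processChunk(chunk: TInput): TResult | Promise<TResult>;
}
```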

Implementing a concrete transform stream

A contrived implementation may look like this:

```ts
import { ok } from "neverthrow";

// Assumes a `Logger` with an `info` method is available in scope.

type User = {
  name: string;
  age: number;
  color: string;
};

class UserData {
  readonly _tag = "UserData";
  name: string;
  age: number;

  constructor(name: string, age: number) {
    this.name = name;
    this.age = age;
  }
}

// Making use of the `ChunkStream`
class SetUserDataStream extends ChunkStream<User> {
  #logger: Logger;

  constructor(logger: Logger) {
    super();
    this.#logger = logger;
  }

  // Concrete implementation of the abstract `processChunk` method
  async processChunk(chunk: User) {
    const transformed = new UserData(chunk.name, chunk.age);
    return ok(transformed);
  }

  // OPTIONAL: Override a Transform method
  _flush(callback: (error?: Error | null) => void) {
    this.#logger.info("Stream completed");
    callback();
  }
}
```

The above stream expects an object that structurally matches User as the incoming chunk and returns that value serialized into a UserData instance, wrapped inside an Ok result.

There is also an optional _flush method that I have overridden. You can override any Transform stream method. _flush has been useful for logging checkpoints at the end of a stream.
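As a further hypothetical example (not from the original implementation), a ChunkStream subclass can accumulate state in processChunk and then report on it from _flush once the upstream has ended:

```ts
// Hypothetical: a pass-through stream that counts the records it sees and
// logs a summary from `_flush` when the stream finishes.
class CountingStream extends ChunkStream<User, User> {
  #processed = 0;
  #logger: Logger;

  constructor(logger: Logger) {
    super();
    this.#logger = logger;
  }

  processChunk(chunk: User) {
    this.#processed += 1;
    return chunk; // pass the record through untouched
  }

  _flush(callback: (error?: Error | null) => void) {
    this.#logger.info(`Processed ${this.#processed} records`);
    callback();
  }
}
```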

In use, it could look something like a long-running script where you fetch batches of data remotely, run a process and then finalize it somewhere:

```ts
import { Readable, Writable } from "stream";
import { pipeline } from "stream/promises";

const logger = new Logger();

// Assume this is a script that you're running to bulk run a long
// extract-transform-load job
async function main() {
  // Assume all streams in the middle are duplex or transform streams.
  // They could even inherit from our `ChunkStream` class.
  await pipeline(
    // Fetch data from remote
    Readable.from(fetchDataFromRemote()),
    // Serialize data
    new SetUserDataStream(logger),
    // Split into batches of 20 records to process
    new BatchStream(20),
    // Run the upload process
    new UploadToNewDataSource(),
    // Final sink to log results
    new Writable({
      write(
        chunk: User,
        _encoding: BufferEncoding,
        callback: (error?: Error | null) => void
      ) {
        // Log the output from `UploadToNewDataSource`
        logger.info("Output:", chunk);
        callback();
      },
      objectMode: true,
    })
  );
}

main();
```
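BatchStream isn't defined anywhere in this post, so here is a rough sketch of how one could be built on top of ChunkStream (my assumption of the behaviour: buffer records, emit arrays of a given size, and flush any remainder at the end):

```ts
// Hypothetical BatchStream: buffers incoming records and emits them in
// arrays of `size`, flushing the remainder when the stream ends.
class BatchStream<T = unknown> extends ChunkStream<T, T[] | null> {
  #buffer: T[] = [];

  constructor(private readonly size: number) {
    super();
  }

  processChunk(chunk: T) {
    this.#buffer.push(chunk);

    if (this.#buffer.length >= this.size) {
      const batch = this.#buffer;
      this.#buffer = [];
      return batch;
    }

    // Returning null tells ChunkStream._transform not to push anything yet.
    return null;
  }

  _flush(callback: (error?: Error | null) => void) {
    // Emit any leftover records that didn't fill a complete batch.
    if (this.#buffer.length > 0) {
      this.push(this.#buffer);
    }
    callback();
  }
}
```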

Testing with this stream

For unit tests on a stream, I do the following:

  1. Create a Readable stream for test data used as input.
  2. Create an instance of my Transform stream to test.
  3. Create a Writable "sink" where I push chunk results into an array I can assert against.

An example of this:

it("should batch items into groups of 2", async () => { const results: number[][] = []; const inputStream = [1, 2, 3, 4]; const sink = new Writable({ write( chunk: User, _encoding: BufferEncoding, callback: (error?: Error | null) => void ) { results.push(chunk); callback(); }, objectMode: true, }); expect(results).toBe([ [1, 2], [3, 4], ]); });

Caveats to this approach

At the time of writing, Node.js streams have limited TypeScript support from what I could see.

Take my approach with a grain of salt and know that it is there to be improved upon.

Photo credit: pcbulai
