Node.js Streams in Action

Written by danstenger | Published 2020/04/14


Ever wondered how streams work in Node.js? I have. What I know so far is that streams can significantly reduce memory usage on the server when processing large files. Instead of reading the whole file into memory, we stream it in chunks to whoever requested it and apply transformations to that stream if needed. That's a huge benefit, as it can spare us from scaling vertically just to fit large payloads in memory. Processing files is a common task, but not as common as interacting with databases, and that's going to be my focus here. I'll build a simple API with 2 endpoints. Both will return a large number of records from a Postgres DB. One endpoint will stream data to the client and the other will read all the data into memory and return it in one chunk.
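To make the chunk-by-chunk idea concrete, here is a minimal sketch that uses only Node's built-in stream module. The chunks generator and the sink are hypothetical stand-ins for a large file and an HTTP response; they are not part of the project built below.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Hypothetical source: yields the payload in small chunks instead of one big string.
function* chunks() {
  yield 'first chunk, ';
  yield 'second chunk, ';
  yield 'third chunk';
}

// Transformation applied to each chunk as it passes through.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Hypothetical destination standing in for an HTTP response.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// pipeline() wires the streams together and reports the first error, if any.
const done = new Promise((resolve, reject) => {
  pipeline(Readable.from(chunks()), upperCase, sink, (err) =>
    err ? reject(err) : resolve(received)
  );
});

done.then((parts) => console.log(parts.join('')));
```

At no point does the whole payload have to exist as one string in memory; each chunk is transformed and handed on as soon as it arrives.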
When I work in a Node.js environment, there's usually the Express.js framework in front of it, paired with the Sequelize ORM. For this experiment I decided to try something different and chose the Koa.js framework and the Knex query builder. Let's start.
I'll set up the project first:
# let's name our project "streamer"
mkdir streamer && cd streamer

# I'm happy with npm defaults for this project so:
npm init -y
Now it's time to add the app dependencies. You can see all packages here. To set up my development environment I'll use Docker and docker-compose. Let's see what Dockerfile.dev looks like:
FROM node:12.16.1

# create new working directory
WORKDIR /src

# expecting to receive API_PORT as argument
ARG API_PORT

# make sure to run latest version of npm
RUN npm i npm@latest -g

# fetch dependencies on a separate layer first as it's not changing that often,
# will be cached and speed up the image build process
COPY ./package.json ./package-lock.json ./
RUN npm i

# copy the rest of the source files into working directory
COPY . .

EXPOSE ${API_PORT}

CMD ["npm", "run", "start:dev"]
Pretty standard, right? Now docker-compose.yml, to add the DB dependency and bootstrap the whole thing:
version: "3.7"

volumes:
  streamervolume:
    name: streamer-volume

networks:
  streamernetwork:
    name: streamer-network

services:
  pg:
    image: postgres:12.0
    restart: on-failure
    env_file:
      - .env
    ports:
      - "${POSTGRES_PORT}:${POSTGRES_PORT}"
    volumes:
      - streamervolume:/var/lib/postgresql/data
    networks:
      - streamernetwork
  streamer_api:
    build:
      context: .
      dockerfile: Dockerfile.dev
      args:
        API_PORT: ${API_PORT}
    restart: on-failure
    depends_on:
      - pg
    volumes:
      - ./:/src
    ports:
      - "${API_PORT}:${API_PORT}"
    networks:
      - streamernetwork
    env_file:
      - .env
Now that the environment is pretty much ready, I'll continue with the application logic itself. Let's look at app.js:
// src/app.js

require('dotenv').config();

const Koa = require('koa');
const errorHandler = require('./middleware/errorHandler');
const router = require('./routes');

const app = new Koa();

app
  .use(errorHandler)
  .use(router.routes())
  .use(router.allowedMethods());

module.exports = app;
I first load the .env vars, then import Koa, my custom error handling middleware and the router. The error handling middleware looks like this:
// src/middleware/errorHandler.js

module.exports = async (ctx, next) => {
  try {
    await next();
  } catch (err) {
    console.error(err);
    ctx.status = err.status || 500;
    ctx.body = {
      msg: err.message || 'Oops. Something went wrong. Please try again later',
    };
  }
}
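Because the handler wraps await next() in try/catch, it can only see errors thrown by middleware that runs after it. The sketch below illustrates that with a simplified, hypothetical compose function standing in for Koa's own middleware composition; failingRoute and demo are made-up names as well.

```javascript
// Simplified, hypothetical stand-in for Koa's middleware composition.
function compose(middleware) {
  return function run(ctx) {
    function dispatch(i) {
      const fn = middleware[i];
      if (!fn) return Promise.resolve();
      try {
        // Each middleware receives `next`, which runs the rest of the chain.
        return Promise.resolve(fn(ctx, () => dispatch(i + 1)));
      } catch (err) {
        return Promise.reject(err);
      }
    }
    return dispatch(0);
  };
}

// Same shape as the errorHandler above, minus logging.
async function errorHandler(ctx, next) {
  try {
    await next();
  } catch (err) {
    ctx.status = err.status || 500;
    ctx.body = { msg: err.message };
  }
}

// Hypothetical route that always fails.
async function failingRoute() {
  const err = new Error('users table is missing');
  err.status = 503;
  throw err;
}

async function demo() {
  // Error handler first: it wraps everything downstream.
  const handled = {};
  await compose([errorHandler, failingRoute])(handled);

  // Error handler last: the route throws before the handler is ever reached.
  const escaped = await compose([failingRoute, errorHandler])({}).catch((err) => err);

  return { handled, escaped };
}

const result = demo();
result.then(({ handled, escaped }) => {
  console.log(handled); // { status: 503, body: { msg: 'users table is missing' } }
  console.log(escaped.message); // users table is missing
});
```

In the second arrangement the error escapes the chain entirely, which is the situation the ordering in app.js avoids.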
According to the documentation, the error handling middleware has to be added first in a Koa app in order to capture all exceptions. Next stop is the router:
// src/routes/index.js

const Router = require('@koa/router');

const router = new Router();

router.get('/nostream', require('./users/nostream'));
router.get('/stream', require('./users/stream'));

module.exports = router;
An interesting thing to mention is that in app.js I use the router.allowedMethods() middleware, which handles unsupported methods with 405 Method Not Allowed for me. Let's look at the handlers next. nostream:
// src/routes/users/nostream/index.js

const db = require('../../../services/db');

module.exports = async (ctx) => {
  try {
    const users = await db.select('*').from('users');
    ctx.status = 200;
    ctx.body = users;
  } catch (err) {
    ctx.throw(500, err);
  }
};
Pretty sure you're wondering what that db looks like:
// src/services/db.js

const config = require('./config');

const knex = require('knex')({
  client: 'pg',
  connection: {
    host : config.dbHost,
    user : config.dbUser,
    password : config.dbPwd,
    database : config.db,
  },
});

module.exports = knex;
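The config module required at the top of db.js is not listed in this article. A minimal sketch of what it could look like follows, assuming it only maps environment variables to the four keys db.js reads. POSTGRES_HOST is the variable used later when running migrations; POSTGRES_USER, POSTGRES_PASSWORD and POSTGRES_DB are assumptions borrowed from the variables the official postgres image reads, and buildConfig is a hypothetical helper.

```javascript
// Hypothetical src/services/config.js: maps env vars to the keys db.js reads.
// Only POSTGRES_HOST appears in the article; the other variable names are assumptions.
function buildConfig(env) {
  return {
    dbHost: env.POSTGRES_HOST || 'localhost',
    dbUser: env.POSTGRES_USER,
    dbPwd: env.POSTGRES_PASSWORD,
    db: env.POSTGRES_DB,
  };
}

module.exports = buildConfig(process.env);

// Example with made-up values:
console.log(
  buildConfig({
    POSTGRES_HOST: 'pg',
    POSTGRES_USER: 'streamer',
    POSTGRES_PASSWORD: 'secret',
    POSTGRES_DB: 'streamer',
  })
);
```

Taking env as a parameter keeps the mapping easy to check without touching the real environment.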
config is just a simple object literal that exposes some env vars. You can see it here. I require knex, set up the db connection and export the instance. Since Node caches a module after the first require, no matter how many times I require this file, initialization will be performed only once. Everything else in the nostream handler is pretty straightforward. Please refer to the Koa or Knex documentation for more details. Next, let's look at the stream handler:
// src/routes/users/stream/index.js

const Stringify = require('streaming-json-stringify')
const db = require('../../../services/db');

module.exports = async (ctx) => {
  ctx.type = 'application/json; charset=utf-8';
  ctx.set('Connection', 'keep-alive');

  try {
    const stream = db
      .select('*')
      .from('users')
      .stream();

    ctx.status = 200;
    await pipe(stream, ctx.res, { end: false });
  } catch (err) {
    ctx.throw(500, err);
  }
};

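function pipe(from, to, options) {
  return new Promise((resolve, reject) => {
    const stringifier = Stringify();

    // .pipe() does not forward errors, so listen on both streams
    from.on('error', reject);
    stringifier.on('error', reject);
    // resolve only after the stringifier has emitted its last chunk
    stringifier.on('end', resolve);

    from.pipe(stringifier).pipe(to, options);
  });
}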
Here I call .stream() on my query to get a stream instance. I then pipe this query stream to the client. For that I have a pipe function that returns a promise, so we don't exit the handler instantly but rather wait until streaming is done or an error occurs. I added the stringify package here because the response (a writable stream) expects input of type string or an instance of Buffer, while the DB stream operates on objects.
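To illustrate the object-to-string step in isolation, here is a small sketch built on Node's core stream module. jsonArrayStringifier is a hypothetical, simplified stand-in for the stringify package (it is not that package's implementation), and the rows array stands in for what the query stream would emit.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Hypothetical, simplified stand-in for a streaming JSON stringifier:
// takes objects in, emits the pieces of one JSON array as strings.
function jsonArrayStringifier() {
  let first = true;
  return new Transform({
    writableObjectMode: true, // accepts row objects
    readableObjectMode: false, // emits strings/Buffers
    transform(row, encoding, callback) {
      const prefix = first ? '[' : ',';
      first = false;
      callback(null, prefix + JSON.stringify(row));
    },
    flush(callback) {
      // Close the array; an empty stream still yields valid JSON.
      this.push(first ? '[]' : ']');
      callback();
    },
  });
}

// Hypothetical rows standing in for what the query stream would emit.
const rows = [{ id: 1, name: 'Ann' }, { id: 2, name: 'Bob' }];

// Collects bytes the way an HTTP response would receive them.
let payload = '';
const sink = new Writable({
  write(chunk, encoding, callback) {
    payload += chunk.toString();
    callback();
  },
});

const finished = new Promise((resolve, reject) => {
  pipeline(Readable.from(rows), jsonArrayStringifier(), sink, (err) =>
    err ? reject(err) : resolve(payload)
  );
});

finished.then((json) => console.log(json));
```

The transform has an object-mode writable side and a byte-mode readable side, which is the bridge needed between a stream of row objects and a response that only accepts strings or Buffers.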
By the way, everything thrown with ctx.throw will be captured by the errorHandler middleware we created previously. Now, before I jump into testing my server, I need some data in the DB. Since I have Knex already installed locally, I need a config file for it to be able to run migrations, seeds etc.:
./node_modules/.bin/knex init
This will generate knexfile.js. I have slightly changed it to use my env vars. You can see it here. Now I'll generate the migration:
./node_modules/.bin/knex migrate:make users
This will create a migration file in migrations/{timestamp}_users.js. You can review it here. Before I run it, start the application:
docker-compose up --build
Now that the app is up and running, it's time to run the migration:
POSTGRES_HOST=localhost npm run migrate:up
I specify the Postgres host here because I run the migration from the host machine against a DB that runs in the Docker environment under the pg host name. If I don't specify it, the pg name from the env vars will be used, which ends in a failed connection attempt.
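The users migration itself is only linked, not listed. A migration along these lines might look roughly like the sketch below; the column names are assumptions made for illustration and may differ from the real file.

```javascript
// Hypothetical migrations/{timestamp}_users.js; the column names are assumptions.
const up = (knex) =>
  knex.schema.createTable('users', (table) => {
    table.increments('id'); // auto-incrementing primary key
    table.string('name').notNullable();
    table.string('email').notNullable();
    table.timestamps(true, true); // created_at / updated_at defaulting to now
  });

// Reverts the migration by dropping the table again.
const down = (knex) => knex.schema.dropTable('users');

module.exports = { up, down };
```

Knex calls up when migrating forward and down when rolling back, so the two functions should mirror each other.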
Now what I need is some records in the users table for my API to serve. The Knex CLI can ease this process. It has a command to generate a seed file:
./node_modules/.bin/knex seed:make create_users
This will generate a seed file in the seeds directory. You can see what I did with it here. If you run into issues running it, try reducing the number of records you create. Currently it'll try to create 60k records. Let's run it:
POSTGRES_HOST=localhost npm run db:seed
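For orientation, a seed of that size could be written roughly as follows. This is a sketch under assumptions, not the linked seed file: the column names, buildUsers and the batch size are hypothetical. Inserting in batches via knex.batchInsert is one way to avoid a single oversized INSERT, which is a plausible cause of trouble with large seeds.

```javascript
// Hypothetical seeds/create_users.js; column names and batch size are assumptions.
const TOTAL_USERS = 60000;
const BATCH_SIZE = 1000;

// Builds plain row objects for the users table.
function buildUsers(count) {
  return Array.from({ length: count }, (_, i) => ({
    name: `user_${i + 1}`,
    email: `user_${i + 1}@example.com`,
  }));
}

async function seed(knex) {
  // Start from an empty table so re-running the seed does not duplicate rows.
  await knex('users').del();
  // batchInsert splits the rows into several INSERT statements of BATCH_SIZE rows each.
  await knex.batchInsert('users', buildUsers(TOTAL_USERS), BATCH_SIZE);
}

module.exports = { seed, buildUsers };
```

Lowering TOTAL_USERS or BATCH_SIZE is the knob to turn if the seed fails on your machine.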
OK, with all that in place, I can now test both endpoints and see how well they perform. Let's start with the nostream endpoint:
And now the stream endpoint:
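A rough way to collect comparable numbers for either endpoint is to time the request and count what comes back. The measure helper below is a hypothetical sketch using Node's core http module, and port 3000 in the usage comment is an assumption. It only captures the client-side view; server memory would have to be observed separately, for example with docker stats.

```javascript
const http = require('http');

// Requests a URL and reports elapsed time, bytes and number of chunks received.
function measure(url) {
  return new Promise((resolve, reject) => {
    const startedAt = process.hrtime.bigint();
    let bytes = 0;
    let chunks = 0;

    http
      .get(url, (res) => {
        res.on('data', (chunk) => {
          bytes += chunk.length;
          chunks += 1;
        });
        res.on('end', () => {
          const ms = Number(process.hrtime.bigint() - startedAt) / 1e6;
          resolve({ status: res.statusCode, ms, bytes, chunks });
        });
        res.on('error', reject);
      })
      .on('error', reject);
  });
}

// Usage (port 3000 is an assumption; use whatever API_PORT is set to):
// measure('http://localhost:3000/nostream').then(console.log);
// measure('http://localhost:3000/stream').then(console.log);
```

Running each request several times and comparing averages gives a fairer picture than a single call.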
So, streaming might be a less memory-intensive operation, but it takes a bit longer to retrieve all the data (60k users in this case). It certainly works when sending or processing large amounts of data, and could be a perfect fit for file processing or BI, where we have to work with large data sets. When it comes to common tasks though, say an admin panel that retrieves 10-100 records per page, streaming most likely wouldn't be the best fit.
Not only because it takes a bit more time, but also because of code complexity: it requires more code to achieve the same output. It all boils down to the good old saying: "choose the right tool for the job". I hope you have learned something useful. You can find the whole project here.

Written by danstenger | GO and functional programming enthusiast
Published by HackerNoon on 2020/04/14