Processing Massive Amounts of On-Demand Data Without Crashing the Node.js Main Thread

Written by wesleymr7 | Published 2023/03/15
Tech Story Tags: nodejs | javascript | nodejs-tutorial | web-performance | application-performance | programming | big-data | data-science

TLDR: Node.js can struggle to process big files because the main thread (responsible for the event loop) can stall. For this reason, I decided to write about how to use streams in Node.js with a fun example: an application that downloads a huge CSV file from Google Cloud Storage, processes the data, transforms it into a JSON file, extracts some data insights, and finally compresses the file and uploads it back to Cloud Storage.

Sometimes we encounter problems processing big files in Node.js because the main thread (responsible for the event loop) can stall. For this reason, I decided to write about how to use streams in Node.js with a fun example. Let's go!

Application: a Node.js application that downloads a huge CSV file from Google Cloud Storage, processes the data, transforms it into a JSON file, extracts some data insights, and finally compresses the file and uploads it back to Cloud Storage. (We are also going to create a terminal progress bar to visualize progress.)

Main principles:

  • Streams: Interface to work with chunks of data.

  • Readable Streams: Streams from which data can be read — the source side of a stream pipeline.

  • Writable Streams: Streams to which data can be written — the destination side of a stream pipeline.

  • Duplex Streams: Implement both interfaces, Readable and Writable.

  • Transform Streams: It's a Duplex Stream used to modify the data during the process.

  • PassThrough: It's a Transform Stream useful when we need to check or test something during the stream pipeline process.

  • Pipeline: A helper that wires all of these streams together and handles errors and cleanup for the whole process (see the minimal sketch after this list).

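To make these pieces concrete, here is a minimal sketch (not part of the final app) that wires an in-memory Readable source, a Transform, and a Writable sink together with pipeline:

const { Readable, Transform, pipeline } = require('stream')

// A small in-memory source instead of a real file, just for illustration.
const source = Readable.from(['hello\n', 'world\n'])

// A Transform that upper-cases every chunk passing through it.
const upperCase = new Transform({
    transform(chunk, encoding, callback) {
        callback(null, chunk.toString().toUpperCase())
    }
})

// pipeline connects source -> transform -> sink and reports errors in one place.
pipeline(source, upperCase, process.stdout, (err) => {
    if (err) console.error('Pipeline failed', err)
})
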
Steps to reproduce:

  • Download file stream

  • Create a progress bar

  • Transform each CSV file line into JavaScript objects

  • Get information from the file

  • Convert the data into JSON format

  • Compress the data

  • Upload the new, converted file


Requirements

  • You can find the CSV file here

  • We are going to use Node.js version 16.16

  • You need to have a Google account to access Google Cloud Services.


Cloud Storage Service

We need to install the Cloud Storage library to work with the Google Cloud service.

npm install @google-cloud/storage

If you need help configuring your Cloud Storage service on your Google account, there are many tutorials that can help with that; it's not the purpose of this one.

Let's create our first file, cloudStorageFileService.js, in the src folder to work with our storage.

src/cloudStorageFileService.js

const { Storage } = require('@google-cloud/storage')
const path = require('path')
const serviceKey = path.join(__dirname, '../gkeys.json')


class CloudStorageFileService {

// (1)
    constructor() {
        this.storage = new Storage({
            projectId: 'my-project-id',
            keyFilename: serviceKey
        })
    }

// (2)
    async downloadFile(bucketName, fileName) {
        return await this.storage
            .bucket(bucketName)
            .file(fileName)
            .createReadStream()
    }

// (3)
    async uploadFile(bucketName, destFileName) {
        return await this.storage
            .bucket(bucketName)
            .file(destFileName)
            .createWriteStream()
    }

// (4)
    async getFileSize(bucketName, fileName) {
        const [metadata] = await this.storage
                        .bucket(bucketName)
                        .file(fileName)
                        .getMetadata();
        return metadata.size
    }
}

module.exports = CloudStorageFileService

From the code sections above:

  1. Basic configuration for the Cloud Storage service, such as the project id and the path to your Google Cloud credentials.
  2. Google Cloud Storage provides us with a Readable Stream for downloading files. We can use it to download our huge file as a stream without blocking the main Node.js thread.
  3. Google Cloud Storage also provides a Writable Stream to upload files.
  4. The last function, getFileSize, returns the file size we will use to compute the progress bar percentage. (A quick standalone usage sketch follows this list.)
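
As a sanity check, here is a hypothetical standalone snippet using the service above (it assumes your gkeys.json credentials, bucket, and file already exist):

const CloudStorageFileService = require('./cloudStorageFileService')

;(async () => {
    const service = new CloudStorageFileService()

    // File size in bytes, later used for the progress bar percentage.
    const size = await service.getFileSize('myfileuploads', 'london_crime_by_lsoa2.csv')
    console.log(`file size: ${size} bytes`)

    // The download is a Readable Stream: data arrives in chunks.
    const readStream = await service.downloadFile('myfileuploads', 'london_crime_by_lsoa2.csv')
    readStream.on('data', (chunk) => console.log(`received ${chunk.length} bytes`))
})()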

Progress Bar

Here we are extending PassThrough because we don't need to apply any modification to the stream, only to get some data to create our progress bar.

It is important to know that streams implement the EventEmitter interface, so we can use emit to emit events and on to listen to them.
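
For instance, here is a tiny self-contained illustration of a custom event on a stream (not part of the app):

const { PassThrough } = require('stream')

// Streams are EventEmitters: we can emit custom events and listen to them.
const stream = new PassThrough()
stream.on('progress', (pct) => console.log(`progress: ${pct}%`))
stream.emit('progress', 42)   // prints "progress: 42%"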

src/progressPass.js

const { PassThrough } = require('stream')

class ProgressPass extends PassThrough {

// (1)
    constructor(fileSize, options = {}) {
        super({ ...options })
        this.on('data', this.processData)
        this.on('progress', this.showResult)
        this.on('close', this.finishProgress)
        this.bytesRead = 0
        this.progress = 0
        this.fileSize = fileSize
        this.createProgressBar()
    }

// (2)
    processData(data) {
        this.bytesRead += data.length
        this.progress = (this.bytesRead / this.fileSize) * 100
        this.emit('progress', Math.floor(this.progress))
    }

// (3)
    createProgressBar() {
        process.stdout.write("\x1B[?25l")
        process.stdout.write('[')
        for(let i = 1; i <= 101; i++) {
            process.stdout.write('-')
        }
        process.stdout.write(']')
    }

// (4)
    showResult(progress) {
        process.stdout.cursorTo(progress+1)
        process.stdout.write('=')


        process.stdout.cursorTo(105)
        process.stdout.write(`${progress}%`)

    }

    finishProgress() {
        process.stdout.write("\x1B[?25h")
        process.stdout.write("\n")
    }
}


module.exports = ProgressPass

From the code sections above:

  1. Initializing our progress bar:

    • The file size passed to the constructor is used to compute the progress percentage.
    • The data event is emitted when a new chunk arrives in the stream.
    • progress is our custom event that we emit to update the progress bar.
    • close is the event emitted when there is no more data to pass through the stream.
  2. Every time new data arrives, we add the chunk's length to the byte count and emit the progress event to update the progress bar.

  3. Creating an unfilled progress bar like:

    [------------------------------------------------]

  4. When the progress event happens, we update the progress bar with the current percentage (a quick way to try the bar on its own follows):

    [===========-------------------------------------]  25%

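Here is a hypothetical way to try ProgressPass on its own with a local file before plugging it into the cloud pipeline (the file path is just a placeholder; run it in a real terminal so the cursor moves work):

const fs = require('fs')
const ProgressPass = require('./progressPass')

const filePath = './london_crime_by_lsoa2.csv'   // any large local file
const { size } = fs.statSync(filePath)

// The PassThrough consumes the chunks itself (it has a 'data' listener),
// so piping into it is enough to see the bar fill up.
fs.createReadStream(filePath).pipe(new ProgressPass(size))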

Transforming CSV Line Into a Javascript Object

Now it's time to get every line of our CSV file and convert it to a JS object. The intention is to make the data easier to manipulate and, at the final step of the process, convert it to a JSON file.

The strategy is to convert each binary chunk into text and read it line by line.

To get the values, we need to split each line on commas (,).

When we read using streams, each chunk is 16 KB by default (this is configurable). A chunk boundary can therefore split a line in the middle, as in the example below:

chunk 1:

value1, value2, val

chunk 2:

ue3, value4, value5

Keep in mind that we need to handle this case by temporarily saving the incomplete tail of the previous chunk and prepending it before splitting the next lines.

To handle this transformation we are going to use a Transform Stream. As mentioned, it's useful when we want to modify our data chunks.

src/objectTransform.js

const { Transform } = require('stream')

class ObjectTransform extends Transform {

// (1)
    constructor(options = {}) {
        super({ ...options })
        this.headerLine = true
        this.keys = []
        this.tailChunk = ''
    }

// (2) 
    _transform(chunk, encoding, callback) {
        const stringChunks = chunk.toString("utf8")
        const lines = stringChunks.split('\n')

        for (const line of lines) {
            const lineString = (this.tailChunk + line)
            let values = lineString.split(',')

            if (this.headerLine) {
                this.keys = values
                this.headerLine = false
                continue
            }


            if (values.length !== this.keys.length || lineString[lineString.length - 1] === ',') {
                this.tailChunk = line
            } else {
                const chunkObject = {}

                this.keys.forEach((element, index) => {
                    chunkObject[element] = values[index]
                })

                this.tailChunk = ''
                this.push(`${JSON.stringify(chunkObject)}`)
            }
        }
        callback()
    }

// (3)
    _flush(callback) {
        callback()
    }

}

module.exports = ObjectTransform

From the code sections above:

  1. Using a flag, we save the header keys that will become the properties of our JS objects. tailChunk stores incomplete CSV lines between chunks.
  2. The magic happens here. We read every line, check whether it is complete (prepending the saved tail when needed), split it on commas, and convert it into a JS object (see the quick test after this list).
  3. _flush is called when there is no more data to be processed.
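
If you want to see the chunk-boundary handling in action, here is a hypothetical quick test that feeds the transform two chunks with a line split between them (it assumes the objectTransform.js file above; the header and rows are made up):

const { Readable } = require('stream')
const ObjectTransform = require('./objectTransform')

// The line "Camden,2016,3" is deliberately split across the two chunks.
Readable.from(['borough,year,value\nCamden,20', '16,3\nHackney,2016,5\n'])
    .pipe(new ObjectTransform())
    .on('data', (chunk) => console.log(chunk.toString()))
// Expected output:
// {"borough":"Camden","year":"2016","value":"3"}
// {"borough":"Hackney","year":"2016","value":"5"}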

Extracting Information

Now that we have JS objects we can extract some data from the chunks.

Here again we are going to use a PassThrough Stream, because we only want to inspect the data, not change it.

src/monitorTransform.js

const { PassThrough } = require('stream')

class MonitorTransform extends PassThrough {
    constructor(options = {}) {
        super({ ...options })
        this.on('data', this.processData)
        this.on('close', this.showResult)
        this.totalCrimes = 0
        this.boroughTotal = new Map()
        this.monthTotal = new Map()
        this.yearTotal = new Map()
    }

// (1)
    processData(data) {
        const row = JSON.parse(data.toString())
        const rowCrimeQuantity = Number(row.value) || 0
        const currentBoroughTotal = Number(this.boroughTotal.get(row.borough)) || 0
        const currentMonthTotal = Number(this.monthTotal.get(row.month)) || 0
        const currentYearTotal = Number(this.yearTotal.get(row.year)) || 0

        this.totalCrimes += rowCrimeQuantity
        this.boroughTotal.set(row.borough, currentBoroughTotal + rowCrimeQuantity)
        this.monthTotal.set(row.month, currentMonthTotal + rowCrimeQuantity)
        this.yearTotal.set(row.year, currentYearTotal + rowCrimeQuantity)
    }

// (2)
    showResult() {
        console.log(this.totalCrimes)
        console.log(this.boroughTotal)
        console.log(this.monthTotal)
        console.log(this.yearTotal)
    }
}


module.exports = MonitorTransform

From the code sections above:

  1. When a data event happens we can process the chunk and get whatever we want from it.
  2. When there is no more data to process (the stream closes), we print the extracted information (a quick standalone test follows this list).
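
Here is a hypothetical quick check of the monitor on its own, feeding it two already-transformed rows (it assumes the monitorTransform.js file above; the rows are made up):

const { Readable } = require('stream')
const MonitorTransform = require('./monitorTransform')

Readable.from([
    '{"borough":"Camden","month":"1","year":"2016","value":"3"}',
    '{"borough":"Hackney","month":"2","year":"2016","value":"5"}'
])
    .pipe(new MonitorTransform())
// When the stream closes it should print 8 (total crimes) and the
// per-borough, per-month and per-year Maps.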

Transforming The File to a JSON File

Each chunk is now a stringified JS object, but simply concatenating them as-is would not produce a valid JSON file. We need to turn them into an array of objects using a Transform Stream.

src/jsonTransform.js

const { Transform } = require('stream')

class JsonTransform extends Transform {
    constructor (options = {}) {
        super({ ...options })
        this.once('data', this.startJson)
        this.firstLine = true
    }

// (1)
    startJson() {
        this.push('[')
    }

// (2)
    _transform (chunk, encoding, callback) {
        const row = JSON.parse(chunk.toString())
        const newChunk = this.firstLine ? `${JSON.stringify(row)}` : `,${JSON.stringify(row)}`
        this.push(newChunk)
        if(this.firstLine) {
            this.firstLine = false
        }
        callback()
    }

// (3)
    _flush(callback) {
        this.push(']')
        callback()
    }

}

module.exports = JsonTransform

From the code section above:

  1. Our array must start with [, which happens only once, for the first chunk. That's why we use once instead of on: to listen to the data event only one time.
  2. Here we add a , before every object except the first, to separate the JS objects.
  3. At the end of the file we close the array with ].

Compressing The Chunks

We don't need to exert a lot of effort for this step because Node.js has a core library to help.

Node.js's zlib module gives us Transform Streams (like the one returned by createGzip) that compress chunks of data during the streaming process. We only need to add one to our stream pipeline.

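Outside of our app, the same idea works for compressing any local file; a minimal sketch (the file names are placeholders):

const fs = require('fs')
const { createGzip } = require('node:zlib')
const { pipeline } = require('node:stream/promises')

async function gzipFile(src, dest) {
    // createGzip() is a Transform Stream, so it slots directly into a pipeline.
    await pipeline(
        fs.createReadStream(src),
        createGzip(),
        fs.createWriteStream(dest)
    )
}

gzipFile('./london_crime_by_lsoa2.json', './london_crime_by_lsoa2.json.gz').catch(console.error)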

Creating The Stream Pipeline Process

The pipeline process needs at least a Readable and a Writable Stream, and it can have as many Transform Streams and PassThrough Streams as we want in between.

To handle the pipeline process we are going to use the Builder design pattern, to control how the pipeline is assembled and to throw errors when required pieces are missing.

src/fileProcessor.js

const { pipeline } = require('stream/promises')

class FileProcessor {
    constructor() {
        this.readableStream = null
        this.transforms = []
        this.writableStream = null
    }

// (1)
    setReadable(readableStream) {
        this.readableStream = readableStream
        return this
    }

// (2)
    addTransforms(transformsStream) {
        this.transforms = transformsStream
        return this
    }

// (3)
    setWritable(writableStream) {
        this.writableStream = writableStream
        return this
    }

// (4)
    async execute() {
        try {
            if(!this.readableStream) {
                throw Error('Readable stream not implemented')
            }
            if(!this.writableStream) {
                throw Error('Writable stream not implemented')
            }
            await pipeline(this.readableStream, ...this.transforms, this.writableStream)
        } catch (error) {
            console.log(error)
        }
    }
}

module.exports = FileProcessor

From the code section above:

  1. Sets our Readable Stream; in this case, the download stream from Google Cloud Storage.
  2. Adds the Transforms to our pipeline. All the Transform and PassThrough Streams are passed as an array.
  3. Sets the Writable Stream; in this case, the upload stream from Google Cloud Storage.
  4. Executes the pipeline, as long as both a Readable and a Writable Stream have been set; the pipeline function requires at least these two streams. (A local test of the builder follows this list.)
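
Because the builder only cares about streams, we can also try it locally with file streams, independent of Google Cloud; a hypothetical sketch (the output file name is a placeholder):

const fs = require('fs')
const FileProcessor = require('./fileProcessor')
const ObjectTransform = require('./objectTransform')
const JsonTransform = require('./jsonTransform')

new FileProcessor()
    .setReadable(fs.createReadStream('./london_crime_by_lsoa2.csv'))
    .addTransforms([new ObjectTransform(), new JsonTransform()])
    .setWritable(fs.createWriteStream('./london_crime_by_lsoa2.json'))
    .execute()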

Joining Everything

It's time to import everything we've created, define the constants, and execute the entire app.

src/index.js

// (1)
const FileProcessor = require('./fileProcessor')
const JsonTransform = require('./jsonTransform')
const MonitorTransform = require('./monitorTransform')
const ObjectTransform = require('./objectTransform')
const { createGzip } = require('node:zlib')
const CloudStorageFileService = require('./cloudStorageFileService')
const ProgressPass = require('./progressPass')
const fileProcessor = new FileProcessor()
const cloudFileService = new CloudStorageFileService()

// (2)
const gzip = createGzip()

// (3)
const bucketName = 'myfileuploads'
const fileName = 'london_crime_by_lsoa2.csv'
const destFileName = 'london_crime_by_lsoa2.tar.gz'

// (4)
;(async () => {
    try {
        const fileSize = await cloudFileService.getFileSize(bucketName, fileName)
        await fileProcessor
            .setReadable(await cloudFileService.downloadFile(bucketName, fileName))
            .addTransforms([new ProgressPass(fileSize), new ObjectTransform(), new MonitorTransform(), new JsonTransform(), gzip])
            .setWritable(await cloudFileService.uploadFile(bucketName, destFileName))
            .execute()
    } catch (e) {
        console.log(e)
    }
})()

From the code sections above:

  1. Here we import everything we've created: the services, Transform Streams, and PassThrough Streams.
  2. As mentioned, createGzip creates the Node.js compression Transform Stream.
  3. We need to define our Cloud Storage bucket and file.
  4. This is our final function, executing everything together: setting the Readable, the Transforms, and the Writable.

Takeaways

  • You can use this approach to import and export data between databases or generate reports.
  • Streams are also useful for processing audio and video files and for file conversion.

You can take a look at the entire code here



Written by wesleymr7 | Brazilian full-stack developer passionate about the Javascript environment
Published by HackerNoon on 2023/03/15