How to Use Node Streams to Transform the Largest POI Database

Written by kuwala | Published 2021/06/23
Tech Story Tags: openstreetmap | open-source | javascript | nodejs | opendata | big-data | geospatial | good-company

TL;DR: OpenStreetMap (OSM) is perhaps the most extensive open data project for geodata. It has rich information on points of interest (POIs), such as apartments, shops, or offices, worldwide. Kuwala created an ETL pipeline to directly download the raw OSM data in the .pbf file format and store the transformed objects in a database. Leveraging Uber’s H3 indexing, you can then easily query those objects with radius or polygon queries.

OpenStreetMap (OSM) is perhaps the most extensive open data project for geodata. It has rich information on points of interest (POIs), such as apartments, shops, or offices, worldwide. You can truly call it a data treasure. Big players like Apple, Amazon, and Facebook have realized that too and build heavily upon this global community project.
(Visual by Jennings Anderson, State of the Map)
To also enable smaller data science teams to benefit from this resource, we created an ETL pipeline to directly download the raw OSM data in the .pbf file format and store the transformed objects in a database. Leveraging Uber’s H3 indexing, you can then easily query those objects with radius or polygon queries.
In the rest of this article, I will show you how we approached the transformation of the raw data using the power of streams in Node.js. All of the code snippets are open-source, just like everything else at Kuwala. You can find them on our GitHub.
Streams in Node.js are an elegant way to process big files without using a lot of memory. You don’t need to load the entire file into memory to transform or work with the data; instead, a stream hands it to you in small chunks. That way, even for files with millions of rows, memory consumption stays consistently low. This only holds, of course, if you write the transformed data directly to a database or the file system, or perform requests based on the current chunk, rather than collecting everything in intermediate in-memory objects.
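To make the memory argument concrete, here is a minimal sketch that counts the lines of a file chunk by chunk. The helper name `countLines` is made up for this illustration and is not part of the Kuwala code base.

```javascript
const fs = require('fs');

// Count newline characters in a file without ever holding the whole file in memory.
// `countLines` is a hypothetical helper used only for illustration.
function countLines(filePath) {
    return new Promise((resolve, reject) => {
        let lines = 0;

        fs.createReadStream(filePath)
            .on('data', (chunk) => {
                // Each chunk is a Buffer (64 KiB by default for fs read streams)
                for (const byte of chunk) {
                    if (byte === 10) lines += 1; // 10 is the byte value of '\n'
                }
            })
            .on('end', () => resolve(lines))
            .on('error', reject);
    });
}
```

Only one chunk and a single counter live in memory at any time, so the footprint stays roughly the same whether the file has a thousand lines or a hundred million.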
We can already use streams when downloading the OSM files, since a file may be several gigabytes in size depending on the region you want to download. You first create a write stream that writes the data to a specified file path. You then pipe the data stream of the response object into it and continue once the write stream has finished.
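const fs = require('fs');
const path = require('path');
const axios = require('axios');
const fse = require('fs-extra');

// pickFile() and baseUrl are defined elsewhere in the repository
async function downloadFile() {
    const downloadUrl = await pickFile();

    if (!downloadUrl) {
        throw new Error('No file selected');
    }

    const filePath = `tmp/osmFiles${downloadUrl.split(baseUrl)[1]}`;

    // Make sure the target directory exists before opening the write stream
    await fse.ensureDir(path.dirname(filePath));

    const writer = fs.createWriteStream(filePath);
    // Request the file as a stream so the body is never fully buffered in memory
    const response = await axios.get(downloadUrl, { responseType: 'stream' });

    response.data.pipe(writer);

    return new Promise((resolve, reject) => {
        writer.on('finish', () => resolve(filePath));
        writer.on('error', reject);
        response.data.on('error', reject); // pipe() does not forward source errors
    });
}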
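As a possible variation on the download step, `pipeline` from `stream/promises` (available in Node.js 15 and later) returns a promise and destroys all streams if one of them fails, which `pipe` does not do on its own. The helper `saveStream` below is hypothetical and only sketches the idea; it is not how the Kuwala repository does it.

```javascript
const fs = require('fs');
const { pipeline } = require('stream/promises');

// Write any readable stream (for example the `response.data` stream from axios) to disk.
// `saveStream` is a hypothetical helper, not part of the Kuwala code base.
async function saveStream(readable, filePath) {
    // Resolves when the file is fully written, rejects if either stream errors
    await pipeline(readable, fs.createWriteStream(filePath));

    return filePath;
}
```

Inside a download function like the one above, the manual promise around the writer could then be replaced by `return saveStream(response.data, filePath);`.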
Now that we have downloaded the file, let’s parse it into manageable objects and write them to a database. First, we create a read stream that reads the file. For parsing the OSM protocol buffer into JSON objects, we use the osm-pbf-parser npm package. With the pipe method, you can connect a readable stream to a writable stream. There is a particular class, the Transform stream, which implements both the Readable and Writable interfaces. Using that, we can break the entire filtering and transformation down into semantic streams and pipe them together.
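As a rough illustration of such a Transform stream, the sketch below filters objects by their tags in object mode. It assumes a simplified input of one object per chunk, and both the tag list in `RELEVANT_KEYS` and the names `createTagFilter` and `collect` are made up for this example rather than taken from the actual pipeline.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical tag keys that mark an object as a POI; the real pipeline uses its own rules
const RELEVANT_KEYS = ['amenity', 'shop', 'office', 'tourism'];

function createTagFilter() {
    return new Transform({
        objectMode: true, // chunks are JavaScript objects instead of Buffers
        transform(item, _encoding, callback) {
            const tags = item.tags || {};
            const isPoi = RELEVANT_KEYS.some((key) => key in tags);

            // Passing the item to the callback pushes it downstream; an empty call drops it
            if (isPoi) {
                callback(null, item);
            } else {
                callback();
            }
        }
    });
}

// Small helper that gathers the output of a stream into an array (fine for tiny demo inputs only)
async function collect(stream) {
    const items = [];

    for await (const item of stream) {
        items.push(item);
    }

    return items;
}
```

For instance, `collect(Readable.from(objects).pipe(createTagFilter()))` resolves to only those objects that carry at least one of the listed tag keys. Because every step has the same shape, further transformers can be appended with additional `pipe` calls.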
After parsing the pbf format to JSON, we want to create more concise objects based on the OSM tags, which are just a list of key-value pairs. The objects in the OSM file are ordered by type: first nodes, then ways, and lastly relations. That makes a lot of sense, since ways consist of nodes and relations consist of ways. So with our first transformer, we store all the nodes in a temporary database. We use LevelDB here, a fast key-value store developed by Google. Secondly, we filter out all objects that do not contain relevant tags for classifying them as a POI, e.g., nodes that only exist to represent a point in a building footprint. Thirdly, we create building footprints, i.e., the outlines of buildings, for ways and relations. The OSM objects only store references to the nodes that form the building footprint. We retrieve the relevant nodes from LevelDB and generate the geometries in GeoJSON format. Next, we compute the centroid of these geometries, which we then convert to H3 indexes with another transformer.
const fs = require('fs');
const level = require('level');
const parseOSM = require('osm-pbf-parser');
const rimraf = require('rimraf');

// The individual transform streams as well as `Poi`, `pois`, `addedObjects`, `bar`,
// and `addUnmatchedTags` are defined elsewhere in the repository
async function parseFile(file) {
    return new Promise(async (resolve, reject) => {
        const levelDbPath = 'tmp/levelDb';

        try {
            const db = await level(levelDbPath);
            const pbfParserStream = parseOSM();
            const stream = fs.createReadStream(file);

            stream
                .pipe(pbfParserStream) // Parse pbf format to JSON objects
                .pipe(levelDbStream(db)) // Store every node in temporary database to generate building footprints
                .pipe(tagFilterStream) // Filter irrelevant items
                .pipe(buildingFootprintStream(db)) // Construct building footprints for ways and relations
                .pipe(locationStream) // Get lat lng pair for center based on point or building footprint
                .pipe(h3Stream) // Convert location to h3 index
                .pipe(tagStream) // Convert relevant tags into categories and properties
                .pipe(modelStream) // Create model objects
                .pipe(mongodbStream(bar)) // Save objects to mongodb
                .on('finish', async () => {
                    // Insert the objects that are still buffered in `pois`
                    await Poi.insertMany(pois).catch(reject);

                    addedObjects += pois.length;

                    await db.close();
                    rimraf.sync(levelDbPath); // Delete temporary database
                    addUnmatchedTags(); // Add unmatched category tags to category list under "misc"
                    resolve();
                })
                // Note: pipe() does not forward errors, so this only sees errors of the last stream
                .on('error', (error) => {
                    rimraf.sync(levelDbPath); // Delete temporary database
                    reject(error);
                });
        } catch (error) {
            rimraf.sync(levelDbPath); // Delete temporary database
            reject(error);
        }
    });
}
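To illustrate the footprint step of this pipeline in isolation, here is a simplified sketch. It uses an in-memory `Map` as a stand-in for the temporary LevelDB store, and it approximates the centroid as the mean of the outline's vertices, which is a deliberate simplification rather than a true area centroid. The names `buildFootprint` and `vertexCentroid` and the object shapes are assumptions made for this example.

```javascript
// Build a GeoJSON Polygon from a way's node references.
// `store` is any object with a get(ref) method; here a Map stands in for LevelDB.
function buildFootprint(way, store) {
    const ring = way.refs.map((ref) => {
        const node = store.get(ref);

        if (!node) {
            throw new Error(`Unknown node ${ref}`);
        }

        return [node.lon, node.lat]; // GeoJSON uses [longitude, latitude] order
    });
    const first = ring[0];
    const last = ring[ring.length - 1];

    // A GeoJSON ring must be closed, so repeat the first position if needed
    if (first[0] !== last[0] || first[1] !== last[1]) {
        ring.push([...first]);
    }

    return { type: 'Polygon', coordinates: [ring] };
}

// Simplified centroid: mean of the outer ring's vertices without the duplicated closing vertex
function vertexCentroid(polygon) {
    const points = polygon.coordinates[0].slice(0, -1);
    const [sumLng, sumLat] = points.reduce(
        ([lng, lat], [pointLng, pointLat]) => [lng + pointLng, lat + pointLat],
        [0, 0]
    );

    return { lng: sumLng / points.length, lat: sumLat / points.length };
}

// Usage with a square building whose way references four stored nodes
const nodeStore = new Map([
    [1, { lat: 0, lon: 0 }],
    [2, { lat: 0, lon: 2 }],
    [3, { lat: 2, lon: 2 }],
    [4, { lat: 2, lon: 0 }]
]);
const footprint = buildFootprint({ refs: [1, 2, 3, 4, 1] }, nodeStore);
const center = vertexCentroid(footprint);
```

The resulting `center` is the kind of lat/lng pair that a later stream could convert into an H3 index.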
H3 is a spatial index developed by Uber. It is a great way to connect several geospatial data sources because you can quickly traverse from one resolution to another. In the next pipe, we aggregate the tags relevant to POI properties such as categories, addresses, or contact details. Lastly, the transformed objects are wrapped in Mongoose models, which validate the schema, so that we can write them to the database in the last pipe.
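The tag aggregation could look roughly like the following sketch. The OSM keys such as `addr:street` or `contact:phone` are common tagging conventions, but the output shape, the `CATEGORY_BY_TAG` lookup, and the function names are assumptions for illustration and do not mirror Kuwala's actual schema.

```javascript
// Hypothetical mapping from "key=value" tags to coarse categories
const CATEGORY_BY_TAG = {
    'amenity=cafe': 'food',
    'amenity=restaurant': 'food',
    'shop=supermarket': 'shopping'
};

// Drop properties whose value is undefined so that the result stays concise
function compact(object) {
    return Object.fromEntries(
        Object.entries(object).filter(([, value]) => value !== undefined)
    );
}

// Turn a flat OSM tag object into grouped POI properties
function aggregateTags(tags) {
    const categories = Object.entries(tags)
        .map(([key, value]) => CATEGORY_BY_TAG[`${key}=${value}`])
        .filter(Boolean);

    return {
        categories: [...new Set(categories)], // remove duplicate categories
        address: compact({
            street: tags['addr:street'],
            houseNumber: tags['addr:housenumber'],
            zipCode: tags['addr:postcode'],
            city: tags['addr:city']
        }),
        contact: compact({
            phone: tags.phone || tags['contact:phone'],
            website: tags.website || tags['contact:website'],
            email: tags.email || tags['contact:email']
        })
    };
}
```

A function like this fits naturally into the `transform` method of an object-mode Transform stream, with one call per incoming object.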
Having passed these transformations, we now have handy objects that you could either load into your data warehouse or query over an API with, for example, radius and geometry queries.
At Kuwala, we want to unify geospatial data and make it easy to integrate and connect. To have a significant impact, we are entirely open-source and believe in the power of the community. If you work with data, make sure to star us on GitHub and join our Slack community.

Written by kuwala | Kuwala is an Open Source No Code Data Platform that reduces the friction between BI Analysts and Engineers.
Published by HackerNoon on 2021/06/23