Declarative Engineering: Using Terraform to Code Your Data Pipelines

Written by andreitserakhau | Published 2024/01/18
Tech Story Tags: clickhouse | terraform | postgresql | opensource | datapipeline | modern-data-stack | data-engineering | big-data

TLDRA small modern data stack that ETLs data from a PostgreSQL database into a ClickHouse database.via the TL;DR App

Data is widely considered the lifeblood of an organization; however, it is worthless—and useless—in its raw form. Help is needed to turn this data into life-giving information. Part of this help is to move this data from its source to destination via data pipelines. But managing the end-to-end data pipeline lifecycle is not easy: ones are challenging to scale on demand, others can result in delays in detecting and addressing issues or be difficult to control. These and the other problems may be solved in the declarative programming paradigm, and in this article we will see it on Terraform.

Where Does This Data Come From?

The data in these extensive datasets come from multiple disparate sources and are typically loaded into datasets in a data storage like ClickHouse via data pipelines. These pipelines play a critical role in this process by efficiently moving, transforming, and processing the data.

For instance, imagine you own a data analytics startup that provides organizations with real-time data analytics. Your client owns an eBike hire startup that wants to integrate weather data into its existing data to inform its marketing messaging strategies. Undoubtedly, your data engineers will set up many different data pipelines, pulling data from many sources. The two we are concerned with are designed to extract data from your client’s CRM and the local weather station.

The Layers of the Modern Data Stack

A data stack is a collection of different technologies, logically stacked on top of each other, that provides end-to-end data processing capabilities.

As described here, a data stack can have many different layers. The modern data stack consists of ingestion (data pipelines), storage (OLAP database), and business intelligence (data analytics) layers. Lastly, there is an orchestration layer—for example, Kubernetes or Docker Compose—that sits on top of these layers, orchestrating everything together so that the stack does what it is supposed to do; ergo, ingest raw data and egest advanced predictive analytics.

Data Pipelines are typically structured either as Extract, Transform, Load (ETL) or Extract, Load Transform (ELT) pipelines. However, developing, monitoring, and managing these data pipelines, especially in international enterprises with many branches worldwide, can be highly challenging, especially if the end-to-end data pipeline monitoring process is manual.

Developing Data Pipelines, the Traditional Way

Data pipelines are broadly categorized into two groups: code-based and deployed in a Direct Acyclic Graph or DAG-like tools such as Airflow and Apache Fink or non-code-based, which are  often developed in a SaaS-based application via a drag-and-drop UI. Both have their own problems.

Code-based data pipelines are challenging to scale on demand. The development/test/release cycle is typically too long, especially in critical environments with thousands of pipelines, loading petabytes of data into data stores daily. Additionally, they are typically manually implemented, deployed, and monitored by human data engineers, allowing the risk of errors creeping in and significant downtime.

With non-code-based data pipelines engineers don’t have access to the backend code, there can be limited visibility and monitoring, resulting in delays in detecting and addressing issues and failures. Moreover, it is challenging to reproduce errors, as making an exact copy of the pipeline is impossible. Lastly, version control using a tool like Git is complex. It is harder to control the evolution of a pipeline as the code is not stored in a repository such as a GitHub repo.

The Solution: Improving Your Data Pipelines with Declarative, Reproducible, Modular Engineering

The good news is that there is a sustainable solution to these challenges.

It is important to remind ourselves of the difference between imperative and declarative programming. In imperative programming, you control how things happen. In declarative programming, you express the logic without specifying the control flow.

The answer to the end-to-end data pipeline development, deployment, monitoring, and maintenance challenges is to utilize the declarative programming paradigm, that is, abstracting the computation logic. In practice, this is best achieved by using Terraform by HashiCorp — an open-source project that is an infrastructure-as-code development tool.

Because modern world data-intensive application architecture is very similar to the data stack the applications run on, Terraform provides the framework that will make your data stack declarative, reproducible, and modular.

Let’s return to our data analytics startup example described above to elucidate these concepts further. Imagine your data analytics app is a web application that executes inside Terraform. As expected, the most significant part of the application is the ClickHouse database.

We must build two data pipelines to ETL data from the CRM system—stored in a PostgreSQL database—into the ClickHouse database, transforming and enriching the data before loading it into ClickHouse. The second pipeline extracts real-time weather data from the local weather service—through an API and transforms it—ensuring there are no errors and enriching it—before loading it into ClickHouse.

Using Terraform to Code the Data Pipelines

Classic infrastructure for modern cloud-based applications looks like:

This is an oversimplification, but a more or less solid way of building applications.

You have a public subnet with a user facing API / UI, which is connected to a private subnet where data is stored.

To simplify things, let’s look at how to use Terraform to build the first data pipeline—to ETL data from your RDBMS (in this case PostgreSQL) system into the ClickHouse database.

This diagram visualizes the data pipeline that ELTs the data from the RDBMS database into the ClickHouse database, and then exposed as a connection into Visualization service.

This workflow must be reproducible as your company’s information technology architecture has at least three environments: dev, QA, and prod.

Enter Terraform.

Let’s look at how to implement this data pipeline in Terraform:

It is best practice to develop modular code, distilled down to the smallest unit, which can be reused many times.

The first step is to start with a main.tf file, containing nothing more than the provider’s definition.

provider "doublecloud" {
  endpoint       = "api.double.cloud:443"
  authorized_key = file(var.dc-token)
}
provider "aws" {
  profile = var.profile
}


The second step is to create a BYOA netwok:

data "aws_caller_identity" "self" {}

data "aws_region" "self" {}

# Prepare BYOC VPC and IAM Role
module "doublecloud_byoc" {
  source  = "doublecloud/doublecloud-byoc/aws"
  version = "1.0.2"
  providers = {
    aws = aws
  }
  ipv4_cidr = var.vpc_cidr_block
}

# Create VPC to peer with
resource "aws_vpc" "peered" {
  cidr_block                       = var.dwh_ipv4_cidr
  provider                         = aws
}

# Get account ID to peer with
data "aws_caller_identity" "peered" {
  provider = aws
}

# Create DoubleCloud BYOC Network
resource "doublecloud_network" "aws" {
  project_id = var.dc_project_id
  name       = "alpha-network"
  region_id  = module.doublecloud_byoc.region_id
  cloud_type = "aws"
  aws = {
    vpc_id          = module.doublecloud_byoc.vpc_id
    account_id      = module.doublecloud_byoc.account_id
    iam_role_arn    = module.doublecloud_byoc.iam_role_arn
    private_subnets = true
  }
}

# Create VPC Peering from DoubleCloud Network to AWS VPC
resource "doublecloud_network_connection" "example" {
  network_id = doublecloud_network.aws.id
  aws = {
    peering = {
      vpc_id          = aws_vpc.peered.id
      account_id      = data.aws_caller_identity.peered.account_id
      region_id       = var.aws_region
      ipv4_cidr_block = aws_vpc.peered.cidr_block
      ipv6_cidr_block = aws_vpc.peered.ipv6_cidr_block
    }
  }
}

# Accept Peering Request on AWS side
resource "aws_vpc_peering_connection_accepter" "own" {
  provider                  = aws
  vpc_peering_connection_id = time_sleep.avoid_aws_race.triggers["peering_connection_id"]
  auto_accept               = true
}

# Confirm Peering creation
resource "doublecloud_network_connection_accepter" "accept" {
  id = doublecloud_network_connection.example.id

  depends_on = [
    aws_vpc_peering_connection_accepter.own,
  ]
}

# Create ipv4 routes to DoubleCloud Network
resource "aws_route" "ipv4" {
  provider                  = aws
  route_table_id            = aws_vpc.peered.main_route_table_id
  destination_cidr_block    = doublecloud_network_connection.example.aws.peering.managed_ipv4_cidr_block
  vpc_peering_connection_id = time_sleep.avoid_aws_race.triggers["peering_connection_id"]
}

# Sleep to avoid AWS InvalidVpcPeeringConnectionID.NotFound error
resource "time_sleep" "avoid_aws_race" {
  create_duration = "30s"

  triggers = {
    peering_connection_id = doublecloud_network_connection.example.aws.peering.peering_connection_id
  }
}


This is preparing our Stage for later clusters. Architecture after terraform apply looks follows:

Once we prepare a stage, we can create a cluster in this private-subnet:

resource "doublecloud_clickhouse_cluster" "alpha-clickhouse" {
  project_id = var.dc_project_id
  name       = "alpha-clickhouse"
  region_id  = var.aws_region
  cloud_type = "aws"
  network_id = doublecloud_network.aws.id

  resources {
    clickhouse {
      resource_preset_id = "s1-c2-m4"
      disk_size          = 34359738368
      replica_count      = 1
    }
  }

  config {
    log_level       = "LOG_LEVEL_TRACE"
    max_connections = 120
  }

  access {
    ipv4_cidr_blocks = [
      {
        value       = doublecloud_network.aws.ipv4_cidr_block
        description = "DC Network interconnection"
      },
      {
        value       = aws_vpc.tutorial_vpc.cidr_block
        description = "Peered VPC"
      },
      {
        value       = "${var.my_ip}/32"
        description = "My IP"
      }
    ]
    ipv6_cidr_blocks = [
      {
        value       = "${var.my_ipv6}/128"
        description = "My IPv6"
      }
    ]
  }
}


This is added to our Stage for simple Clickhouse Cluster.

But this cluster still empty, so we must enable transfer between PostgreSQL and ClickHouse:

resource "doublecloud_transfer_endpoint" "pg-source" {
  name       = "chinook-pg-source"
  project_id = var.dc_project_id
  settings {
    postgres_source {
      connection {
        on_premise {
          tls_mode {
            ca_certificate = file("global-bundle.pem")
          }
          hosts = [
            aws_db_instance.tutorial_database.address
          ]
          port = 5432
        }
      }
      database = aws_db_instance.tutorial_database.db_name
      user     = aws_db_instance.tutorial_database.username
      password = var.db_password
    }
  }
}

data "doublecloud_clickhouse" "dwh" {
  name       = doublecloud_clickhouse_cluster.alpha-clickhouse.name
  project_id = var.dc_project_id
}

resource "doublecloud_transfer_endpoint" "dwh-target" {
  name       = "alpha-clickhouse-target"
  project_id = var.dc_project_id
  settings {
    clickhouse_target {
      connection {
        address {
          cluster_id = doublecloud_clickhouse_cluster.alpha-clickhouse.id
        }
        database = "default"
        user     = data.doublecloud_clickhouse.dwh.connection_info.user
        password = data.doublecloud_clickhouse.dwh.connection_info.password
      }
    }
  }
}

resource "doublecloud_transfer" "pg2ch" {
  name       = "postgres-to-clickhouse-snapshot"
  project_id = var.dc_project_id
  source     = doublecloud_transfer_endpoint.pg-source.id
  target     = doublecloud_transfer_endpoint.dwh-target.id
  type       = "SNAPSHOT_ONLY"
  activated  = false
}

This is adding a ELT snapshot process between PostgreSQL and ClickHouse.

As a last piece we can add a connection to this newly created cluster to visualization service:

resource "doublecloud_workbook" "dwh-viewer" {
  project_id = var.dc_project_id
  title      = "DWH Viewer"

  config = jsonencode({
    "datasets" : [],
    "charts" : [],
    "dashboards" : []
  })

  connect {
    name = "main"
    config = jsonencode({
      kind          = "clickhouse"
      cache_ttl_sec = 600
      host          = data.doublecloud_clickhouse.dwh.connection_info.host
      port          = 8443
      username      = data.doublecloud_clickhouse.dwh.connection_info.user
      secure        = true
      raw_sql_level = "off"
    })
    secret = data.doublecloud_clickhouse.dwh.connection_info.password
  }
}

As you can see, most of the code consists of variables. Therefore, executing this code to set up the different environments is easy by adding a stage_name.tfvars file and running terraform apply with it:

See the Terraform documentation for more information on .tvars files.

// This variable is to set the
// AWS region that everything will be
// created in
variable "aws_region" {
  default = "eu-west-2" // london
}

// This variable is to set the
// CIDR block for the VPC
variable "vpc_cidr_block" {
  description = "CIDR block for VPC"
  type        = string
  default     = "10.0.0.0/16"
}

// This variable holds the
// number of public and private subnets
variable "subnet_count" {
  description = "Number of subnets"
  type        = map(number)
  default = {
    public  = 1,
    private = 2
  }
}

// This variable contains the configuration
// settings for the EC2 and RDS instances
variable "settings" {
  description = "Configuration settings"
  type        = map(any)
  default = {
    "database" = {
      allocated_storage   = 10            // storage in gigabytes
      engine              = "postgres"    // engine type
      engine_version      = "15.4"        // engine version
      instance_class      = "db.t3.micro" // rds instance type
      db_name             = "chinook"     // database name
      identifier           = "chinook"     // database identifier
      skip_final_snapshot = true
    },
    "web_app" = {
      count         = 1          // the number of EC2 instances
      instance_type = "t3.micro" // the EC2 instance
    }
  }
}

// This variable contains the CIDR blocks for
// the public subnet. I have only included 4
// for this tutorial, but if you need more you
// would add them here
variable "public_subnet_cidr_blocks" {
  description = "Available CIDR blocks for public subnets"
  type        = list(string)
  default = [
    "10.0.1.0/24",
    "10.0.2.0/24",
    "10.0.3.0/24",
    "10.0.4.0/24"
  ]
}

// This variable contains the CIDR blocks for
// the public subnet. I have only included 4
// for this tutorial, but if you need more you
// would add them here
variable "private_subnet_cidr_blocks" {
  description = "Available CIDR blocks for private subnets"
  type        = list(string)
  default = [
    "10.0.101.0/24",
    "10.0.102.0/24",
    "10.0.103.0/24",
    "10.0.104.0/24",
  ]
}

// This variable contains your IP address. This
// is used when setting up the SSH rule on the
// web security group
variable "my_ip" {
  description = "Your IP address"
  type        = string
  sensitive   = true
}
// This variable contains your IP address. This
// is used when setting up the SSH rule on the
// web security group
variable "my_ipv6" {
  description = "Your IPv6 address"
  type        = string
  sensitive   = true
}

// This variable contains the database master user
// We will be storing this in a secrets file
variable "db_username" {
  description = "Database master user"
  type        = string
  sensitive   = true
}

// This variable contains the database master password
// We will be storing this in a secrets file
variable "db_password" {
  description = "Database master user password"
  type        = string
  sensitive   = true
}

// Stage 2 Variables

variable "dwh_ipv4_cidr" {
  type        = string
  description = "CIDR of a used vpc"
  default     = "172.16.0.0/16"
}
variable "dc_project_id" {
  type        = string
  description = "ID of the DoubleCloud project in which to create resources"
}

In Conclusion…

There you have it. A small modern data stack that ETLs data from a PostgreSQL database into a ClickHouse database.

The best news is that you can set up multiple data pipelines—as many as you need—using the code in this example. There is also a complete example that you can find here, with additional features like adding a peer connection to your existing VPC and creating a sample ClickHouse database with a replication transfer between the two databases.

Have fun playing with your small modern data stack!


This article was also published on DZone.


Written by andreitserakhau | Engineer with over 10 years of experience in IT. Tech Lead @DoubleCloud.
Published by HackerNoon on 2024/01/18