How To Create a Python Data Engineering Project with a Pipeline Pattern

Written by sureshdsk | Published 2021/02/15
Tech Story Tags: python | nlp | data-engineering | datapipeline | data-science | python-tips | clean-code | coding-skills

TLDR In this article, we cover how to use the pipeline pattern in Python data engineering projects. The functional pipeline is a design pattern, common in functional programming, in which each step transforms the data and passes the result on to the next step. It is well suited to map, filter, and reduce operations and leads to clean, readable, and maintainable data engineering code. As an example, we pass input text through a series of transformations (remove white spaces, remove special characters, lowercase all letters) to produce the final output.

In this article, we cover how to use the pipeline pattern in Python data engineering projects. Here are the steps:
  1. Functional pipeline
  2. fastcore
  3. Install fastcore
  4. Creating pipeline using fastcore
  5. Dynamic pipeline using fastcore
Let's get into it!

Functional pipeline

The functional pipeline is a design pattern mostly used in the functional programming paradigm: data flows through a sequence of stages, and the output of each stage is the input of the next. Each step can be thought of as a filter operation that transforms the data in some way.
This pattern is well suited to map, filter, and reduce operations. It also leads to clean, readable, and more maintainable code in data engineering projects.
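The idea can be sketched in plain Python without any library. This is a minimal sketch, assuming every stage is a one-argument function: functools.reduce threads the data through the stages in order, and the stages themselves are built from the built-in map, filter, and sum. The names compose_stages, square_all, and keep_even are hypothetical.

```python
from functools import reduce
from typing import Any, Callable, Iterable

Stage = Callable[[Any], Any]


def compose_stages(*stages: Stage) -> Stage:
    """Return a function that feeds its input through each stage, left to right."""
    def run(data: Any) -> Any:
        # The output of one stage becomes the input of the next.
        return reduce(lambda acc, stage: stage(acc), stages, data)
    return run


def square_all(xs: Iterable[int]) -> Iterable[int]:
    # "map" stage: transform every element.
    return map(lambda x: x * x, xs)


def keep_even(xs: Iterable[int]) -> Iterable[int]:
    # "filter" stage: keep only the elements that pass a test.
    return filter(lambda x: x % 2 == 0, xs)


# The built-in sum acts as the final "reduce" stage.
sum_of_even_squares = compose_stages(square_all, keep_even, sum)
print(sum_of_even_squares([1, 2, 3, 4]))  # prints 20 (4 + 16)
```

Because map and filter return lazy iterators, nothing is computed until the final sum stage consumes them.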
For example, take an input text that has to go through a series of transformations:
  1. Remove white spaces
  2. Remove special characters
  3. Lowercase all letters
  4. Return the final output
These pipeline functions are simplified to demonstrate the use case; in a real-life scenario, they would be a lot more complex.
Let's create the simple transformation functions.
#pipeline_functions.py

import re


def remove_spaces(string):
    output = string.replace(' ', '')
    print(f"""{remove_spaces.__name__}() ==> {output}""")
    return output


def remove_special_chars(string):
    output = re.sub("[^A-Za-z0-9]", "", string)
    print(f"""{remove_special_chars.__name__}() ==> {output}""")
    return output


def lowercase(string):
    output = string.lower()
    print(f"""{lowercase.__name__}() ==> {output}""")
    return output
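Each function above repeats the same print call for tracing. If you prefer to keep the transformations free of logging, one option is to move the tracing into a decorator. The sketch below assumes that design; traced is a hypothetical helper, and functools.wraps keeps each function's __name__ so the trace lines read the same as before.

```python
import functools
import re
from typing import Callable


def traced(func: Callable[[str], str]) -> Callable[[str], str]:
    """Hypothetical decorator: print '<name>() ==> <output>' after each call."""
    @functools.wraps(func)  # preserves func.__name__ for the trace line
    def wrapper(string: str) -> str:
        output = func(string)
        print(f"{func.__name__}() ==> {output}")
        return output
    return wrapper


@traced
def remove_spaces(string: str) -> str:
    return string.replace(" ", "")


@traced
def remove_special_chars(string: str) -> str:
    return re.sub(r"[^A-Za-z0-9]", "", string)


@traced
def lowercase(string: str) -> str:
    return string.lower()


# Calling the steps by hand, innermost first, is what a pipeline automates.
result = lowercase(remove_special_chars(remove_spaces("Hello World!")))
```

The nested call on the last line has to be read inside out, while a pipeline lists the steps in the order they run.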

fastcore

fastcore is a library with a lot of Python goodies that make coding faster, easier, and more maintainable. It borrows ideas from other languages such as Julia, Ruby, and Haskell, and adds functional programming patterns, simplified parallel processing, and a lot more.
For our pipeline implementation, we will use the Pipeline class from fastcore's transform module.
Check it out at https://fastcore.fast.ai/

Install fastcore

$ pip install fastcore

Creating pipeline using fastcore

#main.py

from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase


def main(input_string):
    # Creates a pipeline with a list of functions
    pipe = Pipeline([remove_spaces, remove_special_chars, lowercase])
    
    # Invokes pipeline
    output = pipe(input_string)

    print(f"""output ==> {output}""")


if __name__ == '__main__':
    text = input("Enter input string: ")
    main(text)

Run the program

$ python main.py
Enter input string: Hello World!
remove_spaces() ==> HelloWorld!
remove_special_chars() ==> HelloWorld
lowercase() ==> helloworld
output ==> helloworld
As you can see, the input text passes through the pipeline from left to right: each step transforms the text, and the pipeline returns the final output. We can go one step further and make the pipeline dynamic by choosing its functions at runtime. Because the steps are then identified by name, that list of names can also be serialized and persisted for later use.
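Conceptually, calling pipe(input_string) in main.py runs each function in list order and hands its return value to the next. The class below is a rough stand-in written for illustration only (SimplePipeline is a hypothetical name, not part of fastcore). fastcore's actual Pipeline does more, for example wrapping each function in a Transform and offering a decode method, so treat this as a mental model rather than its implementation.

```python
from typing import Any, Callable, Iterable, List


class SimplePipeline:
    """Hypothetical minimal function pipeline (not fastcore's Pipeline)."""

    def __init__(self, funcs: Iterable[Callable[[Any], Any]]) -> None:
        self.funcs: List[Callable[[Any], Any]] = list(funcs)

    def __call__(self, data: Any) -> Any:
        # Apply each function left to right, feeding each output into the next.
        for func in self.funcs:
            data = func(data)
        return data


pipe = SimplePipeline([lambda s: s.replace(" ", ""), str.lower])
print(pipe("Hello World!"))  # prints helloworld!
```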

Dynamic pipeline

#main_dynamic.py

import sys
from fastcore.transform import Pipeline
from pipeline_functions import remove_spaces, remove_special_chars, lowercase


def main(input_string, pipe_funcs):
    # Creates a pipeline by looking up each function name in globals()
    pipe = Pipeline([globals()[func] for func in pipe_funcs])

    # Invokes pipeline
    output = pipe(input_string)

    print(f"""output ==> {output}""")


if __name__ == '__main__':
    text = input("Enter input string: ")
    funcs = sys.argv[1:]
    main(text, funcs)
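The globals() lookup resolves any module-level name, so a typo raises a bare KeyError, and names such as main or Pipeline would be looked up as if they were steps. A minimal sketch of a stricter variant, assuming argparse is acceptable for reading the step names: an explicit registry lists the allowed steps, and choices= makes argparse reject anything else with a usage message. The registry, its inline step functions, and the helper names are hypothetical. The demo applies the steps with a plain loop so it runs without fastcore; in main_dynamic.py you would pass the same list of functions to Pipeline instead.

```python
import argparse
from typing import Callable, Dict, List, Optional

# Hypothetical registry of allowed steps. In the article's layout these would be
# imported from pipeline_functions rather than defined inline.
REGISTRY: Dict[str, Callable[[str], str]] = {
    "remove_spaces": lambda s: s.replace(" ", ""),
    "lowercase": str.lower,
}


def parse_steps(argv: Optional[List[str]] = None) -> List[str]:
    """Parse step names from the command line (sys.argv[1:] when argv is None)."""
    parser = argparse.ArgumentParser(description="Run a text pipeline.")
    # With nargs="+", argparse checks every collected value against choices
    # and exits with a usage message if one is not a registered step.
    parser.add_argument("steps", nargs="+", choices=sorted(REGISTRY))
    return parser.parse_args(argv).steps


def build(steps: List[str]) -> List[Callable[[str], str]]:
    """Resolve validated step names to functions, preserving their order."""
    return [REGISTRY[name] for name in steps]


# Demo with an explicit argument list instead of the real command line.
output = "Hello World 123$"
for step in build(parse_steps(["remove_spaces", "lowercase"])):
    output = step(output)
print(f"output ==> {output}")  # prints output ==> helloworld123$
```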

Run the program

$ python main_dynamic.py remove_spaces lowercase 
Enter input string: Hello World 123$    
remove_spaces() ==> HelloWorld123$
lowercase() ==> helloworld123$
output ==> helloworld123$
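Because main_dynamic.py identifies each step by name, a pipeline definition is just a list of strings, which is what makes it easy to serialize and persist. A minimal sketch, assuming a JSON file and a hand-written registry that maps names to functions (the file name pipeline.json, the STEPS registry, and the helper names are hypothetical):

```python
import json
import re
import tempfile
from pathlib import Path
from typing import Callable, Dict, List

# Hypothetical registry: the only step names a stored definition may refer to.
STEPS: Dict[str, Callable[[str], str]] = {
    "remove_spaces": lambda s: s.replace(" ", ""),
    "remove_special_chars": lambda s: re.sub(r"[^A-Za-z0-9]", "", s),
    "lowercase": str.lower,
}


def save_pipeline(names: List[str], path: Path) -> None:
    """Persist only the step names (the functions themselves are not serialized)."""
    path.write_text(json.dumps({"steps": names}))


def load_pipeline(path: Path) -> List[Callable[[str], str]]:
    """Read the step names back and resolve each one through the registry."""
    names = json.loads(path.read_text())["steps"]
    return [STEPS[name] for name in names]


def run(funcs: List[Callable[[str], str]], text: str) -> str:
    """Apply the steps left to right, as the pipeline does."""
    for func in funcs:
        text = func(text)
    return text


# Round trip through a temporary file so the demo leaves nothing behind.
with tempfile.TemporaryDirectory() as tmp:
    config = Path(tmp) / "pipeline.json"
    save_pipeline(["remove_spaces", "lowercase"], config)
    result = run(load_pipeline(config), "Hello World 123$")
print(result)  # prints helloworld123$
```

Storing names rather than pickled functions keeps the file human-readable and limits a stored definition to the steps listed in the registry.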

Conclusion

Implementing data engineering components with the pipeline pattern makes complex data processing operations easier to write, and fastcore's Pipeline makes it even more convenient.
If you found this tutorial helpful or have any suggestions, do let me know!

Written by sureshdsk | Software Architect | Pythonista 🔥
Published by HackerNoon on 2021/02/15