How to Understand Your Data in Real-Time Using bytewax and ydata-profiling

Written by ydata | Published 2023/07/25

TL;DR: A step-by-step tutorial on how to perform data profiling on data streams 🚀

In this blog post, we will be covering how you can combine and leverage the open-source streaming solution, bytewax, with ydata-profiling, to improve the quality of your streaming flows. Buckle up!

Stream processing enables real-time analysis of data in-flight and before storage and can be stateful or stateless.

Stateful stream processing is used for real-time recommendations, pattern detection, or complex event processing, where the history of what has happened is required for the processing (windows, joining by a key, etc.).

Stateless stream processing is used for inline transformation that doesn’t require knowledge of other data points in the stream like masking an email or converting a type.
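To make the distinction concrete, here is a minimal sketch in plain Python (no streaming framework involved). The helper names `mask_email` and `RunningCount` and the sample events are hypothetical and only serve to illustrate the idea: the stateless step looks at one event at a time, while the stateful step has to remember what it has seen before.

```python
from collections import defaultdict


def mask_email(event):
    """Stateless: the output depends only on the event itself."""
    user, _, domain = event["email"].partition("@")
    return {**event, "email": user[0] + "***@" + domain}


class RunningCount:
    """Stateful: keeps a per-key counter across events."""

    def __init__(self):
        self.counts = defaultdict(int)

    def update(self, event):
        self.counts[event["device"]] += 1
        return event["device"], self.counts[event["device"]]


# Illustrative events, not taken from any real dataset
events = [
    {"device": "a", "email": "ana@example.com"},
    {"device": "b", "email": "bob@example.com"},
    {"device": "a", "email": "ana@example.com"},
]

masked = [mask_email(e) for e in events]
counter = RunningCount()
counts = [counter.update(e) for e in events]
```

Note that `mask_email` could process the events in any order or in parallel, whereas `RunningCount` gives different results if events for the same key are reordered or split across workers without sharing state.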

Overall, data streams are widely used in the industry and can be found applied to use cases such as fraud detection, patient monitoring, or event predictive maintenance.

One Crucial Aspect That All Data Streams Must Consider Is the Quality of the Data

Unlike traditional models where data quality is usually assessed during the creation of the data warehouse or dashboard solution, streaming data requires continuous monitoring.

It is essential to maintain data quality throughout the entire process, from collection to feeding downstream applications. After all, the cost of bad data quality can be high for organizations:

“The cost of bad data is an astonishing 15% to 25% of revenue for most companies. (…) Two-thirds of these costs can be eliminated by getting in front on data quality.”

— Thomas C. Redman, author of “Getting in Front on Data Quality”

Throughout this article, we will show you how you can combine bytewax with ydata-profiling to profile and improve the quality of your streaming flows!

Stream Processing for Data Professionals With Bytewax

Bytewax is an OSS stream processing framework designed specifically for Python developers.

It allows users to build streaming data pipelines and real-time applications with capabilities similar to Flink, Spark, and Kafka Streams while providing a friendly and familiar interface and 100% compatibility with the Python ecosystem.

Using built-in connectors or existing Python libraries, you can connect to real-time and streaming data sources (Kafka, RedPanda, WebSocket, etc.) and write transformed data out to various downstream systems (Kafka, parquet files, data lakes, etc.).

For the transformations, Bytewax supports both stateful and stateless processing with map, windowing, and aggregation methods, and comes with familiar features such as recovery and scalability.

Bytewax facilitates a Python-first and data-centric experience to data streams and is purposely built for data engineers and data scientists.

It allows users to build streaming data pipelines and real-time applications and create customizations necessary to meet their needs without having to learn and maintain JVM-based streaming platforms like Spark or Flink.

Bytewax is well suited for many use cases, such as Embedding Pipelines for Generative AI, Handling Missing Values in Data Streams, Using Language Models in a Streaming Context to Understand Financial Markets, and more.

For use case inspiration and more information like documentation, tutorials, and guides, feel free to check the bytewax website.

Why Data Profiling for Data Streams?

Data Profiling is key to a successful start of any machine learning task and refers to the step of thoroughly understanding our data: its structure, behavior, and quality.

In a nutshell, data profiling involves analyzing aspects related to the data’s format and basic descriptors (e.g., number of samples, number/types of features, duplicate values), its intrinsic characteristics (such as the presence of missing data or imbalanced features), and other complicating factors that may arise during data collection or processing (e.g., erroneous values or inconsistent features).
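As a rough illustration of the basic descriptors mentioned above, the following sketch computes a few of them by hand using only the standard library. The `basic_profile` helper and the sample rows are hypothetical; a profiling tool covers far more than this.

```python
from collections import Counter


def basic_profile(rows):
    """Summarise a list of dict records (hypothetical helper)."""
    columns = sorted({key for row in rows for key in row})
    missing = Counter()
    types = {col: set() for col in columns}
    for row in rows:
        for col in columns:
            value = row.get(col)
            if value is None:
                missing[col] += 1
            else:
                types[col].add(type(value).__name__)
    # Two rows count as duplicates when all of their values match
    seen = Counter(tuple(row.get(col) for col in columns) for row in rows)
    duplicates = sum(count - 1 for count in seen.values())
    return {
        "n_rows": len(rows),
        "n_columns": len(columns),
        "types": {col: sorted(t) for col, t in types.items()},
        "missing": {col: missing[col] for col in columns},
        "duplicate_rows": duplicates,
    }


# Illustrative readings, not taken from the dataset used later on
rows = [
    {"device": "a", "temp": 22.7, "humidity": 51.0},
    {"device": "a", "temp": None, "humidity": 50.9},
    {"device": "b", "temp": 19.7, "humidity": 76.0},
    {"device": "b", "temp": 19.7, "humidity": 76.0},
]
summary = basic_profile(rows)
```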

Ensuring high data quality standards is crucial for all domains and organizations, but it is especially relevant for domains that produce continuous data, where circumstances might change fast and may require immediate action (e.g., healthcare monitoring, stock values, air quality policies).

For many domains, data profiling is used through an exploratory data analysis lens, considering historical data stored in databases. For data streams, in contrast, data profiling becomes essential for continuous validation and quality control along the stream, where data needs to be checked at different time frames or stages of the process.

By embedding automated profiling into our data flows, we can get immediate feedback on the current state of our data and be alerted to any potentially critical issues, whether they relate to data consistency and integrity (e.g., corrupted values or changing formats) or to events happening over short periods of time (e.g., data drifts, deviations from business rules and outcomes).
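A very simple version of such an alert can be sketched as a check that runs on each window of readings. The `check_window` helper, the thresholds, and the valid range below are assumptions chosen for illustration; real rules would come from your own domain knowledge.

```python
def check_window(readings, column, valid_range, max_missing_ratio=0.1):
    """Return a list of alert messages for one window of readings."""
    alerts = []
    values = [r.get(column) for r in readings]

    # Integrity check: too many missing values in this window
    missing = sum(v is None for v in values)
    if readings and missing / len(readings) > max_missing_ratio:
        alerts.append(f"{column}: {missing}/{len(readings)} values missing")

    # Business-rule check: values must stay within the expected range
    low, high = valid_range
    out_of_range = [v for v in values if v is not None and not low <= v <= high]
    if out_of_range:
        alerts.append(f"{column}: {len(out_of_range)} values outside [{low}, {high}]")
    return alerts


# Illustrative window with one missing and one implausible temperature
window = [{"temp": 21.5}, {"temp": None}, {"temp": 250.0}, {"temp": 22.1}]
alerts = check_window(window, "temp", valid_range=(-40.0, 85.0))
```

Here `alerts` ends up with two messages, one for the missing value and one for the out-of-range reading, while a clean window returns an empty list.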

In real-world domains, where you just know Murphy’s law is bound to strike and “everything can definitely go wrong”, automated profiling might save us from many headaches and from having to take systems out of production!

When it comes to data profiling, ydata-profiling has consistently been a crowd favorite, for both tabular and time-series data. And no wonder: it takes one line of code to get an extensive set of analyses and insights.

Complex and time-consuming operations are done under the hood: ydata-profiling automatically detects the feature types present in the data and, depending on the feature type (numeric or categorical), adjusts the summary statistics and visualizations shown in the profiling report.

Fostering a data-centric analysis, the package also highlights the existing relationships between features, focusing on their pairwise interactions and correlations, and provides a thorough evaluation of data quality alerts, from duplicate or constant values to skewed and imbalanced features.

It’s really a 360º view of the quality of our data — with minimal effort.

Putting It All Together: bytewax and ydata-profiling

Before starting the project, we need to first set our Python dependencies and configure our data source.

First, let’s install the bytewax and ydata-profiling packages (You might want to use a virtual environment for this — check these instructions if you need some extra guidance!)

pip install bytewax==0.16.2 ydata-profiling==4.3.1

Then, we’ll download the Environmental Sensor Telemetry Dataset (License: CC0, Public Domain), which contains several measurements of temperature, humidity, carbon monoxide, liquid petroleum gas, smoke, light, and motion from different IoT devices:

wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000

In a production environment, these measurements would be continuously generated by each device, and the input would look like what we expect in a streaming platform such as Kafka. In this article, to simulate the context we would find with streaming data, we will read the data from the CSV file one line at a time and create a dataflow using bytewax.

(As a quick side note, a dataflow is essentially a data pipeline that can be described as a directed acyclic graph — DAG)

First, let’s make some necessary imports:

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput
from bytewax.connectors.files import CSVInput
from bytewax.testing import run_main

Then, we define our dataflow object. Afterward, we will use a stateless map method where we pass in a function to convert the timestamp string to a datetime object and restructure the data to the format (device_id, data).


The map method will make the change to each data point in a stateless way. The reason we have modified the shape of our data is so that we can easily group the data in the next steps to profile data for each device separately rather than for all of the devices simultaneously.

flow = Dataflow()
flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000"))

# parse timestamp
def parse_time(reading_data):
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data

flow.map(parse_time)


# remap format to tuple (device_id, reading_data)
flow.map(lambda reading_data: (reading_data['device'], reading_data))
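To see what these two map steps do to a single record, here is a small standalone sketch that applies them to one row outside of the dataflow. The row values are made up for illustration, and we assume each row arrives as a dictionary of strings, as a CSV reader would produce.

```python
from datetime import datetime, timezone


def parse_time(reading_data):
    # Same conversion as in the dataflow: epoch seconds -> aware datetime (UTC)
    reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc)
    return reading_data


# Hypothetical row; the values are not taken from the dataset
row = {"ts": "1594512094.0", "device": "sensor-1", "temp": "22.7"}

parsed = parse_time(dict(row))  # copy so the original row is left untouched
keyed = (parsed["device"], parsed)  # (device_id, reading_data)
```

Only the timestamp is converted here. Under the assumption above, the other fields are still strings at this point, so you may want to cast numeric columns before profiling them.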

Now, we will take advantage of the stateful capabilities of bytewax to gather data for each device over the duration of time that we have defined. ydata-profiling expects a snapshot of the data over time, which makes the window operator the perfect method to use to do this.

In ydata-profiling, we are able to produce summarizing statistics for a dataframe that is specified for a particular context. For instance, in our example, we can produce snapshots of data referring to each IoT device or to particular time frames:

from bytewax.window import EventClockConfig, TumblingWindow

# This is the accumulator function, and outputs a list of readings
def acc_values(acc, reading):
    acc.append(reading)
    return acc


# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
def get_time(reading):
    return reading["ts"]

  
# Configure the `fold_window` operator to use the event time.
cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30))

# And a tumbling window
align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1))

flow.fold_window("running_average", cc, wc, list, acc_values)

flow.inspect(print)
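If tumbling windows are new to you, the following sketch shows the idea behind them in plain Python: time is cut into fixed-length, non-overlapping intervals starting at `align_to`, and each event falls into exactly one of them. This is only an illustration of the concept, not Bytewax's internal implementation, and `window_bounds` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

align_to = datetime(2020, 1, 1, tzinfo=timezone.utc)
length = timedelta(hours=1)


def window_bounds(ts):
    """Return the [start, end) bounds of the tumbling window containing ts."""
    index = (ts - align_to) // length  # number of whole windows since align_to
    start = align_to + index * length
    return start, start + length


# Illustrative event time
ts = datetime(2020, 7, 12, 0, 1, 34, tzinfo=timezone.utc)
start, end = window_bounds(ts)
```

With a one-hour length aligned to midnight, every reading from a given device within the same clock hour ends up in the same window, which is what lets us profile one hour of data per device.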

After the snapshots are defined, leveraging ydata-profiling is as simple as calling the ProfileReport for each of the dataframes we would like to analyze:

import pandas as pd
from ydata_profiling import ProfileReport

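def profile(device_id__readings):
    # Debug output: show the raw (device_id, readings) tuple from the window step
    print(device_id__readings)
    device_id, readings = device_id__readings
    # Label the report with the hour that this window's readings fall in
    start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
    df = pd.DataFrame(readings)
    # Time-series mode, ordered by the parsed timestamp column
    report = ProfileReport(
        df,
        tsmode=True,
        sortby="ts",
        title=f"Sensor Readings - device: {device_id}"
    )

    # One HTML report per device and hour
    report.to_file(f"Ts_Profile_{device_id}-{start_time}.html")
    return f"device {device_id} profiled at hour {start_time}"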


flow.map(profile)

In this example, we write the reports out to local HTML files from within the function passed to the map method. They could instead be sent out via a messaging tool or saved to some remote storage in the future.

Once the profile is complete, the dataflow expects some output, so we use the built-in StdOutput to print the message returned by the profile function in the map step, which names the device that was profiled and the hour it was profiled for:

flow.output("out", StdOutput())

There are multiple ways to execute Bytewax dataflows. In this example, we use the same local machine, but Bytewax can also run on multiple Python processes, across multiple hosts, in a Docker container, using a Kubernetes cluster, and more.

In this article, we’ll continue with a local setup, but we encourage you to check out our helper tool, waxctl, which manages Kubernetes dataflow deployments once your pipeline is ready to transition to production.

Assuming we are in the same directory as the file with the dataflow definition, we can run it using:

python -m bytewax.run ydata-profiling-streaming:flow

We can then use the profiling reports to validate the data quality, check for changes in schemas or data formats, and compare the data characteristics between different devices or time windows.

In fact, we can leverage the comparison report functionality that highlights the differences between two data profiles in a straightforward manner, making it easier for us to detect important patterns that need to be investigated or issues that have to be addressed:

snapshot_a_report = ProfileReport(df_a, title="Snapshot A")
snapshot_b_report = ProfileReport(df_b, title="Snapshot B")

comparison_report = snapshot_a_report.compare(snapshot_b_report)
comparison_report.to_file("comparison_report.html")
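To build some intuition for the kind of difference a comparison can surface, here is a minimal hand-rolled sketch that contrasts summary statistics of two snapshots using only the standard library. It is not how ydata-profiling builds its comparison report; the `summarize` and `compare` helpers and the temperature values are hypothetical.

```python
from statistics import mean, stdev


def summarize(values):
    """Basic descriptors for one snapshot of a numeric feature."""
    return {"count": len(values), "mean": mean(values), "std": stdev(values)}


def compare(snapshot_a, snapshot_b):
    """Put the descriptors of two snapshots side by side."""
    a, b = summarize(snapshot_a), summarize(snapshot_b)
    return {k: {"a": a[k], "b": b[k], "diff": b[k] - a[k]} for k in a}


# Illustrative temperatures from two consecutive hourly windows
temps_hour_1 = [22.0, 22.5, 23.0, 22.5]
temps_hour_2 = [25.0, 25.5, 26.0, 25.5]
diff = compare(temps_hour_1, temps_hour_2)
```

In this toy case the spread stays the same while the mean shifts by three degrees, the sort of change between windows that would be worth investigating.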

Ready to Explore Your Own Data Streams?

Validating data streams is crucial to identify issues in data quality in a continuous manner and compare the state of data across distinct periods of time.

For organizations in healthcare, energy, manufacturing, and entertainment, all working with continuous streams of data, automated profiling is key to establishing data governance best practices, from quality assessment to data privacy.

This requires the analysis of snapshots of data which, as showcased in this article, can be achieved in a seamless way by combining bytewax and ydata-profiling.

Bytewax takes care of all the processes necessary to handle and structure data streams into snapshots, which can then be summarized and compared with ydata-profiling through a comprehensive report of data characteristics.

Being able to appropriately process and profile incoming data opens up a plethora of use cases across different domains, from the correction of errors in data schemas and formats to the highlighting and mitigation of additional issues that derive from real-world activities, such as anomaly detection (e.g., fraud or intrusion/threats detection), equipment malfunction, and other events that deviate from the expectations (e.g., data drifts or misalignment with business rules).

Now, you’re all set to start exploring your data streams! Let us know what other use cases you find, and as always, feel free to drop us a line in the comments, or find us at the Data-Centric AI Community for further questions and suggestions! See you there!

Acknowledgments

This article was written by Fabiana Clemente (Co-founder & CDO @ YData) and Miriam Santos (Developer Relations @ YData) -- developing ydata-profiling -- and Zander Matheson (CEO & Founder @ Bytewax) and Oli Makhasoeva (Developer Relations @ Bytewax) -- developing bytewax.

You may find additional information about the OSS packages in their respective documentation: ydata-profiling docs & bytewax docs.




Written by ydata | YData offers a data development platform for Data Scientists focused on the quality of the data.
Published by HackerNoon on 2023/07/25