Unlocking Real-Time Insights: Building a Stream Processing Solution with Apache Beam, Google Cloud Dataflow, and Terraform

Aekanun Thongtae
8 min read · Sep 12, 2023


1. Introduction

In an era where immediate insights are pivotal to a business’s success, combining the strengths of Apache Beam, Google Cloud Dataflow, and Terraform can be a game-changer. This piece walks through building a reliable, scalable real-time data pipeline with this trio.

1.1 Apache Beam: The Cornerstone of Flexible and Portable Solutions

  • Unified Model: One programming model covers both batch and stream processing, so the same pipeline code serves a broad spectrum of data processing needs (a short sketch follows this list).
  • SDKs: Pipelines can be written with several supported SDKs, including the Python SDK used throughout this article.
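
To make the unified-model point concrete, here is a minimal sketch (not part of the project code) of a Beam pipeline on the local runner; swapping the bounded Create source for an unbounded one such as ReadFromPubSub is essentially what turns the same shape of pipeline into the streaming job we deploy later:

import apache_beam as beam

# Minimal illustration: the same Map transform works unchanged whether the
# source is bounded (Create, batch) or unbounded (e.g. ReadFromPubSub, streaming).
with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create(['alpha', 'beta', 'gamma'])
     | 'Uppercase' >> beam.Map(str.upper)
     | 'Print' >> beam.Map(print))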

1.2 Google Cloud Dataflow: The Epitome of Efficient Data Processing

  • Serverless: As a fully managed service, it removes manual server management so you can focus on development.
  • Auto-scaling: Worker resources scale up and down automatically with the workload, improving efficiency and cost.

1.3 Terraform: Revolutionizing Infrastructure Management

  • Infrastructure as Code (IaC): Infrastructure is defined declaratively, giving simple, repeatable setups and reducing the errors that manual configuration invites.
  • Multi-cloud Strategy: Support for many providers enables a unified, strategic approach to managing cloud resources across environments.

2. Setting Up the Environment

We will work through the setup process step by step, focusing on building a real-time data processing pipeline. The essential elements are:

  • Python Scripts: These include main_pipeline.py and cleanse_data_module.py, which define the pipeline and cleanse the data as it flows through it.
  • Terraform Configuration: Through main.tf, we define and manage the Google Cloud resources and orchestrate the pipeline launch.
  • Docker Setup: Provides a controlled, replicable environment for deployment with all the requisite tools installed.

In this reference architecture, data flows from the source through Pub/Sub and Dataflow before arriving in BigQuery for fast SQL analytics.

In order to set up our environment, we have several components to consider. Let’s look at them one by one:

2.1 Python Scripts

We have two main Python scripts: main_pipeline.py, which contains the main data pipeline, and cleanse_data_module.py, a module that holds the function that cleans our data. The scripts use Apache Beam to read data from a Pub/Sub topic, cleanse it, and then write it to a BigQuery table.

main_pipeline.py

import apache_beam as beam
import logging
from your_project_name.cleanse_data_module import cleanse_data  # Change to your project name and module

logging.basicConfig(level=logging.INFO)

# Specify your BigQuery project ID and dataset.table name
table_spec = (
    'YOUR_PROJECT_ID:'
    'YOUR_DATASET_ID.xxx_dfsqltable_sales'
)

# Define the schema for your BigQuery table
schema = (
    'tr_time_str:DATETIME, first_name:STRING, last_name:STRING, '
    'city:STRING, state:STRING, product:STRING, amount:FLOAT, '
    'dayofweek:INTEGER'
)

# List of pipeline arguments; adjust with your Google Cloud settings
pipeline_args = [
    '--project=YOUR_PROJECT_ID',  # Change to your GCP project ID
    '--runner=DataflowRunner',
    '--region=us-central1',  # Adjust as per your GCP region
    '--staging_location=gs://YOUR_BUCKET_NAME/temp/staging/',  # Change to your bucket path
    '--temp_location=gs://YOUR_BUCKET_NAME/temp',  # Change to your bucket path
    '--streaming',
    '--setup_file=./setup.py',  # Point to your setup file
]

pipeline_options = beam.options.pipeline_options.PipelineOptions(pipeline_args)
p = beam.Pipeline(options=pipeline_options)

(p
 | 'Read from PubSub' >> beam.io.ReadFromPubSub(
     topic="projects/YOUR_PROJECT_ID/topics/xxx-transactions"  # Change to your Pub/Sub topic
 )
 | 'Cleanse Data' >> beam.Map(cleanse_data)  # Referencing the cleansing function
 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
     table=table_spec,
     schema=schema,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
 )
)

# Launch the streaming job. We do not call wait_until_finish() here because a
# streaming job runs until it is cancelled, which would block this script (and
# the Terraform provisioner that invokes it) indefinitely.
result = p.run()

# Grabbing job ID and region info to save in a file
job_id = result._job.id  # Accesses a non-public attribute of the Dataflow result object
region = 'us-central1'  # Adjust as per your GCP region

with open("job_info.txt", "w") as file:
    file.write(f"{job_id}\n{region}")

cleanse_data_module.py

import json
import logging
from datetime import datetime


def cleanse_data(element):
    """Function to cleanse and transform input data."""

    # Convert byte format to string and load as JSON
    element = element.decode('utf-8')
    element = json.loads(element)

    result = {}

    # Cleanse string fields
    for field in ['tr_time_str', 'first_name', 'last_name', 'city', 'state', 'product']:
        if field in element:
            result[field] = element[field].strip()

    # Handle 'amount' field
    if 'amount' in element:
        try:
            result['amount'] = float(element['amount'])
        except ValueError:
            result['amount'] = None
            logging.error(f"Failed to parse 'amount': {element['amount']}")

    # Handle 'tr_time_str' field to get datetime object and 'dayofweek'
    if 'tr_time_str' in element:
        try:
            date_time_obj = datetime.strptime(element['tr_time_str'], "%Y-%m-%d %H:%M:%S")
            result['dayofweek'] = date_time_obj.weekday()
            result['tr_time_str'] = date_time_obj.strftime("%Y-%m-%d %H:%M:%S")
        except ValueError:
            result['dayofweek'] = None
            logging.error(f"Failed to parse 'tr_time_str': {element['tr_time_str']}")

    logging.info(f"Processed element: {result}")
    return result
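
As a quick sanity check outside the pipeline, you can call cleanse_data directly on a sample message; the JSON below is illustrative, but its fields match what the function expects:

import json
from your_project_name.cleanse_data_module import cleanse_data  # Adjust to your package name

sample = json.dumps({
    "tr_time_str": "2023-09-12 10:15:00",
    "first_name": "  Jane ",
    "last_name": "Doe",
    "city": "Bangkok",
    "state": "BKK",
    "product": "Widget",
    "amount": "19.99",
}).encode("utf-8")

# Expect stripped strings, amount parsed to a float, and dayofweek derived from tr_time_str
print(cleanse_data(sample))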

2.2 Terraform Configuration

Our Terraform configuration consists of a main.tf file, which sets up Google Cloud resources such as a storage bucket and a BigQuery dataset, and orchestrates the execution of our main pipeline script.

main.tf

provider "google" {
credentials = file("YOUR_CREDENTIAL_FILE.json") # Replace with your credential file
project = "YOUR_PROJECT_ID" # Replace with your project ID
}

resource "google_storage_bucket" "bucket" {
name = "YOUR_BUCKET_NAME" # Replace with your bucket name
location = "US-CENTRAL1" # Adjust the bucket location as needed
force_destroy = true # Allows the bucket to be destroyed
}

resource "google_bigquery_dataset" "dataset" {
dataset_id = "YOUR_DATASET_ID" # Replace with your dataset ID
location = "US" # Adjust the dataset location as needed
delete_contents_on_destroy = true # Allows the dataset contents to be deleted when the resource is destroyed
# Add other variables as needed for this resource
}

resource "null_resource" "main_pipeline" {
provisioner "local-exec" {
command = "python3 main_pipeline.py" # Executes the main pipeline script
}
depends_on = [google_bigquery_dataset.dataset] # Ensures the dataset is created before the pipeline is run
}

resource "null_resource" "stop_dataflow_job_on_destroy" {
provisioner "local-exec" {
when = destroy
command = "gcloud dataflow jobs cancel $(head -n 1 job_info.txt) --region=$(tail -n 1 job_info.txt)"
# Cancels the Dataflow job when the resource is destroyed, utilizing the job info saved in the `job_info.txt` file
}
depends_on = [null_resource.main_pipeline] # Ensures the main pipeline runs before this resource
}

2.3 Containerization with Docker

In this project, we use Docker to ensure our application runs consistently across all environments. The Dockerfile defines a Python 3.8 environment, installs necessary tools like the Google Cloud SDK and Terraform, and sets up the entry point for our pipeline.

Dockerfile

# Use Python 3.8 as the base image
FROM python:3.8-slim

# Set the working directory
WORKDIR /workspace

# Install Terraform, Google Cloud SDK, and Nano editor
RUN apt-get update -y && \
    apt-get install -y wget unzip gcc python3-dev nano && \
    wget https://releases.hashicorp.com/terraform/YOUR_TERRAFORM_VERSION/terraform_YOUR_TERRAFORM_VERSION_linux_amd64.zip && \
    unzip terraform_YOUR_TERRAFORM_VERSION_linux_amd64.zip -d /usr/local/bin/ && \
    rm terraform_YOUR_TERRAFORM_VERSION_linux_amd64.zip && \
    wget https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.tar.gz && \
    tar zxvf google-cloud-sdk.tar.gz && \
    ./google-cloud-sdk/install.sh -q && \
    rm google-cloud-sdk.tar.gz && \
    apt-get autoremove -y && \
    rm -rf /var/lib/apt/lists/*

# Add Google Cloud SDK to the PATH
ENV PATH $PATH:/workspace/google-cloud-sdk/bin

# Copy the entire project to /workspace
COPY . /workspace/

# Install dependencies
RUN pip install --no-cache-dir -e .
RUN pip install --no-cache-dir 'apache-beam[gcp]'==YOUR_APACHE_BEAM_VERSION protobuf==YOUR_PROTOBUF_VERSION

# Set the command to be executed when the container is run
CMD ["python", "main_pipeline.py"]

2.4 Setting Up Python Environment

Before we dive deeper, let’s ensure our Python environment is correctly set up, facilitating a smooth deployment process later on.

2.4.1 Requirements.txt

This file contains all the Python packages required to run your project successfully. In our case, it has the following essential dependencies:

apache-beam==2.50.0
protobuf==4.23.4

Ensure to install them using the following command before proceeding:

pip install -r requirements.txt

2.4.2 Setup.py

The setup.py file is fundamental to package distribution, helping others to easily install and use your package. It contains metadata and other details like name, version, etc. Here is a brief snippet from our setup.py file:

from setuptools import setup, find_packages

setup(
    name='your-package-name',
    version='0.1',
    packages=find_packages(),
    install_requires=[
        'apache-beam==2.50.0',
        'protobuf==4.23.4',
        # additional packages
    ],
)
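
For find_packages() to pick up the cleansing module, and for the `from your_project_name.cleanse_data_module import cleanse_data` import in main_pipeline.py to resolve, a project layout along these lines works (all names are the placeholders used throughout this article):

.
├── Dockerfile
├── main.tf
├── main_pipeline.py
├── requirements.txt
├── setup.py
├── transactions_injector.py
└── your_project_name/
    ├── __init__.py
    └── cleanse_data_module.py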

3. Deployment

Now that we have our environment set up, it’s time to deploy our real-time stream processing solution. The deployment process involves utilizing Docker and Terraform in tandem to orchestrate and manage the different components seamlessly.

3.1 Building the Docker Image

Before we proceed to deployment, it is essential to build the Docker image that will encapsulate our Terraform scripts and Python application. Navigate to the directory containing your Dockerfile and execute the following command to build your Docker image:

docker build -t stream-processing-solution .

This will create a Docker image named “stream-processing-solution” containing all the necessary environments and files to run our application.

3.2 Initiating Terraform

With the Docker image ready, we can now initialize Terraform to manage and provision our resources on Google Cloud. First, run your Docker container with the following command:

docker run -it -v $(pwd):/workspace stream-processing-solution /bin/bash

This command mounts your current working directory to the /workspace directory in the container. Now, navigate to /workspace and initialize Terraform using:

terraform init
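
Depending on how you handle credentials, the pipeline script that Terraform launches may also need a service-account key available inside the container. If so, something along these lines (the file name is a placeholder) sets it up before running Terraform:

export GOOGLE_APPLICATION_CREDENTIALS=/workspace/YOUR_CREDENTIAL_FILE.json
gcloud auth activate-service-account --key-file=/workspace/YOUR_CREDENTIAL_FILE.json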

3.3 Deploying the Pipeline with Terraform

Now that Terraform has been initialized, we can proceed to deploy our data pipeline. Run the following command to apply the Terraform configurations and deploy the pipeline:

terraform apply

At this stage, Terraform will provision the necessary resources on Google Cloud and kickstart the data pipeline, allowing real-time data ingestion and processing through Apache Beam and Google Cloud Dataflow.

The data pipeline orchestrated by Terraform effectively operates within the Google Dataflow environment.
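
If you prefer the command line over the Cloud Console, you can also confirm that the streaming job is up by listing active Dataflow jobs (assuming gcloud is authenticated in the container):

gcloud dataflow jobs list --region=us-central1 --status=active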

3.4 Testing the Pipeline

With the data pipeline now deployed, the next critical step is to test its functionality to ensure it operates as intended. To simulate real-time data injection, we will use a Python script named transactions_injector.py.

In the Docker container’s /workspace directory where all your project files are mounted, run the transactions injector script as follows:

python transactions_injector.py

This script will inject synthetic transaction data into the pipeline, mimicking real-time data flows. You should monitor the data as it moves through the pipeline to ensure everything functions correctly.
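
The article does not reproduce transactions_injector.py, but a minimal sketch of such an injector might look like the following; the field names match the BigQuery schema above, while the sample values and the loop are purely illustrative:

import json
import random
import time
from datetime import datetime

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("YOUR_PROJECT_ID", "xxx-transactions")  # Change to your project and topic

while True:
    # Build a synthetic transaction matching the pipeline's expected fields
    message = {
        "tr_time_str": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "first_name": random.choice(["Jane", "John", "Ana"]),
        "last_name": random.choice(["Doe", "Smith", "Lee"]),
        "city": "Bangkok",
        "state": "BKK",
        "product": random.choice(["Widget", "Gadget"]),
        "amount": round(random.uniform(5, 500), 2),
    }
    future = publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
    future.result()  # Block until the message is accepted by Pub/Sub
    time.sleep(1)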

Launch the transactions_injector.py script to inject transaction data into the Pub/Sub stream.
Tracking real-time data fluctuations with BigQuery SQL queries.
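
As a simple example of such monitoring, a query along these lines (using the table defined earlier) shows sales accumulating per product as new messages land:

bq query --use_legacy_sql=false \
  'SELECT product, COUNT(*) AS transactions, SUM(amount) AS total_amount
   FROM `YOUR_PROJECT_ID.YOUR_DATASET_ID.xxx_dfsqltable_sales`
   GROUP BY product
   ORDER BY total_amount DESC'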

4. Case Studies or Use Cases

Real-world examples are a good way to understand the practical applications of tools like Apache Beam, Google Cloud Dataflow, and Terraform. Here are a couple of use cases that showcase the benefits of these setups:

  1. E-commerce Giant: A renowned e-commerce company successfully implemented Apache Beam and Google Cloud Dataflow for real-time data analytics. By processing large streams of data efficiently, they managed to enhance user experiences through personalized recommendations.
  2. Healthcare Sector: In the healthcare sector, a network of hospitals utilized Terraform for orchestrating and automating their cloud infrastructure. This ensured a uniform environment, facilitating seamless data exchanges and ensuring high security and compliance standards.

Keep in mind that these cases are hypothetical, but grounded in the common applications of these technologies. For an even deeper understanding, you might want to look into real case studies documented in industry reports or tech blogs.

5. Conclusion

In this tutorial, we have walked you through setting up a robust real-time data stream processing solution using Apache Beam, Google Cloud Dataflow, and Terraform. With the successful deployment and testing of this pipeline, businesses are now equipped to glean real-time insights from their data streams, offering a significant edge in today’s dynamic market landscape.

Feel free to experiment further with the setup, tweaking the Python scripts, or the Terraform configurations to better suit your needs. And remember, the community is here to help should you encounter any issues or have further questions. Happy coding!

Resources for Further Learning

To help you venture further and build a solid foundation in working with Apache Beam, Google Cloud Dataflow, and Terraform, we recommend the following resources for deeper exploration:

  1. Apache Beam: https://beam.apache.org/documentation/
  2. Google Cloud Dataflow: https://cloud.google.com/dataflow/docs
  3. Terraform: https://developer.hashicorp.com/terraform/docs
