
How can PySpark and Apache Kafka be used for real-time data processing?


Overview

In today's data-driven world, real-time data processing is critical for organizations seeking timely insights and decision-making. This blog explores how to build a robust real-time data pipeline using PySpark and Apache Kafka, two powerful tools that facilitate seamless data streaming and processing. Throughout this guide, you will learn how to set up a PySpark environment, read customer data from Kafka topics, transform data for consistency, and enrich streaming data by joining it with static datasets. Additionally, the blog covers saving processed data to Parquet files, enabling efficient storage for future analysis. By the end of this blog, you will have a comprehensive understanding of how to create, manage, and scale real-time data pipelines using PySpark and Kafka, making it easier to analyze and act on streaming data in real time.


Introduction

Real-time data processing has become an essential part of modern data-driven solutions, allowing organizations to analyze data as it streams, enabling timely insights and decisions. Apache Kafka is a popular distributed streaming platform that works seamlessly with PySpark, a powerful data processing framework built on top of Apache Spark. This blog will guide you through setting up a PySpark-based data pipeline to read, transform, and enrich data from Kafka topics, utilizing both real-time streams and static data sources.


Setting Up the Environment

If you don't know how to set up the environment for a Kafka and PySpark project, please read our previous blog: https://www.codersarts.com/post/how-to-set-up-your-kafka-environment-for-effective-data-streaming
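The streaming examples below assume that customer records are already being published as JSON to a Kafka topic named customers_topic. If you need test data, here is a minimal producer sketch using the kafka-python package (an assumption; any Kafka client library will work), with a hypothetical sample record shaped like the customer schema used later in this blog:

# Minimal test-producer sketch: assumes kafka-python is installed (pip install kafka-python)
# and a Kafka broker is running on localhost:9092.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda record: json.dumps(record).encode("utf-8")
)

# Hypothetical sample record matching the customer schema defined later in this blog
sample_customer = {
    "customerNumber": 103,
    "customerName": "Atelier graphique",
    "contactLastName": "Schmitt",
    "contactFirstName": "Carine",
    "phone": "40.32.2555",
    "addressLine1": "54, rue Royale",
    "addressLine2": None,
    "city": "Nantes",
    "state": None,
    "postalCode": "44000",
    "country": "France",
    "salesRepEmployeeNumber": "1370",
    "creditLimit": 21000.0
}

producer.send("customers_topic", sample_customer)
producer.flush()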


Streaming Customer Data from Kafka to PySpark DataFrame

One of the key tasks in data processing pipelines is ingesting real-time data. Below is a step-by-step approach to reading data from a Kafka topic, converting it into a structured PySpark DataFrame, and displaying the data.


PySpark Code Example: Reading Data from Kafka

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Create SparkSession with required configurations
spark = SparkSession.builder \
    .appName("KafkaReadWrite") \
    .master("local[4]") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()

# Define schema for customer data
customer_schema = StructType([
    StructField("customerNumber", IntegerType(), True),
    StructField("customerName", StringType(), True),
    StructField("contactLastName", StringType(), True),
    StructField("contactFirstName", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("addressLine1", StringType(), True),
    StructField("addressLine2", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
    StructField("salesRepEmployeeNumber", StringType(), True),
    StructField("creditLimit", DoubleType(), True)
])

# Read the customer stream from the Kafka topic
customer_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "customers_topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Convert the Kafka 'value' column to a string and parse the JSON with the schema
customer_df_ = customer_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), customer_schema).alias("data")) \
    .select("data.*")

# Display the customer data on the console (for real-time monitoring)
query = customer_df_.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination(timeout=5)

Output:





Changing Data Types for Consistency in PySpark

Consistency in data types is crucial for smooth data processing, especially when merging datasets or transforming data. Use the .cast() method to ensure all columns are properly typed.


PySpark Code Example: Changing Data Types


# Apply column type conversions (casting)
customer_df_casted = customer_df_ \
    .withColumn("customerNumber", col("customerNumber").cast(IntegerType())) \
    .withColumn("phone", col("phone").cast(StringType())) \
    .withColumn("postalCode", col("postalCode").cast(IntegerType())) \
    .withColumn("salesRepEmployeeNumber", col("salesRepEmployeeNumber").cast(IntegerType())) \
    .withColumn("creditLimit", col("creditLimit").cast(DoubleType()))

# Display the schema and preview the streaming data
customer_df_casted.printSchema()
customer_df_casted.writeStream.format("console").start().awaitTermination(5)

Output:


Loading Static Data from a CSV File Using PySpark

Static data can act as a reference or enrich real-time streaming data. For instance, you might have a dataset of orders stored in a CSV file that can be used for enriching customer stream data.


PySpark Code Example: Loading Static Data


# Load static data from CSV
orders_df = spark.read.csv("dataset/orders.csv", header=True, inferSchema=True)

orders_df.show(5)
orders_df.printSchema()

Output:



Joining Static Data with Real-Time Stream Data Using PySpark

Combining static data with real-time streams enriches the data, providing a more complete view. PySpark makes it easy to join these datasets.


PySpark Code Example: Merging Static and Streaming Data

# Join the customer stream with the static orders data
combine_df = customer_df_casted.join(orders_df, on="customerNumber", how="left")

# Output the combined data for monitoring or further processing
combine_df.writeStream.format("console").outputMode("append").start().awaitTermination(timeout=5)

Output:




Saving Live Stream Data to Parquet File Using PySpark

Storing live stream data in Parquet files allows for efficient long-term storage and analysis. PySpark can save streaming data directly to Parquet format, providing better performance for read-heavy operations.


PySpark Code Example: Writing Data to Parquet

# Write the combined stream to Parquet files on the local file system or HDFS
combine_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "directory") \
    .option("checkpointLocation", "/tmp/checkpoints/customers") \
    .start() \
    .awaitTermination(timeout=10)

Output:
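Once the streaming query has committed a few micro-batches, the Parquet output can be read back as an ordinary batch DataFrame for analysis. Here is a minimal sketch, assuming the same output path ("directory") used in the writeStream example above:

# Read the Parquet files written by the streaming query back as a batch DataFrame
# (assumes the same output path used in the writeStream example above)
saved_df = spark.read.parquet("directory")

saved_df.printSchema()
saved_df.show(5)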


This blog guided you through creating a real-time data pipeline using PySpark and Kafka, covering everything from reading data from Kafka, transforming and enriching it with static data, to saving processed data to Parquet files for future analysis.


