Overview
In today's data-driven world, real-time data processing is critical for organizations seeking timely insights and decision-making. This blog explores how to build a robust real-time data pipeline using PySpark and Apache Kafka, two powerful tools that facilitate seamless data streaming and processing. Throughout this guide, you will learn how to set up a PySpark environment, read customer data from Kafka topics, transform data for consistency, and enrich streaming data by joining it with static datasets. Additionally, the blog covers saving processed data to Parquet files, enabling efficient storage for future analysis. By the end of this blog, you will have a comprehensive understanding of how to create, manage, and scale real-time data pipelines using PySpark and Kafka, making it easier to analyze and act on streaming data in real time.
Introduction
Real-time data processing has become an essential part of modern data-driven solutions, allowing organizations to analyze data as it streams, enabling timely insights and decisions. Apache Kafka is a popular distributed streaming platform that works seamlessly with PySpark, a powerful data processing framework built on top of Apache Spark. This blog will guide you through setting up a PySpark-based data pipeline to read, transform, and enrich data from Kafka topics, utilizing both real-time streams and static data sources.
Setting Up the Environment
If you don't know how to set up the environment for a Kafka and PySpark project, please read our previous blog. Here is the link: https://www.codersarts.com/post/how-to-set-up-your-kafka-environment-for-effective-data-streaming
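With the environment in place, the Kafka topic also needs some data to stream. The snippet below is a minimal sketch, not part of the original setup guide, that publishes one sample customer record as JSON to the customers_topic consumed later in this post. It assumes Kafka is running on localhost:9092 and that the kafka-python package is installed (pip install kafka-python); the field values are made up for illustration and simply mirror the customer schema defined below.
# Minimal sketch (assumed setup): publish a sample customer record to Kafka with kafka-python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda record: json.dumps(record).encode("utf-8")
)

# Illustrative record matching the customer schema used in the streaming job below
sample_customer = {
    "customerNumber": 103,
    "customerName": "Example Retail Ltd",
    "contactLastName": "Doe",
    "contactFirstName": "Jane",
    "phone": "555-0101",
    "addressLine1": "12 Example Street",
    "addressLine2": None,
    "city": "Springfield",
    "state": None,
    "postalCode": "44000",
    "country": "USA",
    "salesRepEmployeeNumber": "1370",
    "creditLimit": 21000.0
}

producer.send("customers_topic", value=sample_customer)
producer.flush()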
Streaming Customer Data from Kafka to PySpark DataFrame
One of the key tasks in data processing pipelines is ingesting real-time data. Below is a step-by-step approach to reading data from a Kafka topic, converting it into a structured PySpark DataFrame, and displaying the data.
PySpark Code Example: Reading Data from Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Create SparkSession with required configurations
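# Note: the spark-sql-kafka connector version (here 3.0.0, Scala 2.12) should match your Spark build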
spark = SparkSession.builder \
    .appName("KafkaReadWrite") \
    .master("local[4]") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()
# Define schema for customer data
customer_schema = StructType([
    StructField("customerNumber", IntegerType(), True),
    StructField("customerName", StringType(), True),
    StructField("contactLastName", StringType(), True),
    StructField("contactFirstName", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("addressLine1", StringType(), True),
    StructField("addressLine2", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
    StructField("salesRepEmployeeNumber", StringType(), True),
    StructField("creditLimit", DoubleType(), True)
])
# Read the customer stream from Kafka topic
customer_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "customers_topic") \
    .option("startingOffsets", "earliest") \
    .load()
# Convert the Kafka 'value' to String and parse the JSON with the schema
customer_df_ = customer_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), customer_schema).alias("data")) \
    .select("data.*")
# Display the customer data on the console (for real-time monitoring)
query = customer_df_.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination(timeout=5)
Output:
Changing Data Types for Consistency in PySpark
Consistency in data types is crucial for smooth data processing, especially when merging datasets or transforming data. Use the .cast() method to ensure all columns are properly typed.
PySpark Code Example: Changing Data Types
# Apply column type conversions (casting)
customer_df_casted = customer_df_ \
    .withColumn("customerNumber", col("customerNumber").cast(IntegerType())) \
    .withColumn("phone", col("phone").cast(StringType())) \
    .withColumn("postalCode", col("postalCode").cast(IntegerType())) \
    .withColumn("salesRepEmployeeNumber", col("salesRepEmployeeNumber").cast(IntegerType())) \
    .withColumn("creditLimit", col("creditLimit").cast(DoubleType()))
# Display the schema and preview the stream data
customer_df_casted.printSchema()
customer_df_casted.writeStream.format("console").start().awaitTermination(5)
Output:
Loading Static Data from a CSV File Using PySpark
Static data can act as a reference or enrich real-time streaming data. For instance, you might have a dataset of orders stored in a CSV file that can be used for enriching customer stream data.
PySpark Code Example: Loading Static Data
# Load static data from CSV
orders_df = spark.read.csv("dataset/orders.csv", header=True, inferSchema=True)
orders_df.show(5)
orders_df.printSchema()
Output:
Joining Static Data with Real-Time Stream Data Using PySpark
Combining static data with real-time streams enriches the data, providing a more complete view. PySpark makes it easy to join these datasets.
PySpark Code Example: Merging Static and Streaming Data
# Join the customer stream with the static orders data
combine_df = customer_df_casted.join(orders_df, on="customerNumber", how="left")
# Output the combined data for monitoring or further processing
combine_df.writeStream.format("console").outputMode("append").start().awaitTermination()
Output:
Saving Live Stream Data to Parquet File Using PySpark
Storing live stream data in Parquet files allows for efficient long-term storage and analysis. PySpark can save streaming data directly to Parquet format, providing better performance for read-heavy operations.
PySpark Code Example: Writing Data to Parquet
# Writing data to Parquet files on local file system or HDFS
combine_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "directory") \
    .option("checkpointLocation", "/tmp/checkpoints/customers") \
    .start() \
    .awaitTermination(timeout=10)
Output:
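Once a few micro-batches have been written, the Parquet output can be read back as a regular batch DataFrame for analysis. The snippet below is a minimal sketch that assumes the same placeholder output path ("directory") used in the write above; point it at whatever path your stream actually writes to.
# Read the Parquet files written by the stream back into a batch DataFrame
parquet_df = spark.read.parquet("directory")
parquet_df.printSchema()
parquet_df.show(5)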
Conclusion
This blog guided you through creating a real-time data pipeline using PySpark and Kafka, covering everything from reading data from Kafka, transforming and enriching it with static data, to saving processed data to Parquet files for future analysis.