Integrating Kafka and Elasticsearch for Real-Time Data Streaming and Indexing

Updated: Dec 4, 2024

Introduction

In this project, I implemented a real-time data pipeline using Apache Kafka and Elasticsearch. The pipeline reads rows from a CSV file, streams them to a Kafka topic, and then consumes them and indexes them into Elasticsearch for querying and analysis. This post documents the implementation step by step, from producing and consuming the data to verifying it in Elasticsearch.


Workflow Overview

  1. Kafka Producer: Reads a CSV file and streams its rows to a Kafka topic.

  2. Kafka Consumer: Consumes the messages from the Kafka topic and sends them to Elasticsearch.

  3. Elasticsearch: Stores the streamed data and makes it available for querying.


1. Kafka Producer Code Explanation

The producer script reads rows from data.csv and sends each row as a message to a Kafka topic (csv_topic). The kafka module it imports is typically provided by the kafka-python package.

Code: kafka_producer.py


import csv
from kafka import KafkaProducer
import json

# Kafka producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

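# Read CSV and send each row to Kafka
# newline='' lets the csv module handle line endings itself
with open('data.csv', 'r', newline='') as file:
    reader = csv.DictReader(file)  # each row becomes a dict keyed by the header row
    for row in reader:
        producer.send('csv_topic', value=row)  # asynchronous: the message is queued for sending
        print(f"Sent: {row}")

# Block until all queued messages are delivered, then release the connection
producer.flush()
producer.close()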

What Happens Here?

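  1. A KafkaProducer instance is created to connect to the Kafka broker running on localhost:9092.

  2. The value_serializer converts each message (a Python dict) to a JSON string and encodes it as UTF-8 bytes before it is sent.

  3. Each row of the CSV file is read as a dict keyed by the header row and sent to the Kafka topic csv_topic. Because send() is asynchronous, the final producer.flush() waits until every queued message has actually been delivered.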
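To make the serialization step concrete, here is a minimal sketch that runs without a Kafka broker. It reproduces what csv.DictReader and the value_serializer do to each row. The sample CSV content and the helper names serialize_row and rows_to_messages are assumptions for illustration only; the post does not show the real columns of data.csv.

```python
import csv
import io
import json

# Hypothetical sample standing in for data.csv; the real columns are not shown in the post.
SAMPLE_CSV = "id,name,price\n1,widget,9.99\n2,gadget,19.50\n"


def serialize_row(row):
    """Mirror the producer's value_serializer: dict -> JSON text -> UTF-8 bytes."""
    return json.dumps(row).encode("utf-8")


def rows_to_messages(csv_text):
    """Parse CSV text and return the byte payload that would be sent for each row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [serialize_row(row) for row in reader]


messages = rows_to_messages(SAMPLE_CSV)
for payload in messages:
    print(payload)
```

Note that every value in the payload is a string, because csv.DictReader does not infer types. If numeric fields matter for later queries, it may be worth converting them before sending, or defining an explicit mapping on the Elasticsearch index.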


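Output

For every row it sends, the producer prints a line of the form Sent: {...} showing that row as a Python dictionary.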



2. Kafka Consumer and Elasticsearch Integration

The consumer script listens to the Kafka topic, processes each message, and indexes the data into an Elasticsearch index (csv_data).


Code: kafka_consumer_ElasticSearch.py


from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
import json

# Kafka consumer
consumer = KafkaConsumer(
    'csv_topic',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

# Elasticsearch client
es = Elasticsearch([{'host': 'localhost', 'port': 9200, 'scheme': 'http'}])

# Consume messages and index into Elasticsearch
for message in consumer:
    row = message.value
    print(f"Consumed: {row}")
    
    # Index into Elasticsearch
    es.index(index='csv_data', document=row)
    print(f"Indexed to Elasticsearch: {row}")

What Happens Here?

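  1. A KafkaConsumer connects to the same Kafka broker and subscribes to csv_topic. With auto_offset_reset='earliest' and no group_id, no offsets are committed, so the consumer starts from the oldest available message on every run.

  2. Each message is decoded from UTF-8 and deserialized from JSON back into a Python dict.

  3. The message is indexed into Elasticsearch under the index csv_data. Since no document ID is passed, Elasticsearch generates one for each document, which means re-running the consumer indexes the same rows again as new documents.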
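One possible way to keep repeated runs from creating duplicate documents is to derive the document ID from the Kafka record's coordinates (topic, partition, offset), so that indexing the same record twice overwrites rather than adds. This is a sketch under that assumption, not part of the original scripts. FakeRecord is a hypothetical stand-in that models only the attributes used here, and document_id is an illustrative helper name.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for the record object the consumer yields;
# only the attributes used below are modeled.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "value"])


def deserialize(raw):
    """Mirror the consumer's value_deserializer: UTF-8 bytes -> JSON text -> dict."""
    return json.loads(raw.decode("utf-8"))


def document_id(record):
    """Build an ID that is unique per Kafka record and stable across re-reads."""
    return f"{record.topic}-{record.partition}-{record.offset}"


record = FakeRecord("csv_topic", 0, 42, deserialize(b'{"id": "1", "name": "widget"}'))
print(document_id(record), record.value)

# In the consumer loop this could be used as follows (not executed here):
#     es.index(index='csv_data', id=document_id(message), document=message.value)
```

The trade-off is that IDs become tied to Kafka offsets. If the topic is recreated, offsets start over and new rows would overwrite old documents, so a key taken from the data itself may be a better choice when one exists.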



3. Verifying Data in Elasticsearch

Once data is indexed into Elasticsearch, you can query it to verify successful ingestion.


Command to Verify Data:

curl -X GET "http://localhost:9200/csv_data/_search?pretty"

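Output:

The response is a JSON document. The indexed rows appear in the hits.hits array, each under its _source field. By default a search returns at most 10 hits.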
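To check the result programmatically rather than by eye, the rows can be pulled out of the search response. The sketch below assumes Elasticsearch 7 or later, where hits.total is an object with a value field. The sample_response dictionary is made-up illustrative data in the shape of a search response, not real output from this pipeline, and the helper names are hypothetical.

```python
# Illustrative response in the shape returned by the _search API; the values are made up.
sample_response = {
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "hits": [
            {"_index": "csv_data", "_id": "a1", "_source": {"id": "1", "name": "widget"}},
            {"_index": "csv_data", "_id": "a2", "_source": {"id": "2", "name": "gadget"}},
        ],
    }
}


def extract_rows(response):
    """Return the original documents (the _source of each hit)."""
    return [hit["_source"] for hit in response["hits"]["hits"]]


def total_hits(response):
    """Return the number of matching documents reported in the response."""
    return response["hits"]["total"]["value"]


print(total_hits(sample_response), extract_rows(sample_response))
```

Comparing total_hits against the number of rows in the CSV file gives a quick sanity check that every row was indexed exactly once.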

