Introduction
In this blog post, we’ll take a big step forward by creating your very first DAG in Apache Airflow. A Directed Acyclic Graph (DAG) is the backbone of Airflow: workflows are defined as tasks and the dependencies between them. Our goal is to make this a complete guide for beginners, so that you can confidently create, schedule, and run workflows in Airflow.
We’ll use a simple example of a “Hello World” DAG to introduce you to the core concepts, walk through the code step-by-step, and ensure you have a thorough understanding of how DAGs work in Airflow.
What is a DAG?
A DAG, or Directed Acyclic Graph, represents a workflow of tasks with defined dependencies. In Airflow:
Each DAG file is a Python script that defines the structure of the workflow.
Tasks represent the individual units of work.
Dependencies control the order in which tasks execute.
In essence, a DAG is a blueprint for workflows that run repeatedly at scheduled intervals.
Setting Up the Environment
Before creating your first DAG, ensure Apache Airflow is installed and running. Follow these steps:
First, set load_examples = False in the airflow.cfg file. This prevents Airflow from loading the default example DAGs when the Airflow web server starts.
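The setting lives under the [core] section of airflow.cfg. A minimal excerpt looks like this (the dags_folder path shown is the usual default; yours may differ):

[core]
dags_folder = ~/airflow/dags
load_examples = False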
Start the Web Server and Scheduler
Open a terminal and run the following commands (use a separate terminal for each, since both processes stay in the foreground):
airflow webserver
airflow scheduler
Access the Airflow UI at http://localhost:8080.
Create the DAGs Folder
Locate your Airflow home directory (default is ~/airflow) and ensure the dags folder exists. This is where all your DAG files will reside.
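If the folder does not exist yet, create it from the terminal:

mkdir -p ~/airflow/dags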
Creating Your First DAG
Step 1: The Basics of a DAG File
DAGs in Airflow are Python scripts. Let’s start with a simple example:
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'admin1',
}

dag = DAG(
    dag_id='hello_world',
    description='Our first "Hello World" DAG!',
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval='@daily',
    tags=['beginner', 'bash', 'hello world'],
)

task = BashOperator(
    task_id='hello_world_task',
    bash_command='echo Hello world once again!',
    dag=dag,
)

task  # a single task needs no explicit dependency declarations
Breaking Down the Code
1. Importing Required Libraries
At the top of the script, we import essential modules:
datetime and timedelta: For working with dates and times.
days_ago: A utility function to define relative start dates.
DAG: The core class to define your DAG structure.
BashOperator: Executes shell commands as a task.
2. Setting Default Arguments
The default_args dictionary contains parameters that apply to all tasks in the DAG unless explicitly overridden:
default_args = {
    'owner': 'admin1',
}
owner: Identifies the creator or owner of the DAG.
Additional arguments like retries, retry_delay, and email_on_failure can be added for task-level configuration, as sketched below.
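As a sketch, a default_args dictionary with those extra parameters might look like the following (the retry count and delay are illustrative values, not recommendations):

from datetime import timedelta

default_args = {
    'owner': 'admin1',
    'retries': 2,                         # re-run a failed task up to 2 times
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between attempts
    'email_on_failure': False,            # set True (and configure SMTP) for alerts
}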
3. Defining the DAG
The DAG object defines the overall workflow.
dag = DAG(
    dag_id='hello_world',
    description='Our first "Hello World" DAG!',
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval='@daily',
    tags=['beginner', 'bash', 'hello world'],
)
dag_id: A unique identifier for the DAG.
description: A brief summary of what the DAG does.
default_args: Applies the predefined arguments to the DAG.
start_date: Specifies the first execution date. The days_ago(1) function sets it to one day before the current date.
schedule_interval: Defines how often the DAG runs. Here, @daily means the DAG will run once every day.
tags: Labels for easy filtering in the Airflow UI.
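One related parameter worth knowing early, although our example does not use it, is catchup. Because start_date lies in the past, Airflow will by default backfill a run for every missed interval; passing catchup=False schedules runs from now onward only:

dag = DAG(
    dag_id='hello_world',
    start_date=days_ago(1),
    schedule_interval='@daily',
    catchup=False,  # do not backfill runs for intervals before now
)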
4. Adding a Task
Tasks are the building blocks of a DAG. In this example, we use the BashOperator to execute a shell command:
task = BashOperator(
    task_id='hello_world_task',
    bash_command='echo Hello world once again!',
    dag=dag,
)
task_id: A unique identifier for the task.
bash_command: The shell command to be executed.
dag: Links the task to the DAG.
Saving and Viewing Your DAG
Save the script as hello_world.py in your Airflow dags folder (default: ~/airflow/dags).
Open the Airflow UI and locate the hello_world DAG in the list.
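You can also exercise a single task from the terminal without waiting for the scheduler, using the airflow tasks test command (the final argument is an arbitrary execution date):

airflow tasks test hello_world hello_world_task 2024-01-01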
Understanding the Airflow UI
DAGs Page: View all available DAGs and their status.
Graph View: Visualize the workflow and dependencies.
Task Logs: Access detailed logs to debug or monitor tasks.
Enhancements to Your DAG
Add Parameters to the Bash Command
You can make the command dynamic by adding parameters:
task = BashOperator(
    task_id='dynamic_hello',
    bash_command='echo Hello world, today is {{ ds }}!',
    dag=dag,
)
{{ ds }} is a Jinja template variable that renders as the DAG run’s execution date in YYYY-MM-DD format.
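It is only one of Airflow’s built-in template variables; for example, {{ ds_nodash }} renders the same date without dashes (e.g. 20240101), which is handy for file names:

task = BashOperator(
    task_id='dynamic_hello_nodash',
    bash_command='echo Writing to output_{{ ds_nodash }}.txt',
    dag=dag,
)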
Add Multiple Tasks
Create additional tasks and define their dependencies:
task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

task2 = BashOperator(
    task_id='print_hello',
    bash_command='echo Hello Airflow!',
    dag=dag,
)
task1 >> task2 # task1 runs before task2
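The >> operator is shorthand for set_downstream; both lines below declare the same kind of dependency, and longer chains can be written in one expression (task3 here is a hypothetical third task defined like the others):

task1.set_downstream(task2)  # equivalent to task1 >> task2
task1 >> task2 >> task3      # chain: task1, then task2, then task3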
Change the Schedule Interval
Modify schedule_interval for different frequencies:
schedule_interval='@hourly' # Runs every hour
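Besides the @-presets, schedule_interval accepts standard five-field cron expressions, for example:

schedule_interval='0 6 * * 1-5'  # 06:00 every weekday (Mon-Fri)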
Common Errors and Fixes
DAG Not Found in UI
Ensure the file is saved in the dags folder.
Check the script for syntax errors; the two commands below can confirm that the file parses and imports cleanly.
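python ~/airflow/dags/hello_world.py    # a clean exit means the file parses
airflow dags list-import-errors         # lists DAG files that failed to import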
Task Fails to Execute
Review task logs for detailed error messages.
Ensure the bash_command is valid.
Conclusion
Creating a simple "Hello World" DAG is your first step toward mastering Airflow. By understanding the structure of a DAG and its components, you can begin building more complex workflows tailored to your needs.