Apache Airflow schedules and monitors data pipelines. A DAG (Directed Acyclic Graph) defines tasks and their dependencies. Today you will write a DAG that chains ETL tasks and runs on a schedule.
pip install apache-airflow
airflow standalone  # starts web server + scheduler at localhost:8080

# dags/sales_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Defaults applied to every task in the DAG unless overridden per-task.
default_args = dict(
    owner='data-team',
    retries=2,                            # re-run a failed task twice
    retry_delay=timedelta(minutes=5),     # wait between retry attempts
    email_on_failure=True,                # alert when all retries are exhausted
    email=['[email protected]'],
)
with DAG(
    dag_id='sales_pipeline',
    default_args=default_args,
    description='Daily sales ETL',
    schedule='0 6 * * *',  # 6am UTC every day
    start_date=datetime(2026, 1, 1),
    catchup=False,  # don't backfill runs missed before this date
    tags=['sales', 'etl'],
) as dag:

    def extract():
        """Fetch yesterday's sales from the API; return value is pushed to XCom."""
        import requests  # local import keeps scheduler DAG-parsing fast

        resp = requests.get(
            'https://api.example.com/sales/yesterday',
            timeout=30,  # FIX: without a timeout a hung API call blocks the worker slot forever
        )
        resp.raise_for_status()  # FIX: fail loudly on HTTP errors so Airflow's retries kick in
        return resp.json()  # XCom: passed to next task

    def transform(**context):
        """Keep only completed sales rows pulled from the extract task's XCom."""
        data = context['ti'].xcom_pull(task_ids='extract')
        return [row for row in data if row['status'] == 'completed']

    def load(**context):
        """Persist the transformed rows (database insert stubbed out)."""
        rows = context['ti'].xcom_pull(task_ids='transform')
        # insert to database...
        print(f'Loaded {len(rows)} rows')

    t1 = PythonOperator(task_id='extract', python_callable=extract)
    t2 = PythonOperator(task_id='transform', python_callable=transform)
    t3 = PythonOperator(task_id='load', python_callable=load)
    t4 = BashOperator(task_id='dbt_run', bash_command='cd /opt/dbt && dbt run')

    t1 >> t2 >> t3 >> t4  # chain: extract → transform → load → dbt