Day 5 of 5

Airflow Orchestration

Apache Airflow schedules and monitors data pipelines. A DAG (Directed Acyclic Graph) defines tasks and their dependencies. Today you will write a DAG that chains ETL tasks and runs on a schedule.
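To make the idea of "tasks and their dependencies" concrete, here is a minimal sketch that uses only the Python standard library (`graphlib`), not Airflow itself. It is not how Airflow's scheduler is implemented; it only illustrates that a dependency graph determines a valid run order and that a cycle makes the graph invalid. The task names are borrowed from the pipeline built later in this lesson, and `run_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter, CycleError

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "dbt_run": {"load"},
}

def run_order(deps):
    """Return one valid execution order for the given dependency graph."""
    return list(TopologicalSorter(deps).static_order())

order = run_order(dependencies)
print(order)  # ['extract', 'transform', 'load', 'dbt_run']

# A cycle (extract waits on dbt_run, which waits on extract) is not a DAG.
cyclic = dict(dependencies, extract={"dbt_run"})
try:
    run_order(cyclic)
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```

The "acyclic" requirement is what guarantees such an order exists: if two tasks waited on each other, neither could ever start.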

bash
pip install apache-airflow
airflow standalone   # starts web server + scheduler at localhost:8080
python
# dags/sales_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['[email protected]'],  # placeholder; replace with a real alert address
}

with DAG(
    dag_id='sales_pipeline',
    default_args=default_args,
    description='Daily sales ETL',
    schedule='0 6 * * *',  # 6am UTC every day
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['sales', 'etl'],
) as dag:

    def extract():
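        import requests  # imported inside the task so DAG parsing stays fast
        response = requests.get('https://api.example.com/sales/yesterday', timeout=30)
        response.raise_for_status()  # fail the task (and trigger retries) on HTTP errors
        return response.json()  # return value is pushed to XCom for downstream tasks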

    def transform(**context):
        data = context['ti'].xcom_pull(task_ids='extract')
        return [row for row in data if row['status'] == 'completed']

    def load(**context):
        rows = context['ti'].xcom_pull(task_ids='transform')
        # insert to database...
        print(f'Loaded {len(rows)} rows')

    t1 = PythonOperator(task_id='extract', python_callable=extract)
    t2 = PythonOperator(task_id='transform', python_callable=transform)
    t3 = PythonOperator(task_id='load', python_callable=load)
    t4 = BashOperator(task_id='dbt_run', bash_command='cd /opt/dbt && dbt run')

    t1 >> t2 >> t3 >> t4   # chain: extract → transform → load → dbt
Tip: Use the Airflow UI to trigger runs manually, inspect task logs, and re-run failed tasks without re-running the whole DAG.
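The XCom hand-off between `extract` and `transform` can also be exercised without a running scheduler. The sketch below repeats the `transform` callable from the DAG and feeds it a stand-in task instance. `FakeTaskInstance` and `sample_rows` are hypothetical names invented for this illustration; the stub only mimics the single `xcom_pull(task_ids=...)` call used in this lesson and is not part of Airflow.

```python
class FakeTaskInstance:
    """Hypothetical test double; only mimics the xcom_pull call used here."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps task_id -> value that task returned

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


def transform(**context):
    # Same logic as the DAG's transform task: keep completed sales only.
    data = context['ti'].xcom_pull(task_ids='extract')
    return [row for row in data if row['status'] == 'completed']


# Made-up rows standing in for what the extract task would return.
sample_rows = [
    {'id': 1, 'status': 'completed'},
    {'id': 2, 'status': 'refunded'},
    {'id': 3, 'status': 'completed'},
]

ti = FakeTaskInstance({'extract': sample_rows})
kept = transform(ti=ti)
print([row['id'] for row in kept])  # prints [1, 3]
```

Keeping the callables free of side effects, as `transform` is here, makes this kind of quick check possible before the DAG is ever deployed.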

Exercise: Build a Daily ETL DAG

  1. Install Airflow, start it with airflow standalone, and open the UI at localhost:8080
  2. Write a DAG with 3 PythonOperator tasks
  3. Chain them with the >> operator
  4. Set the schedule to @daily
  5. Trigger a manual run from the UI and inspect logs
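For step 4, it may help to see when a daily schedule fires. In Airflow, the `@daily` preset corresponds to the cron expression `0 0 * * *` (midnight), while the example DAG uses `0 6 * * *` (06:00). The sketch below uses only the standard library to compute the next wall-clock trigger time for either case. `next_daily_run` is a hypothetical helper, and it deliberately ignores Airflow's data-interval bookkeeping and `catchup` behavior.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now, hour=0, minute=0):
    """Return the first daily trigger time strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next run is tomorrow.
        candidate += timedelta(days=1)
    return candidate


now = datetime(2026, 1, 1, 7, 30, tzinfo=timezone.utc)

midnight_run = next_daily_run(now)        # like '@daily' / '0 0 * * *'
six_am_run = next_daily_run(now, hour=6)  # like '0 6 * * *'

print(midnight_run.isoformat())  # 2026-01-02T00:00:00+00:00
print(six_am_run.isoformat())    # 2026-01-02T06:00:00+00:00
```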

Day 5 Summary
