ETL is the backbone of data engineering: you extract data from a source, transform it into the right shape, and load it into a destination. Today you will build a Python ETL pipeline that reads a CSV file, cleans the data, and writes the results to a SQLite database.
# etl_pipeline.py
import csv
import sqlite3
from datetime import datetime
from pathlib import Path
# Extract
def extract(filepath: str) -> list[dict]:
    """Read every row of the CSV at *filepath* as a column-name -> value dict."""
    with open(filepath, newline='', encoding='utf-8') as source:
        reader = csv.DictReader(source)
        return [row for row in reader]
# Transform
def transform(records: list[dict]) -> list[dict]:
    """Clean and normalize raw CSV rows.

    Rows missing a name or email, or carrying an unparseable signup date,
    are skipped rather than aborting the whole batch.

    Returns dicts with keys: 'name' (title-cased), 'email' (lower-cased),
    'signup_date' (datetime.date), 'active' (bool; True unless a 'status'
    value other than 'active' is present).
    """
    default_date = '2026-01-01'
    clean = []
    for r in records:
        # Skip rows with missing required fields
        if not r.get('email') or not r.get('name'):
            continue
        # `or` (not a .get default) so that an *empty* 'date' value falls
        # back to the default too — dict.get only defaults on a missing key.
        raw_date = (r.get('date') or default_date).strip()
        try:
            signup = datetime.strptime(raw_date, '%Y-%m-%d').date()
        except ValueError:
            # Malformed date: drop this row instead of crashing the pipeline.
            continue
        clean.append({
            'name': r['name'].strip().title(),
            'email': r['email'].strip().lower(),
            'signup_date': signup,
            'active': r.get('status', 'active') == 'active',
        })
    return clean
# Load
def load(records: list[dict], db_path: str) -> int:
    """Insert cleaned records into the `users` table of the SQLite DB at *db_path*.

    Creates the table on first use. Duplicate emails are silently skipped
    (the column has a UNIQUE constraint). Returns the number of rows
    actually inserted by this call.
    """
    # Convert values explicitly: relying on sqlite3's implicit date adapter
    # is deprecated since Python 3.12, and booleans are stored as 0/1 ints.
    rows = [{
        'name': r['name'],
        'email': r['email'],
        'signup_date': str(r['signup_date']),  # date -> 'YYYY-MM-DD'; no-op for str
        'active': int(bool(r['active'])),
    } for r in records]
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute('''CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT, email TEXT UNIQUE, signup_date TEXT, active INTEGER
        )''')
        cur.executemany('''INSERT OR IGNORE INTO users
            (name, email, signup_date, active) VALUES
            (:name, :email, :signup_date, :active)''', rows)
        conn.commit()
        # executemany sums per-row change counts; rows skipped by OR IGNORE
        # contribute 0, so this is the count of rows actually inserted.
        return cur.rowcount
    finally:
        conn.close()  # always release the connection, even on error
if __name__ == '__main__':
    # Run the full pipeline: CSV source -> cleaned rows -> SQLite warehouse.
    raw_rows = extract('users.csv')
    cleaned = transform(raw_rows)
    inserted = load(cleaned, 'warehouse.db')
    print(f'Loaded {inserted} new records')