Day 2 of 5 • 50 min • includes coding

AI for Threat Detection

Build an anomaly detection system that flags suspicious login patterns in auth logs using statistical models and Python.

1 Threat Landscape
2 Threat Detection
3 LLM for SecOps
4 Vuln Scanning
5 AI Security Risks
What You're Building

Anomaly Detection System

A Python script that ingests auth log data, baselines normal behavior per user, and flags statistically anomalous events — high failure rates, unusual hours, impossible travel, new geo locations. The same logic that powers commercial UEBA products.

01
Setup

Install Dependencies and Create Sample Data

You need pandas, numpy, and scipy. No ML training required — we're using statistical methods that work well with small datasets and are easy to explain to stakeholders.

bash
pip install pandas numpy scipy

Create a sample auth log CSV. In production you'd pull this from your SIEM (Splunk, for example) or from CloudTrail.

python generate_logs.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

random.seed(42)
np.random.seed(42)

users = ["alice", "bob", "carol", "dave", "eve"]
locations = ["New York", "Chicago", "Denver", "Los Angeles"]
base_time = datetime(2026, 4, 1, 0, 0, 0)

rows = []
for day in range(30):
    for user in users:
        # Normal: 3-8 logins per day, business hours, home location
        n_logins = random.randint(3, 8)
        home_loc = locations[users.index(user) % len(locations)]
        for _ in range(n_logins):
            hour = random.randint(8, 18)
            rows.append({
                "timestamp": base_time + timedelta(days=day, hours=hour, minutes=random.randint(0,59)),
                "user": user,
                "result": "success" if random.random() > 0.05 else "failure",
                "location": home_loc,
                "ip": f"10.0.{users.index(user)}.{random.randint(1,50)}"
            })

# Inject anomalies
# Bob: brute force on day 25
for _ in range(45):
    rows.append({"timestamp": base_time + timedelta(days=25, hours=2, minutes=random.randint(0,59)),
                 "user": "bob", "result": "failure", "location": "Chicago", "ip": "185.220.101.5"})

# Carol: impossible travel on day 28
rows.append({"timestamp": base_time + timedelta(days=28, hours=9),
             "user": "carol", "result": "success", "location": "Tokyo", "ip": "202.32.115.1"})

# Alice: after-hours logins on day 27
for _ in range(6):
    rows.append({"timestamp": base_time + timedelta(days=27, hours=random.randint(1,5)),
                 "user": "alice", "result": "success", "location": "New York", "ip": "10.0.0.12"})

df = pd.DataFrame(rows)
df.to_csv("auth_logs.csv", index=False)
print(f"Generated {len(df)} log entries")
02
Core Logic

Build the Anomaly Detector

Three detection methods: Z-score for failure rate outliers, IQR for login volume spikes, and rule-based checks for impossible travel and after-hours access. Layer them — any hit generates an alert.

python anomaly_detector.py
import pandas as pd
import numpy as np
from scipy import stats
from datetime import timedelta

# ── Load & prep ──────────────────────────────────────────
df = pd.read_csv("auth_logs.csv", parse_dates=["timestamp"])
df["date"] = df["timestamp"].dt.date
df["hour"] = df["timestamp"].dt.hour
df["failed"] = (df["result"] == "failure").astype(int)

alerts = []

def add_alert(user, date, alert_type, detail, severity):
    alerts.append({
        "user": user, "date": date,
        "type": alert_type, "detail": detail, "severity": severity
    })

# ── Method 1: Failure rate Z-score per user ──────────────
daily_failures = df.groupby(["user", "date"])["failed"].sum().reset_index()
daily_failures.columns = ["user", "date", "failures"]

for user, group in daily_failures.groupby("user"):
    if len(group) < 7:
        continue  # Need baseline
    z_scores = np.abs(stats.zscore(group["failures"].values))
    for idx, (_, row) in enumerate(group.iterrows()):
        if z_scores[idx] > 2.5 and row["failures"] >= 5:
            add_alert(user, row["date"], "BRUTE_FORCE",
                      f"{row['failures']} failures (z={z_scores[idx]:.1f})", "HIGH")

# ── Method 2: Login volume spike (IQR) ───────────────────
daily_logins = df.groupby(["user", "date"]).size().reset_index(name="count")

for user, group in daily_logins.groupby("user"):
    q1, q3 = group["count"].quantile([0.25, 0.75])
    iqr = q3 - q1
    upper_fence = q3 + 1.5 * iqr
    outliers = group[group["count"] > upper_fence]
    for _, row in outliers.iterrows():
        add_alert(user, row["date"], "VOLUME_SPIKE",
                  f"{row['count']} logins (fence={upper_fence:.0f})", "MEDIUM")

# ── Method 3: New location (impossible-travel proxy) ─────
for user, group in df.groupby("user"):
    seen_locs = set()
    for _, row in group.sort_values("timestamp").iterrows():
        # Alert the first time a user appears from a location
        # absent from their entire prior history
        if seen_locs and row["location"] not in seen_locs:
            add_alert(user, row["date"], "NEW_LOCATION",
                      f"First seen: {row['location']}", "HIGH")
        seen_locs.add(row["location"])

# ── Method 4: After-hours access ─────────────────────────
after_hours = df[(df["hour"] < 6) | (df["hour"] > 22)]
ah_grouped = after_hours.groupby(["user", "date"]).size().reset_index(name="count")
for _, row in ah_grouped[ah_grouped["count"] >= 3].iterrows():
    add_alert(row["user"], row["date"], "AFTER_HOURS",
              f"{row['count']} logins outside 6am-10pm", "MEDIUM")

# ── Output ───────────────────────────────────────────────
result = pd.DataFrame(alerts).sort_values("severity")
print(f"\n{'='*60}")
print(f"ANOMALY DETECTION REPORT — {len(alerts)} alerts")
print(f"{'='*60}\n")
for _, row in result.iterrows():
    print(f"[{row['severity']}] {row['type']} | {row['user']} | {row['date']}")
    print(f"       {row['detail']}\n")

result.to_csv("alerts.csv", index=False)

Run it: python generate_logs.py && python anomaly_detector.py. You should see at least four alerts: Bob's brute force (which also trips the volume-spike and after-hours checks), Carol's new location (Tokyo), and Alice's after-hours logins.
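If you'd rather check the run programmatically than eyeball the report, a small helper (assuming the alerts.csv written above) could look like:

```python
import pandas as pd

def verify_alerts(alerts: pd.DataFrame) -> list[str]:
    """Return the injected anomalies that did NOT produce an alert."""
    expected = [("BRUTE_FORCE", "bob"), ("NEW_LOCATION", "carol"),
                ("AFTER_HOURS", "alice")]
    missing = []
    for alert_type, user in expected:
        # Any single matching row counts as a detection
        if not ((alerts["type"] == alert_type) & (alerts["user"] == user)).any():
            missing.append(f"{alert_type}/{user}")
    return missing
```

Call it with verify_alerts(pd.read_csv("alerts.csv")) — an empty list means every injected anomaly surfaced.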

03
Production Notes

Adapting This for Real Environments

Connecting to real log sources

Replace the CSV load with a direct SIEM or CloudTrail query. The pandas DataFrame structure stays the same — you just change the input.

python real_data_connectors.py
# AWS CloudTrail via boto3
import boto3

ct = boto3.client('cloudtrail', region_name='us-east-1')
events = ct.lookup_events(
    LookupAttributes=[{'AttributeKey': 'EventName', 'AttributeValue': 'ConsoleLogin'}],
    MaxResults=500
)
# Parse events['Events'] into a DataFrame with same schema

# Splunk via REST API
import requests

# Demo only: use an API token and verify TLS in production
resp = requests.post("https://your-splunk:8089/services/search/jobs",
    auth=("admin", "password"),
    data={"search": "search index=auth sourcetype=linux_secure | head 10000",
          "output_mode": "json"}, verify=False)
# Job creation is asynchronous: the response carries a search id (sid);
# poll /services/search/jobs/<sid>/results, then parse the rows into a
# DataFrame with the same schema
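The CloudTrail parsing step glossed over above can be sketched like this. The field names assume the shape of ConsoleLogin records returned by lookup_events (Username, EventTime, and a CloudTrailEvent JSON string); adjust for what your account actually emits:

```python
import json
import pandas as pd

def cloudtrail_to_df(events: list[dict]) -> pd.DataFrame:
    """Normalize CloudTrail lookup_events output into the auth_logs schema."""
    rows = []
    for ev in events:
        # The raw event payload arrives as a JSON string
        detail = json.loads(ev["CloudTrailEvent"])
        login = detail.get("responseElements", {}).get("ConsoleLogin")
        rows.append({
            "timestamp": ev["EventTime"],
            "user": ev.get("Username", "unknown"),
            "result": "success" if login == "Success" else "failure",
            "location": detail.get("awsRegion", "unknown"),  # coarse location proxy
            "ip": detail.get("sourceIPAddress", ""),
        })
    return pd.DataFrame(rows)
```

The resulting frame drops straight into anomaly_detector.py in place of the CSV load.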

Tuning thresholds

The Z-score threshold of 2.5 and IQR fence of 1.5x are starting points. In a production environment, you'll tune these based on your false positive rate. Start more sensitive (lower thresholds), track false positives for two weeks, then tighten. Better to over-alert initially than miss real incidents while tuning.
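One way to make that tuning loop concrete is to keep every threshold in a single config object, so "start sensitive, then tighten" is a one-line profile switch. The profile names and values below are illustrative, not recommendations:

```python
from dataclasses import dataclass

@dataclass
class Thresholds:
    z_score: float = 2.5       # failure-rate outlier cutoff
    min_failures: int = 5      # ignore z-score hits below this absolute count
    iqr_multiplier: float = 1.5
    after_hours_min: int = 3   # after-hours logins needed to alert

# Deliberately looser "initial" profile for the first two weeks of tuning
TUNING_PROFILES = {
    "initial": Thresholds(z_score=2.0, min_failures=3,
                          iqr_multiplier=1.0, after_hours_min=2),
    "tuned": Thresholds(),
}
```

The detector then reads thresholds from the active profile instead of hardcoded literals, which also makes threshold changes reviewable in version control.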

What this doesn't cover (and what does)

Statistical anomaly detection catches volume-based attacks well (brute force, credential stuffing, account takeover). It won't catch living-off-the-land attacks where the attacker behaves like a normal user. For those, you need behavioral baselines over longer windows and graph-based analysis of lateral movement — that's what enterprise UEBA tools (Securonix, Exabeam) do.
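As a small taste of what a longer behavioral window looks like, here is a minimal sketch that compares each day's login count to a trailing 14-day baseline rather than the user's whole history (window size and cutoff are arbitrary choices, not tuned values):

```python
import pandas as pd

def rolling_anomalies(daily_counts: pd.DataFrame,
                      window: int = 14, z: float = 3.0) -> pd.DataFrame:
    """Flag days where a user's login count exceeds the trailing-window
    mean by more than z trailing standard deviations."""
    out = []
    for user, g in daily_counts.groupby("user"):
        g = g.sort_values("date").copy()
        # shift(1) excludes the current day from its own baseline
        mean = g["count"].rolling(window, min_periods=7).mean().shift(1)
        std = g["count"].rolling(window, min_periods=7).std().shift(1)
        g["anomaly"] = (g["count"] - mean) > z * std
        out.append(g)
    return pd.concat(out)
```

A trailing baseline like this adapts to gradual behavior changes (a user's volume slowly growing) where a whole-history z-score would keep alerting; it is still a long way from the graph-based lateral-movement analysis the UEBA tools add.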

Day 2 Complete

  • Built a 4-method anomaly detector: Z-score (failure rate), IQR (volume spike), new location, after-hours
  • Detected simulated brute force (Bob), impossible travel (Carol), and after-hours access (Alice)
  • Statistical methods work well for volume-based attacks with minimal setup
  • Real-world connectors: CloudTrail via boto3, Splunk via REST API
  • Tomorrow: feed these alerts into Claude to generate plain-English investigation summaries

Go deeper in 3 days.

The live bootcamp covers production-grade SIEM integration, red team exercises, and AI-powered incident response playbooks.

Reserve Your Seat →