Anomaly Detection System
A Python script that ingests auth log data, baselines normal behavior per user, and flags statistically anomalous events — high failure rates, unusual hours, impossible travel, new geo locations. The same logic that powers commercial UEBA products.
Install Dependencies and Create Sample Data
You need pandas, numpy, and scipy. No ML training required — we're using statistical methods that work well with small datasets and are easy to explain to stakeholders.
```bash
pip install pandas numpy scipy
```
Create a sample auth log CSV. In production you'd pull this from your SIEM, Splunk, or CloudTrail.
```python
# generate_logs.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random

random.seed(42)
np.random.seed(42)

users = ["alice", "bob", "carol", "dave", "eve"]
locations = ["New York", "Chicago", "Denver", "Los Angeles"]
base_time = datetime(2026, 4, 1, 0, 0, 0)

rows = []
for day in range(30):
    for user in users:
        # Normal: 3-8 logins per day, business hours, home location
        n_logins = random.randint(3, 8)
        home_loc = locations[users.index(user) % len(locations)]
        for _ in range(n_logins):
            hour = random.randint(8, 18)
            rows.append({
                "timestamp": base_time + timedelta(days=day, hours=hour,
                                                   minutes=random.randint(0, 59)),
                "user": user,
                "result": "success" if random.random() > 0.05 else "failure",
                "location": home_loc,
                "ip": f"10.0.{users.index(user)}.{random.randint(1, 50)}",
            })

# Inject anomalies
# Bob: brute force on day 25
for _ in range(45):
    rows.append({"timestamp": base_time + timedelta(days=25, hours=2,
                                                    minutes=random.randint(0, 59)),
                 "user": "bob", "result": "failure",
                 "location": "Chicago", "ip": "185.220.101.5"})

# Carol: impossible travel on day 28
rows.append({"timestamp": base_time + timedelta(days=28, hours=9),
             "user": "carol", "result": "success",
             "location": "Tokyo", "ip": "202.32.115.1"})

# Alice: after-hours logins on day 27
for _ in range(6):
    rows.append({"timestamp": base_time + timedelta(days=27,
                                                    hours=random.randint(1, 5)),
                 "user": "alice", "result": "success",
                 "location": "New York", "ip": "10.0.0.12"})

df = pd.DataFrame(rows)
df.to_csv("auth_logs.csv", index=False)
print(f"Generated {len(df)} log entries")
```
Build the Anomaly Detector
Four detection methods, layered: Z-score for failure-rate outliers, IQR for login-volume spikes, and rule-based checks for new locations (impossible travel) and after-hours access. Any hit generates an alert.
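Before the full detector, here are the two statistical tests in isolation on toy numbers (not the generated logs), so you can see exactly what each threshold does:

```python
import numpy as np
from scipy import stats

# 14 days of daily failure counts; the last day is the attack
failures = np.array([0, 1, 0, 2, 1, 0, 1, 0, 1, 2, 0, 1, 0, 45])

# Z-score: how many standard deviations each day sits from the mean
z = np.abs(stats.zscore(failures))
print(round(z[-1], 1))  # about 3.6, well past the 2.5 threshold

# IQR fence: anything above Q3 + 1.5*IQR counts as an outlier
q1, q3 = np.percentile(failures, [25, 75])
fence = q3 + 1.5 * (q3 - q1)
print(failures[failures > fence])  # only the attack day clears the fence
```

Note that the attack day inflates the mean and standard deviation it is measured against, which is why the detector also requires an absolute floor (at least 5 failures) and a minimum amount of history.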
```python
# anomaly_detector.py
import pandas as pd
import numpy as np
from scipy import stats
from datetime import timedelta

# ── Load & prep ──────────────────────────────────────────
df = pd.read_csv("auth_logs.csv", parse_dates=["timestamp"])
df["date"] = df["timestamp"].dt.date
df["hour"] = df["timestamp"].dt.hour
df["failed"] = (df["result"] == "failure").astype(int)

alerts = []

def add_alert(user, date, alert_type, detail, severity):
    alerts.append({"user": user, "date": date, "type": alert_type,
                   "detail": detail, "severity": severity})

# ── Method 1: Failure-rate z-score per user ──────────────
daily_failures = df.groupby(["user", "date"])["failed"].sum().reset_index()
daily_failures.columns = ["user", "date", "failures"]

for user, group in daily_failures.groupby("user"):
    if len(group) < 7:
        continue  # Not enough history for a baseline
    z_scores = np.abs(stats.zscore(group["failures"].values))
    for idx, (_, row) in enumerate(group.iterrows()):
        if z_scores[idx] > 2.5 and row["failures"] >= 5:
            add_alert(user, row["date"], "BRUTE_FORCE",
                      f"{row['failures']} failures (z={z_scores[idx]:.1f})", "HIGH")

# ── Method 2: Login volume spike (IQR) ───────────────────
daily_logins = df.groupby(["user", "date"]).size().reset_index(name="count")
for user, group in daily_logins.groupby("user"):
    q1, q3 = group["count"].quantile([0.25, 0.75])
    upper_fence = q3 + 1.5 * (q3 - q1)
    for _, row in group[group["count"] > upper_fence].iterrows():
        add_alert(user, row["date"], "VOLUME_SPIKE",
                  f"{row['count']} logins (fence={upper_fence:.0f})", "MEDIUM")

# ── Method 3: New location (impossible travel) ───────────
# Flag any location first seen after a one-week warm-up, so the
# baseline of known locations has time to build. (Checking only the
# final day would miss a new location that first appeared earlier.)
for user, group in df.groupby("user"):
    warmup_end = group["date"].min() + timedelta(days=7)
    first_seen = group.groupby("location")["date"].min()
    for loc, first_date in first_seen.items():
        if first_date >= warmup_end:
            add_alert(user, first_date, "NEW_LOCATION",
                      f"First seen: {loc}", "HIGH")

# ── Method 4: After-hours access ─────────────────────────
after_hours = df[(df["hour"] < 6) | (df["hour"] > 22)]
ah_grouped = after_hours.groupby(["user", "date"]).size().reset_index(name="count")
for _, row in ah_grouped[ah_grouped["count"] >= 3].iterrows():
    add_alert(row["user"], row["date"], "AFTER_HOURS",
              f"{row['count']} logins outside 06:00-22:00", "MEDIUM")

# ── Output ───────────────────────────────────────────────
result = pd.DataFrame(alerts).sort_values("severity")  # "HIGH" sorts before "MEDIUM"
print(f"\n{'=' * 60}")
print(f"ANOMALY DETECTION REPORT — {len(alerts)} alerts")
print(f"{'=' * 60}\n")
for _, row in result.iterrows():
    print(f"[{row['severity']}] {row['type']} | {row['user']} | {row['date']}")
    print(f"    {row['detail']}\n")
result.to_csv("alerts.csv", index=False)
```
Run it: `python generate_logs.py && python anomaly_detector.py`. You should see several alerts, including Bob's brute force (which also trips the volume-spike and after-hours checks), Carol's new location (Tokyo), and Alice's after-hours logins.
Adapting This for Real Environments
Connecting to real log sources
Replace the CSV load with a direct SIEM or CloudTrail query. The pandas DataFrame structure stays the same — you just change the input.
```python
# AWS CloudTrail via boto3
import boto3

ct = boto3.client("cloudtrail", region_name="us-east-1")
events = ct.lookup_events(
    LookupAttributes=[{"AttributeKey": "EventName", "AttributeValue": "ConsoleLogin"}],
    MaxResults=500,
)
# Parse events["Events"] into a DataFrame with the same schema

# Splunk via REST API
import requests

resp = requests.post(
    "https://your-splunk:8089/services/search/jobs",
    auth=("admin", "password"),
    data={"search": "search index=auth sourcetype=linux_secure | head 10000",
          "output_mode": "json"},
    verify=False,  # self-signed certs are common on Splunk management ports
)
# Parse resp.json() into a DataFrame
```
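The parsing step for CloudTrail might look like the sketch below. It assumes the `lookup_events` response shape, where each event carries its detailed record as a JSON string in `CloudTrailEvent`; the `location` mapping here (AWS region as a stand-in) is a placeholder, since a real pipeline would resolve `sourceIPAddress` through GeoIP:

```python
import json
import pandas as pd

def cloudtrail_to_df(events):
    """Map CloudTrail lookup_events output onto the detector's schema.

    The ConsoleLogin outcome lives in responseElements inside the
    CloudTrailEvent JSON blob. "location" is approximated by awsRegion
    here; swap in a GeoIP lookup on sourceIPAddress for real use.
    """
    rows = []
    for ev in events["Events"]:
        detail = json.loads(ev["CloudTrailEvent"])
        outcome = (detail.get("responseElements") or {}).get("ConsoleLogin", "")
        rows.append({
            "timestamp": ev["EventTime"],
            "user": ev.get("Username", "unknown"),
            "result": "success" if outcome == "Success" else "failure",
            "location": detail.get("awsRegion", "unknown"),
            "ip": detail.get("sourceIPAddress", ""),
        })
    return pd.DataFrame(rows)
```

Once the columns match (`timestamp`, `user`, `result`, `location`, `ip`), the detector runs unmodified.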
Tuning thresholds
The Z-score threshold of 2.5 and IQR fence of 1.5x are starting points. In a production environment, you'll tune these based on your false positive rate. Start more sensitive (lower thresholds), track false positives for two weeks, then tighten. Better to over-alert initially than miss real incidents while tuning.
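One concrete way to run that tuning loop: after analysts label a window of alerts as true or false positives (a column you would add to alerts.csv yourself; the numbers below are made up for illustration), sweep the threshold and watch precision trade against recall:

```python
# Hypothetical labeled history: (z_score, was_true_positive) pairs
# produced while triaging two weeks of alerts.
labeled = [(2.1, False), (2.6, False), (3.0, True), (3.4, False),
           (4.2, True), (5.1, True), (2.8, False), (6.0, True)]

total_true = sum(tp for _, tp in labeled)
for threshold in [2.0, 2.5, 3.0, 3.5]:
    fired = [tp for z, tp in labeled if z >= threshold]
    if not fired:
        continue
    precision = sum(fired) / len(fired)   # share of fired alerts that were real
    recall = sum(fired) / total_true      # share of real incidents still caught
    print(f"z>={threshold}: precision={precision:.2f} recall={recall:.2f}")
```

Raising the fence buys precision at the cost of recall; pick the lowest threshold whose false-positive volume your team can actually triage.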
What this doesn't cover (and what does)
Statistical anomaly detection catches volume-based attacks well (brute force, credential stuffing, account takeover). It won't catch living-off-the-land attacks where the attacker behaves like a normal user. For those, you need behavioral baselines over longer windows and graph-based analysis of lateral movement — that's what enterprise UEBA tools (Securonix, Exabeam) do.
Day 2 Complete
- Built a 4-method anomaly detector: Z-score (failure rate), IQR (volume spike), new location, after-hours
- Detected simulated brute force (Bob), impossible travel (Carol), and after-hours access (Alice)
- Statistical methods work well for volume-based attacks with minimal setup
- Real-world connectors: CloudTrail via boto3, Splunk via REST API
- Tomorrow: feed these alerts into Claude to generate plain-English investigation summaries