Deploying a model is not the end. Models drift, hardware fails, and new versions must reach thousands of devices. Today you'll build the operational layer.
Data drift: input distribution changes over time (seasonal variation, sensor wear, environment changes). A model trained on summer images degrades in winter. Detect with: statistical tests on input features (KS test, PSI — Population Stability Index), monitoring prediction confidence distribution, tracking accuracy on a labeled holdout set. Model drift (concept drift): the relationship between inputs and outputs changes — a fraud model degrades as attackers adapt. Retrain when monitoring metrics cross a threshold.
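PSI, mentioned above, needs nothing beyond the standard library. A minimal sketch, assuming a scalar feature such as mean pixel brightness; the 0.1/0.25 thresholds are common rules of thumb, and the bin count and simulated seasonal values are illustrative:

```python
import math
import random

def psi(reference, current, bins=10):
    """Population Stability Index between two samples of one scalar feature.

    Bins come from the reference sample's range; a small epsilon keeps empty
    bins from producing log(0). Rule of thumb: PSI < 0.1 stable,
    0.1-0.25 moderate shift, > 0.25 significant shift.
    """
    lo, hi = min(reference), max(reference)
    edges = [lo + (hi - lo) * i / bins for i in range(bins + 1)]
    edges[0] = float("-inf")   # catch current values below the reference min
    edges[-1] = float("inf")   # ...and above the reference max

    def fractions(sample):
        counts = [0] * bins
        for x in sample:
            for i in range(bins):
                if edges[i] <= x < edges[i + 1]:
                    counts[i] += 1
                    break
        eps = 1e-6
        return [max(c / len(sample), eps) for c in counts]

    ref_f, cur_f = fractions(reference), fractions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_f, cur_f))

random.seed(0)
summer = [random.gauss(0.55, 0.1) for _ in range(1000)]   # mean brightness, summer
summer2 = [random.gauss(0.55, 0.1) for _ in range(1000)]  # same distribution
winter = [random.gauss(0.35, 0.1) for _ in range(1000)]   # darker winter scenes
print(f"PSI summer vs summer: {psi(summer, summer2):.3f}")  # small: stable
print(f"PSI summer vs winter: {psi(summer, winter):.3f}")   # large: drift
```

Run this per feature on a sliding window of recent inputs; when PSI crosses your threshold, flag the device for retraining or rollback.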
Edge devices need remote model updates. Architecture: model registry in S3/GCS, edge device polls for new version (or broker pushes notification), device downloads to staging area, verifies SHA-256 hash, verifies ECDSA signature, swaps active model, runs smoke test (inference on 10 known samples, compare output to expected). If smoke test fails, roll back to previous version. Implement with Python requests + hashlib + cryptography library. The model file is not code — but its security posture should be treated like code.
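The ECDSA verification step can be sketched with the cryptography library the text names. The keypair is generated inline purely for illustration; in practice the private key stays in the release pipeline and only the public key ships in the device image:

```python
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec

# Release-pipeline side (normally never on the device):
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()

model_bytes = b"\x00fake tflite payload"  # stands in for the downloaded model file
signature = private_key.sign(model_bytes, ec.ECDSA(hashes.SHA256()))

def verify_model(blob: bytes, sig: bytes, pubkey) -> bool:
    """Device-side check: reject the download unless the signature matches."""
    try:
        pubkey.verify(sig, blob, ec.ECDSA(hashes.SHA256()))
        return True
    except InvalidSignature:
        return False

print(verify_model(model_bytes, signature, public_key))                 # True
print(verify_model(model_bytes + b"tampered", signature, public_key))   # False
```

The hash check catches corruption; the signature check catches tampering. Run both before the staged model is ever loaded.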
At scale, you need to know: which model version runs on each device, device health (uptime, inference latency, error rate), hardware status (temperature, memory, CPU). Use device twin / shadow pattern: each device maintains a shadow document in the cloud (desired state vs reported state). The cloud sets desired_model_version; the device reports current_model_version. When they differ, the device downloads the update. AWS IoT Core, Azure IoT Hub, and AWS Greengrass implement this pattern.
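The desired-vs-reported reconciliation at the heart of the shadow pattern fits in a few lines. A sketch with illustrative field names, not the actual AWS IoT or Azure IoT Hub wire format:

```python
import json

# Shadow document: the cloud writes "desired", the device writes "reported".
shadow = {
    "desired":  {"model_version": "v1.1.0"},   # set by the cloud
    "reported": {"model_version": "v1.0.0"},   # set by the device
}

def reconcile(shadow, apply_update):
    """If desired and reported disagree, apply the update and report back."""
    desired = shadow["desired"]["model_version"]
    reported = shadow["reported"]["model_version"]
    if desired != reported:
        if apply_update(desired):  # e.g. download + verify + smoke test + swap
            shadow["reported"]["model_version"] = desired
    return shadow

# apply_update would be the OTA routine; a stub that always succeeds:
shadow = reconcile(shadow, apply_update=lambda version: True)
print(json.dumps(shadow, indent=2))
```

The device runs this loop on every poll (or on every shadow-delta notification); if the update fails, reported stays at the old version, which is exactly what the fleet dashboard needs to see.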
# Model versioning and OTA update system
import hashlib
import json
import os
import time

import requests  # used by the real download/report calls (commented out below)
from pathlib import Path

# Simulated model registry (in production: S3 or similar)
MODEL_REGISTRY = {
    "v1.0.0": {
        "url": "https://example.com/models/detector_v1.tflite",
        "sha256": "abc123...",
        "size_kb": 1240,
        "released": "2026-01-15",
        "notes": "Initial deployment",
    },
    "v1.1.0": {
        "url": "https://example.com/models/detector_v1_1.tflite",
        "sha256": "def456...",
        "size_kb": 1180,
        "released": "2026-03-01",
        "notes": "Improved night performance, 2% accuracy gain",
    },
}

DEVICE_ID = "edge-pi-001"
CURRENT_VER = "v1.0.0"
MODEL_DIR = Path("/opt/models")
ACTIVE_MODEL = MODEL_DIR / "active.tflite"
BOOT_TIME = time.time()  # record start so uptime is relative, not epoch seconds

def check_sha256(filepath, expected_hash):
    """Stream the file in 64 KB chunks so large models don't exhaust RAM."""
    sha256 = hashlib.sha256()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            sha256.update(chunk)
    return sha256.hexdigest() == expected_hash

def smoke_test(model_path):
    # Run inference on known test samples and compare outputs to the
    # expected values stored during training.
    import numpy as np
    test_inputs = [np.random.randn(1, 224, 224, 3).astype(np.float32)]  # your test data
    expected_outputs = [[0.9, 0.05, 0.05]]  # expected top-class probabilities
    # ... run TFLite inference on test_inputs and compare to expected_outputs
    return True  # return False if outputs deviate beyond tolerance

def update_model(target_version):
    global CURRENT_VER
    if target_version == CURRENT_VER:
        print(f"Already on {CURRENT_VER}")
        return False
    meta = MODEL_REGISTRY.get(target_version)
    if not meta:
        print(f"Unknown version: {target_version}")
        return False

    # 1. Download to a staging path (simulated; uncomment for real use)
    staging = MODEL_DIR / f"staging_{target_version}.tflite"
    print(f"Downloading {target_version} ({meta['size_kb']} KB)...")
    # r = requests.get(meta['url'], stream=True, timeout=30)
    # r.raise_for_status()
    # staging.write_bytes(r.content)

    # 2. Verify the hash before trusting the file
    # if not check_sha256(staging, meta['sha256']):
    #     staging.unlink()
    #     return False
    print("Hash verified.")

    # 3. Smoke test the staged model before it goes live
    if not smoke_test(staging):
        print("Smoke test FAILED -- keeping current model")
        staging.unlink()
        return False

    # 4. Swap: keep the old model as a rollback target
    backup = MODEL_DIR / "backup.tflite"
    if ACTIVE_MODEL.exists():
        ACTIVE_MODEL.rename(backup)
    staging.rename(ACTIVE_MODEL)
    CURRENT_VER = target_version
    print(f"Updated to {target_version}")
    return True

# Report device state to the cloud (the "reported" half of the device twin)
def report_state():
    state = {
        "device_id": DEVICE_ID,
        "model_version": CURRENT_VER,
        "uptime_s": int(time.time() - BOOT_TIME),
        "free_disk_mb": 1234,  # in production: os.statvfs('/')
        "temp_c": 45.2,        # on a Pi: vcgencmd measure_temp
    }
    print(f"Reporting state: {json.dumps(state, indent=2)}")
    # requests.post("https://fleet.api/devices/state", json=state, timeout=10)

report_state()
print("OTA system initialized.")
Exercises: (1) Use a two-sample Kolmogorov-Smirnov test (scipy.stats.ks_2samp) to detect input distribution shift on one feature (e.g., mean pixel value). (2) Build a complete edge ML pipeline: train a simple binary classifier (defect vs good) on a small dataset and deploy it to a Raspberry Pi with TFLite. Add monitoring: log inference latency and confidence to InfluxDB (from Day 4 of the IoT course). Build a Grafana alert that fires when average confidence stays below 0.75 for 5 consecutive minutes, indicating potential drift, and implement automatic rollback when the alert fires.
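The consecutive-low-confidence alert from the exercise can also be prototyped on-device before wiring up Grafana. A stdlib sketch; the class and method names are illustrative, and only the 0.75 threshold and 5-minute window come from the exercise:

```python
class ConfidenceMonitor:
    """Fire when average confidence stays below a threshold for N checks in a row."""

    def __init__(self, threshold=0.75, consecutive=5):
        self.threshold = threshold
        self.consecutive = consecutive
        self.low_streak = 0

    def record_minute(self, confidences):
        """Feed one minute's confidences; return True if the alarm fires."""
        avg = sum(confidences) / len(confidences)
        self.low_streak = self.low_streak + 1 if avg < self.threshold else 0
        return self.low_streak >= self.consecutive

mon = ConfidenceMonitor()
print(mon.record_minute([0.9, 0.85, 0.88]))   # healthy minute -> False
for _ in range(4):
    mon.record_minute([0.6, 0.65, 0.7])       # four low minutes: streak builds
print(mon.record_minute([0.6, 0.65, 0.7]))    # fifth consecutive low -> True
```

When record_minute returns True, call the rollback path from the OTA listing (restore backup.tflite as the active model) and report the event to the fleet API.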