Data sitting in a broker is useless. Today you'll build the full pipeline: InfluxDB for time-series storage, Grafana for dashboards, and alert rules.
InfluxDB stores time-stamped data efficiently. Key concepts: measurement (like a table: 'sensors'), tags (indexed metadata: device_id, location), fields (the actual values: temp, humidity), timestamp (nanosecond precision). Query with InfluxQL or Flux. Install: docker run -p 8086:8086 influxdb:2.0. Use the Python influxdb-client to write from MQTT. Key advantage over a general-purpose SQL database: built-in retention policies, downsampling tasks, and fast time-range queries.
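To get a feel for the data model before wiring up MQTT, here is a minimal read-back sketch with the Python client. The URL, token, org, and bucket are placeholders, and it assumes points have already been written into a 'sensors' measurement, as the bridge script below does:

# Minimal Flux query via the Python client (placeholder credentials)
from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="your-token-here", org="your-org")

# Pull the last hour of temperature readings from the "sensors" measurement
flux = '''
from(bucket: "iot-sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensors" and r._field == "temperature")
'''

for table in client.query_api().query(flux):
    for record in table.records:
        print(record.get_time(), record.values.get("device_id"), record.get_value())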
Grafana is the visualization layer. Connect to InfluxDB as a data source. Build panels: time-series graphs, gauges, stat panels, tables. Variables let users select device or time range from a dropdown — one dashboard serves all devices. Alert rules send notifications (email, Slack, PagerDuty) when thresholds are breached. Install: docker run -p 3000:3000 grafana/grafana. Default login: admin/admin.
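As an example of how variables show up in a panel, a Flux query for a temperature graph might look roughly like this, assuming the bucket and tag names used by the bridge below and a dashboard variable named device (v.timeRangeStart and v.timeRangeStop are Grafana's built-in time-range macros):

// Panel query sketch: ${device} is interpolated from the dashboard variable
from(bucket: "iot-sensors")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r._measurement == "sensors" and r._field == "temperature")
  |> filter(fn: (r) => r.device_id == "${device}")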
A bridge script subscribes to all sensor topics and writes to InfluxDB. Use Python with paho-mqtt and influxdb-client. Parse the JSON payload, extract tags and fields, write a data point. Run as a systemd service on the Pi/server. This is the glue that turns MQTT messages into queryable time-series data.
# MQTT to InfluxDB bridge
# pip install paho-mqtt influxdb-client
import json

import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB config (local or InfluxDB Cloud)
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "your-token-here"
INFLUX_ORG = "your-org"
INFLUX_BUCKET = "iot-sensors"

# MQTT config
MQTT_BROKER = "localhost"
MQTT_TOPIC = "home/#"  # all home topics

influx = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = influx.write_api(write_options=SYNCHRONOUS)

def write_to_influx(device_id, location, temp, humidity):
    point = (
        Point("sensors")
        .tag("device_id", device_id)
        .tag("location", location)
        .field("temperature", float(temp))
        .field("humidity", float(humidity))
    )
    write_api.write(bucket=INFLUX_BUCKET, record=point)

def on_message(client, userdata, msg):
    try:
        data = json.loads(msg.payload.decode())
        device_id = data.get('device', msg.topic.split('/')[1])
        location = data.get('location', 'unknown')
        if 'temp' in data and 'hum' in data:
            write_to_influx(device_id, location, data['temp'], data['hum'])
            print(f"Wrote: {device_id} {data['temp']}°C {data['hum']}%")
    except Exception as e:
        print(f"Error: {e} | payload: {msg.payload}")

def on_connect(client, userdata, flags, rc):
    print(f"Connected to MQTT: {rc}")
    client.subscribe(MQTT_TOPIC, qos=1)

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(MQTT_BROKER, 1883, keepalive=60)

print("Bridge running: MQTT → InfluxDB")
client.loop_forever()
Run the bridge under systemd so it restarts automatically on failure. Create /etc/systemd/system/mqtt-bridge.service, set Restart=always and RestartSec=5, then run systemctl enable mqtt-bridge. This turns a Python script into a production service.

Implement a data retention policy: configure InfluxDB to automatically downsample data older than 7 days from 1-minute intervals to 1-hour averages. InfluxDB 1.x called this a continuous query; in 2.x it is a task written in Flux, so research the Flux task syntax. Calculate the storage savings: 1 reading/minute vs 1 reading/hour over 1 year for 10 sensors.
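To go with the systemd step above, a minimal unit file might look like this; the script path and user are placeholders, so point ExecStart at wherever the bridge actually lives:

# /etc/systemd/system/mqtt-bridge.service (sketch; path and user are placeholders)
[Unit]
Description=MQTT to InfluxDB bridge
After=network-online.target
Wants=network-online.target

[Service]
ExecStart=/usr/bin/python3 /home/pi/mqtt_bridge.py
Restart=always
RestartSec=5
User=pi

[Install]
WantedBy=multi-user.target

For the downsampling exercise, a Flux task skeleton might look roughly like this; the bucket names and aggregation window are assumptions to adapt, not the finished answer:

// Task sketch: hourly means written to a second bucket
option task = {name: "downsample-hourly", every: 1h}

from(bucket: "iot-sensors")
  |> range(start: -task.every)
  |> filter(fn: (r) => r._measurement == "sensors")
  |> aggregateWindow(every: 1h, fn: mean)
  |> to(bucket: "iot-sensors-hourly", org: "your-org")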