How to Sync MQTT Sensor Data to PostGIS with Python
To sync MQTT sensor data to PostGIS with Python, establish a persistent subscriber using paho-mqtt v2, parse incoming JSON payloads that carry coordinates and environmental metrics, transform those coordinates into PostGIS geometries with ST_MakePoint and ST_SetSRID, then write records through psycopg2 using parameterized INSERT ... ON CONFLICT statements. The pipeline requires strict payload validation, explicit coordinate ordering (longitude before latitude), and idempotent write logic so that broker reconnects and duplicate telemetry do not corrupt historical data, plus a spatial GiST index to keep spatial queries fast.
Why the pipeline is harder than it looks
A raw MQTT stream offers no schema guarantees. Sensors from different manufacturers publish different field names, unit conventions, and timestamp formats. Coordinate order alone causes silent data corruption: ST_MakePoint(lat, lon) executes without error but places every reading on a mirrored globe. Duplicate delivery (QoS 1 retransmission after a broker restart) duplicates or overwrites valid readings unless the write path is designed to be idempotent. And opening a new database connection per message collapses throughput above a few messages per second.
Getting this right is the foundation for every downstream spatial query: bounding-box lookups, ST_DWithin radius searches, and the spatial CRS mapping on ingest steps that reproject data from local survey grids into a common coordinate system.
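The schema drift described above (different field names and units per manufacturer) can be absorbed by a small normalisation layer that runs before validation. The sketch below is illustrative only: the alias names (`device_id`, `lng`, `temp_f`, and so on) are hypothetical examples, not fields any particular manufacturer is known to publish.

```python
from typing import Any

# Hypothetical vendor aliases mapped to the canonical keys used in this guide.
FIELD_ALIASES: dict[str, str] = {
    "device_id": "sensor_id",
    "ts": "timestamp",
    "longitude": "lon",
    "lng": "lon",
    "latitude": "lat",
    "temp_c": "temperature",
    "rh": "humidity",
}


def normalise_fields(raw: dict[str, Any]) -> dict[str, Any]:
    """Rename aliased keys and convert a Fahrenheit reading to Celsius.

    A canonical key that is present in the payload always wins over an alias.
    """
    out: dict[str, Any] = {}
    for key, value in raw.items():
        canonical = FIELD_ALIASES.get(key, key)
        if canonical in out and key != canonical:
            continue  # an alias must not overwrite an explicit canonical key
        out[canonical] = value

    # "temp_f" is a hypothetical Fahrenheit field; convert only if no
    # Celsius temperature was supplied.
    temp_f = out.pop("temp_f", None)
    if temp_f is not None and "temperature" not in out:
        out["temperature"] = round((float(temp_f) - 32.0) * 5.0 / 9.0, 2)
    return out


sample = {"device_id": "n1", "lng": -0.1, "latitude": 51.5, "temp_f": 68.0}
print(normalise_fields(sample))
# {'sensor_id': 'n1', 'lon': -0.1, 'lat': 51.5, 'temperature': 20.0}
```

Keeping the alias table as data rather than code means a new sensor model usually needs one new dictionary entry, not a new code path.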
Prerequisites & version compatibility
| Component | Minimum version | Notes |
|---|---|---|
| Python | 3.9+ | Required for datetime.fromisoformat() timezone handling and modern asyncio patterns |
| paho-mqtt | 2.0.0 | v2 enforces CallbackAPIVersion.VERSION2; legacy v1 callbacks raise TypeError at runtime |
| psycopg2-binary | 2.9.0 | PostgreSQL 12+ recommended for GENERATED ALWAYS AS IDENTITY and JSONB performance |
| asyncpg | 0.28.0 | Optional; use instead of psycopg2 for async pipelines above 50 msgs/sec |
| PostGIS | 3.3+ | Stable ST_MakePoint behaviour, native type casting, improved GiST index vacuuming |
| CRS | EPSG:4326 (WGS 84) | PostGIS default; transform client-side with pyproj if sensors report UTM or local projections |
The timestamp alignment and timezone normalization step must be complete before this pipeline runs. PostgreSQL interprets a naive timestamp written to a TIMESTAMPTZ column in the session's time zone (UTC on many servers) and applies no other adjustment, which silently breaks time-range queries across sensor deployments in different time zones.
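To make the failure mode concrete, the sketch below normalises an ISO 8601 string to UTC. The `assumed_offset_hours` parameter is a hypothetical stand-in for whatever per-sensor timezone metadata your normalization step provides; it is not part of any library API.

```python
from datetime import datetime, timedelta, timezone


def to_utc(ts_text: str, assumed_offset_hours: float = 0.0) -> datetime:
    """Parse an ISO 8601 timestamp and return a timezone-aware UTC datetime.

    Naive timestamps are interpreted at assumed_offset_hours east of UTC.
    """
    if ts_text.endswith("Z"):
        # datetime.fromisoformat() accepts a trailing "Z" only on Python 3.11+
        ts_text = ts_text[:-1] + "+00:00"
    ts = datetime.fromisoformat(ts_text)
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone(timedelta(hours=assumed_offset_hours)))
    return ts.astimezone(timezone.utc)


# The same naive wall-clock reading lands two hours apart depending on
# which offset is assumed for the sensor.
print(to_utc("2024-11-15T12:00:00").isoformat())
# 2024-11-15T12:00:00+00:00
print(to_utc("2024-11-15T12:00:00", assumed_offset_hours=2).isoformat())
# 2024-11-15T10:00:00+00:00
```

A fixed offset is used here to keep the example dependency-free; deployments that observe daylight saving time would need a named zone instead.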
Step 1: Define the PostGIS schema
Create a table that enforces spatial integrity and supports idempotent writes. The UNIQUE constraint on (sensor_id, recorded_at) prevents duplicate telemetry from overwriting valid historical records during broker reconnects or QoS 1 retransmissions.
CREATE EXTENSION IF NOT EXISTS postgis;

CREATE TABLE sensor_readings (
    id          BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    sensor_id   VARCHAR(50) NOT NULL,
    recorded_at TIMESTAMPTZ NOT NULL,
    temperature NUMERIC(5,2),
    humidity    NUMERIC(5,2),
    pm25        NUMERIC(7,3),
    location    GEOMETRY(Point, 4326) NOT NULL
);

-- Enforce idempotency: (sensor, timestamp) pairs must be unique
ALTER TABLE sensor_readings
    ADD CONSTRAINT uq_sensor_reading UNIQUE (sensor_id, recorded_at);

-- Spatial index for ST_DWithin and bounding-box queries
CREATE INDEX idx_sensor_readings_location
    ON sensor_readings USING GIST (location);

-- Temporal index for time-range slices
CREATE INDEX idx_sensor_readings_time
    ON sensor_readings (sensor_id, recorded_at DESC);
The separate temporal index matters for production: spatial queries typically filter by both geography and time window, and the planner cannot use a GiST index to satisfy a WHERE recorded_at > now() - interval '1 hour' clause.
Step 2: Implement the MQTT subscriber (v2 API)
paho-mqtt 2.0 made CallbackAPIVersion a required argument. Omitting it raises a TypeError at construction time, not at connect time, so the daemon never starts, and the failure goes unnoticed if you suppress startup logs.
Subscribe to a hierarchical wildcard topic such as sensors/+/environmental to capture telemetry across all node IDs without managing per-sensor subscriptions. This matches the MQTT broker integration patterns used across environmental monitoring networks where sensors are deployed dynamically.
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribe here so the subscription is restored after every reconnect
    client.subscribe("sensors/+/environmental", qos=1)


client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    client_id="postgis_sync_daemon",
)
# Reconnect with exponential backoff: 1 s min, 60 s max
client.reconnect_delay_set(min_delay=1, max_delay=60)
client.on_connect = on_connect
client.on_message = on_message  # defined in Step 3
client.connect("mqtt.example.com", 1883, keepalive=60)
Set keepalive=60 for cellular links where TCP idle connections are dropped after 90 seconds by carrier NAT tables. Lower values increase broker load; 60 seconds is the field-tested default for LTE-M deployments.
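Because one wildcard subscription feeds every node's messages into the same callback, the topic is the only broker-supplied hint about which node published a message. A minimal sketch of extracting the node ID from the `+` level follows, assuming the `sensors/<node_id>/environmental` layout used above; the helper name is hypothetical.

```python
from typing import Optional


def node_id_from_topic(topic: str) -> Optional[str]:
    """Return the node ID from 'sensors/<node_id>/environmental', else None."""
    parts = topic.split("/")
    if len(parts) != 3:
        return None
    prefix, node_id, suffix = parts
    if prefix != "sensors" or suffix != "environmental" or not node_id:
        return None
    return node_id


print(node_id_from_topic("sensors/node-01/environmental"))  # node-01
print(node_id_from_topic("sensors/node-01/battery"))        # None
```

Comparing this value with the sensor_id field inside the payload is one way to flag misconfigured nodes, although whether a mismatch should reject the message is a policy decision for your deployment.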
Step 3: Production-ready sync function
The complete, copy-pasteable implementation below handles payload validation, coordinate bounds checking, timezone normalisation, and idempotent writes in a single self-contained module. It maintains a persistent database connection and reconnects on OperationalError rather than opening a new connection per message.
"""
mqtt_to_postgis.py
Sync MQTT environmental telemetry to PostGIS.

Requirements:
    paho-mqtt>=2.0.0
    psycopg2-binary>=2.9.0

Usage:
    python mqtt_to_postgis.py
"""
from __future__ import annotations

import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any

import psycopg2
import psycopg2.extras
import paho.mqtt.client as mqtt

# ── Configuration ──────────────────────────────────────────────────────────────
MQTT_BROKER: str = "mqtt.example.com"
MQTT_PORT: int = 1883
MQTT_TOPIC: str = "sensors/+/environmental"
DB_DSN: str = (
    "dbname=env_data user=iot_user password=secure_pass "
    "host=postgis.example.com port=5432"
)
TABLE_NAME: str = "sensor_readings"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("mqtt_to_postgis")

# ── Database ───────────────────────────────────────────────────────────────────
_conn: psycopg2.extensions.connection | None = None

UPSERT_SQL = f"""
    INSERT INTO {TABLE_NAME}
        (sensor_id, recorded_at, temperature, humidity, pm25, location)
    VALUES
        (%s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
    ON CONFLICT (sensor_id, recorded_at) DO UPDATE SET
        temperature = EXCLUDED.temperature,
        humidity    = EXCLUDED.humidity,
        pm25        = EXCLUDED.pm25,
        location    = EXCLUDED.location;
"""


def get_connection() -> psycopg2.extensions.connection:
    """Return the shared connection, reconnecting if necessary."""
    global _conn
    if _conn is None or _conn.closed:
        _conn = psycopg2.connect(DB_DSN)
    return _conn


def write_reading(record: dict[str, Any]) -> None:
    """
    Execute an idempotent spatial UPSERT for a single telemetry record.

    Args:
        record: Validated dict with keys sensor_id, recorded_at (timezone-aware
            datetime), temperature, humidity, pm25, lon, lat.

    Raises:
        psycopg2.Error: Propagated to the caller, which logs it; the daemon
            continues running for subsequent messages.
    """
    global _conn  # without this, the reset below would only bind a local name
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                UPSERT_SQL,
                (
                    record["sensor_id"],
                    record["recorded_at"],
                    record.get("temperature"),  # nullable: not all sensors report temperature
                    record.get("humidity"),
                    record.get("pm25"),
                    record["lon"],  # ST_MakePoint(x, y) takes (longitude, latitude)
                    record["lat"],
                ),
            )
        conn.commit()
    except psycopg2.OperationalError:
        logger.warning("Lost DB connection; will reconnect on next message")
        _conn = None
        raise
    except psycopg2.Error:
        # Any other failure leaves the transaction aborted; roll back so the
        # shared connection stays usable for the next message.
        conn.rollback()
        raise


# ── Payload validation ─────────────────────────────────────────────────────────
REQUIRED_KEYS: frozenset[str] = frozenset(
    {"sensor_id", "timestamp", "lon", "lat"}
)


def _to_float(value: Any, name: str) -> float:
    """Coerce a value to float; raise ValueError for null or non-numeric input."""
    try:
        return float(value)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Field {name!r} is not numeric: {value!r}") from exc


def validate_payload(raw: dict[str, Any]) -> dict[str, Any]:
    """
    Validate and normalise a raw telemetry dict.

    Enforces: required keys present, WGS 84 coordinate bounds,
    ISO 8601 timestamp parseable, timezone made explicit (UTC if naive).

    Returns:
        Cleaned dict with ``recorded_at`` as a timezone-aware datetime
        and numeric fields coerced to float or None.

    Raises:
        ValueError: On any schema or bounds violation.
    """
    if not isinstance(raw, dict):
        raise ValueError(f"Expected a JSON object, got {type(raw).__name__}")
    missing = REQUIRED_KEYS - raw.keys()
    if missing:
        raise ValueError(f"Missing required keys: {missing}")

    lon = _to_float(raw["lon"], "lon")
    lat = _to_float(raw["lat"], "lat")
    if not (-180.0 <= lon <= 180.0):
        raise ValueError(f"Longitude {lon} out of WGS 84 range [-180, 180]")
    if not (-90.0 <= lat <= 90.0):
        raise ValueError(f"Latitude {lat} out of WGS 84 range [-90, 90]")

    ts_text = str(raw["timestamp"])
    if ts_text.endswith(("Z", "z")):
        # datetime.fromisoformat() accepts a trailing "Z" only on Python 3.11+
        ts_text = ts_text[:-1] + "+00:00"
    ts = datetime.fromisoformat(ts_text)
    if ts.tzinfo is None:
        # Treat naive timestamps as UTC; log a warning so field deployments
        # can be corrected. A silent UTC assumption hides firmware bugs.
        logger.warning(
            "Naive timestamp from sensor %s; assuming UTC", raw["sensor_id"]
        )
        ts = ts.replace(tzinfo=timezone.utc)

    # Optional metrics: an absent key and an explicit JSON null both map to None.
    optional = {
        key: _to_float(raw[key], key) if raw.get(key) is not None else None
        for key in ("temperature", "humidity", "pm25")
    }
    return {
        "sensor_id": str(raw["sensor_id"]),
        "recorded_at": ts,
        "lon": lon,
        "lat": lat,
        **optional,
    }


# ── MQTT callback ──────────────────────────────────────────────────────────────
def on_message(
    client: mqtt.Client,
    userdata: Any,
    msg: mqtt.MQTTMessage,
) -> None:
    """Process a single MQTT message: decode -> validate -> write."""
    try:
        raw = json.loads(msg.payload.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        logger.warning("Malformed payload on %s: %s", msg.topic, exc)
        return

    try:
        record = validate_payload(raw)
    except ValueError as exc:
        logger.warning("Invalid telemetry from %s: %s", msg.topic, exc)
        return

    try:
        write_reading(record)
        logger.info("Synced %s @ %s", record["sensor_id"], record["recorded_at"])
    except psycopg2.Error as exc:
        logger.error("DB write failed for %s: %s", record["sensor_id"], exc)


# ── Entry point ────────────────────────────────────────────────────────────────
def on_connect(client, userdata, flags, reason_code, properties) -> None:
    """Subscribe on every successful connection, including reconnects."""
    if reason_code.is_failure:
        logger.error("Broker refused connection: %s", reason_code)
        return
    client.subscribe(MQTT_TOPIC, qos=1)
    logger.info("Subscribed to %s", MQTT_TOPIC)


def main() -> None:
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id="postgis_sync_daemon",
    )
    client.reconnect_delay_set(min_delay=1, max_delay=60)
    client.on_connect = on_connect
    client.on_message = on_message
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
    except OSError as exc:
        logger.critical("Cannot connect to broker %s:%d: %s", MQTT_BROKER, MQTT_PORT, exc)
        sys.exit(1)
    logger.info("Starting sync daemon for %s", MQTT_TOPIC)
    client.loop_forever()


if __name__ == "__main__":
    main()
Parameter tuning by sensor type
Different environmental metrics arrive at different rates and carry different null-rate profiles. Tune connection pooling and batch flushing based on aggregate message frequency, not individual sensor rate.
| Sensor / metric | Typical publish rate | Nullable fields | Tuning notes |
|---|---|---|---|
| Temperature / humidity (DHT22, SHT31) | 0.1–1 Hz | None | Low rate; synchronous psycopg2 is sufficient |
| PM2.5 (SDS011, Plantower PMS5003) | 1 Hz | pm25 during warm-up | Validate pm25 >= 0; negative values are sensor boot artefacts |
| Dissolved oxygen (Atlas Scientific) | 0.03–0.1 Hz | do_mgl if probe disconnected | Expect 5–10 % null rate on field deployments; do not impute on ingest |
| Conductivity / TDS | 0.1 Hz | conductivity | Reject values > 100,000 µS/cm as sensor saturation artefacts |
| Multi-parameter sonde (YSI, In-Situ) | 1–5 Hz | Multiple | Use execute_values batch inserts at 5 Hz; single-row UPSERT adds latency |
| GPS-tracked mobile sensor | 1 Hz | None | lon/lat change each message, so GiST index write amplification is high; batch in 10-second windows |
For multi-parameter sondes publishing at 5 Hz across 20 sensors (100 rows/second), replace the synchronous write path with psycopg2.extras.execute_values on a 1-second buffer, or switch to asyncpg with connection.executemany. Both approaches reduce round-trip count by 100× at that throughput.
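A minimal sketch of the 1-second buffer follows, under stated assumptions. `ReadingBuffer` and `flush_with_execute_values` are hypothetical names, not part of psycopg2. The buffer is keyed by (sensor_id, recorded_at) because PostgreSQL rejects a single INSERT ... ON CONFLICT DO UPDATE statement that touches the same conflict key twice, and `add()` is expected to be called from one thread only (paho's network loop).

```python
import time
from typing import Any, Callable

Record = dict[str, Any]

BATCH_SQL = """
    INSERT INTO sensor_readings
        (sensor_id, recorded_at, temperature, humidity, pm25, location)
    VALUES %s
    ON CONFLICT (sensor_id, recorded_at) DO UPDATE SET
        temperature = EXCLUDED.temperature,
        humidity    = EXCLUDED.humidity,
        pm25        = EXCLUDED.pm25,
        location    = EXCLUDED.location
"""
ROW_TEMPLATE = "(%s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))"


def flush_with_execute_values(conn: Any, rows: list[Record]) -> None:
    """Write one batch through psycopg2.extras.execute_values."""
    from psycopg2.extras import execute_values  # lazy import: optional dependency

    values = [
        (r["sensor_id"], r["recorded_at"], r.get("temperature"),
         r.get("humidity"), r.get("pm25"), r["lon"], r["lat"])
        for r in rows
    ]
    with conn.cursor() as cur:
        execute_values(cur, BATCH_SQL, values, template=ROW_TEMPLATE)
    conn.commit()


class ReadingBuffer:
    """Collect records and hand them to `flush` at most once per interval."""

    def __init__(
        self,
        flush: Callable[[list[Record]], None],
        interval_s: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._interval_s = interval_s
        self._clock = clock
        self._pending: dict[tuple[Any, Any], Record] = {}
        self._last_flush = clock()

    def add(self, record: Record) -> None:
        # A later duplicate replaces the earlier one, so each conflict key
        # appears at most once per batch.
        self._pending[(record["sensor_id"], record["recorded_at"])] = record
        if self._clock() - self._last_flush >= self._interval_s:
            self.flush()

    def flush(self) -> None:
        rows = list(self._pending.values())
        self._pending.clear()
        self._last_flush = self._clock()
        if rows:
            self._flush(rows)
```

In the module from Step 3, the buffer could be constructed as `ReadingBuffer(lambda rows: flush_with_execute_values(get_connection(), rows))` and `on_message` would call `buffer.add(record)` instead of `write_reading(record)`. Two limitations of this sketch are worth noting: it flushes only when a new message arrives, so call `flush()` on shutdown or from a timer, and a batch whose write fails is dropped rather than retried.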
Verification & testing
Confirm the pipeline writes correct geometries before deploying to production. The following unit tests exercise payload validation and call the MQTT callback with the database write patched out, verifying the coordinate order that causes the most field incidents:
"""tests/test_mqtt_to_postgis.py"""
import pytest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

from mqtt_to_postgis import validate_payload, on_message


class TestValidatePayload:
    BASE = {
        "sensor_id": "node-01",
        "timestamp": "2024-11-15T12:00:00+00:00",
        "lon": -0.1276,
        "lat": 51.5074,
        "temperature": 18.3,
        "humidity": 72.1,
    }

    def test_valid_payload_returns_cleaned_dict(self):
        result = validate_payload(self.BASE)
        assert result["lon"] == -0.1276
        assert result["lat"] == 51.5074
        assert isinstance(result["recorded_at"], datetime)
        assert result["recorded_at"].tzinfo is not None

    def test_swapped_coordinates_still_pass_bounds(self):
        # lat 51.5074 is a valid lon and lon -0.1276 is a valid lat, so a bounds
        # check alone cannot catch swapped coordinates. Verify your sensor
        # firmware, not the code.
        bad = {**self.BASE, "lon": 51.5074, "lat": -0.1276}
        result = validate_payload(bad)
        assert result["lon"] == 51.5074  # passes bounds, but semantically wrong

    def test_longitude_out_of_range_raises(self):
        bad = {**self.BASE, "lon": 200.0}
        with pytest.raises(ValueError, match="Longitude"):
            validate_payload(bad)

    def test_missing_required_key_raises(self):
        bad = {k: v for k, v in self.BASE.items() if k != "sensor_id"}
        with pytest.raises(ValueError, match="Missing required keys"):
            validate_payload(bad)

    def test_naive_timestamp_gets_utc(self):
        naive = {**self.BASE, "timestamp": "2024-11-15T12:00:00"}
        result = validate_payload(naive)
        assert result["recorded_at"].tzinfo == timezone.utc


class TestOnMessage:
    def _make_msg(self, payload: bytes, topic: str = "sensors/node-01/environmental"):
        msg = MagicMock()
        msg.payload = payload
        msg.topic = topic
        return msg

    def test_valid_message_calls_write(self):
        payload = b'{"sensor_id":"n1","timestamp":"2024-11-15T12:00:00Z","lon":-0.1,"lat":51.5,"temperature":18.0}'
        with patch("mqtt_to_postgis.write_reading") as mock_write:
            on_message(MagicMock(), None, self._make_msg(payload))
        mock_write.assert_called_once()
        call_record = mock_write.call_args[0][0]
        assert call_record["lon"] == -0.1  # longitude written as x
        assert call_record["lat"] == 51.5  # latitude written as y

    def test_malformed_json_does_not_raise(self):
        on_message(MagicMock(), None, self._make_msg(b"not json"))  # must not propagate
Verify geometries are spatially correct in the database after the first real run:
-- Confirm the point lands in London, not the Indian Ocean
SELECT sensor_id, ST_AsText(location) AS wkt, ST_SRID(location) AS srid
FROM sensor_readings
WHERE sensor_id = 'node-01'
LIMIT 5;
-- Expected: POINT(-0.1276 51.5074), SRID 4326
Gotchas
Longitude and latitude are silently reversed. ST_MakePoint(lat, lon) writes valid geometry; PostGIS cannot tell that you meant the other order. Every spatial query returns wrong results or empty result sets. Always label your variables lon and lat explicitly and write a coordinate-bounds test that checks UK/EU/US sensors actually land in the right hemisphere.
Naive timestamps written to TIMESTAMPTZ are reinterpreted without warning. If a sensor's firmware writes local time without a timezone offset and you skip the tzinfo is None check, PostgreSQL reads the value in the session's time zone, so every timestamp is off by the difference between the sensor's local offset and that zone. Queries against a specific date range then silently miss records.
Opening a new psycopg2.connect() per message exhausts the PostgreSQL connection limit. The default max_connections is 100. A network of 20 sensors publishing at 1 Hz opens 20 connections per second; if they are not closed promptly, the server soon hits the limit and rejects new connections with FATAL: sorry, too many clients already. The errors surface as message loss, not as an application crash.
The ON CONFLICT clause requires an exact match on the UNIQUE constraint columns. If the constraint is on (sensor_id, recorded_at) but you write ON CONFLICT (sensor_id), PostgreSQL raises a ProgrammingError at runtime because the conflict target does not match any constraint. Check the constraint name with \d sensor_readings in psql before deploying.
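The hemisphere check suggested in the first gotcha can be sketched as a deployment bounding box: a reading is accepted only if it falls inside the region where the sensors are known to be installed. The box below (roughly the British Isles) and the helper names are assumptions for illustration; substitute the extent of your own deployment.

```python
from typing import NamedTuple


class BBox(NamedTuple):
    min_lon: float
    min_lat: float
    max_lon: float
    max_lat: float

    def contains(self, lon: float, lat: float) -> bool:
        return (
            self.min_lon <= lon <= self.max_lon
            and self.min_lat <= lat <= self.max_lat
        )


# Hypothetical deployment extent, roughly covering the British Isles.
DEPLOYMENT_BBOX = BBox(min_lon=-11.0, min_lat=49.0, max_lon=2.5, max_lat=61.0)


def looks_swapped(lon: float, lat: float, bbox: BBox = DEPLOYMENT_BBOX) -> bool:
    """True when the point is outside the box but its mirror image is inside."""
    return not bbox.contains(lon, lat) and bbox.contains(lat, lon)


print(DEPLOYMENT_BBOX.contains(-0.1276, 51.5074))  # True: London, correct order
print(looks_swapped(51.5074, -0.1276))             # True: same point, lat/lon swapped
```

This catches swaps that the global WGS 84 bounds check lets through, but only for deployments whose extent is not symmetric about the lon = lat diagonal, and the simple box does not handle regions that cross the antimeridian.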
Related
- Parent: MQTT Broker Integration for Environmental Sensors
- Handling Timezone Drift in High-Frequency IoT Streams: normalize timestamps before they reach PostGIS
- Python Scripts for On-the-Fly CRS Transformation During Ingest: reproject UTM or local-grid coordinates client-side before ST_MakePoint
- Building a Local SQLite Fallback Buffer for Remote Sensors: queue readings locally when PostGIS is unreachable and replay on reconnect
- IoT Sensor Data Ingestion & Spatial Synchronization: overview of the full ingestion architecture
Why does ST_MakePoint take longitude before latitude?
ST_MakePoint follows the mathematical (x, y) convention where x is the east-west axis (longitude) and y is the north-south axis (latitude). Most human-readable formats and many APIs list latitude first, so reversing the arguments silently places every point on a mirrored coordinate grid: queries return empty results and bounding-box checks never intersect real geometries. Always annotate your variables as lon and lat, not x and y or bare index positions.
What QoS level should I use for environmental telemetry?
Use QoS 1 (at-least-once delivery). QoS 0 drops packets on unstable cellular or LoRa links without any notification to the publisher. QoS 2 (exactly-once) adds significant handshake overhead that increases per-message latency without meaningful benefit when the database already handles duplicates via ON CONFLICT.
Should I open a new database connection per MQTT message?
No. For moderate throughput (under 20 messages/second), maintain a single persistent psycopg2 connection and reconnect on OperationalError. Above 50 messages/second, switch to asyncpg with an asyncio event loop and a connection pool of 4–8 connections to avoid thread contention.
How do I handle sensors that report coordinates in UTM or a local projection?
Transform client-side with pyproj.Transformer before insertion. Calling ST_Transform on every INSERT adds server-side compute that accumulates under high-frequency streams. Pin the source EPSG code in your sensor metadata table so the transform function can look it up dynamically rather than hard-coding the projection per sensor type. See Python Scripts for On-the-Fly CRS Transformation During Ingest for a complete implementation.
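A sketch of that lookup follows, assuming pyproj is installed. `SENSOR_EPSG` is a hypothetical in-memory stand-in for the sensor metadata table, and the sensor IDs and their EPSG assignments are made up for illustration. Passing `always_xy=True` to `Transformer.from_crs` keeps the output in (longitude, latitude) order, which is the order ST_MakePoint expects.

```python
from functools import lru_cache
from typing import Any, Callable

# Hypothetical metadata: source EPSG code per sensor (normally a database table).
SENSOR_EPSG: dict[str, int] = {
    "node-01": 4326,   # already WGS 84
    "node-07": 32630,  # WGS 84 / UTM zone 30N
    "node-12": 27700,  # OSGB36 / British National Grid
}


@lru_cache(maxsize=None)
def get_transformer(source_epsg: int) -> Any:
    """Build, once per EPSG code, a pyproj transformer into EPSG:4326."""
    from pyproj import Transformer  # third-party; imported lazily

    return Transformer.from_crs(
        f"EPSG:{source_epsg}", "EPSG:4326", always_xy=True
    )


def to_wgs84(
    sensor_id: str,
    x: float,
    y: float,
    transformer_for: Callable[[int], Any] = get_transformer,
) -> tuple[float, float]:
    """Return (lon, lat) in EPSG:4326 for a coordinate in the sensor's CRS.

    Raises KeyError if the sensor has no registered EPSG code.
    """
    source_epsg = SENSOR_EPSG[sensor_id]
    if source_epsg == 4326:
        return x, y  # nothing to do
    lon, lat = transformer_for(source_epsg).transform(x, y)
    return lon, lat
```

Caching the transformer matters because constructing one is far more expensive than applying it; the `transformer_for` parameter exists only so the lookup can be tested without pyproj. The result would be written into the payload's lon and lat fields before `validate_payload` runs, so the WGS 84 bounds check still applies.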