How to Sync MQTT Sensor Data to PostGIS with Python

To sync MQTT sensor data to PostGIS with Python, establish a persistent subscriber using paho-mqtt v2, parse incoming JSON payloads that carry coordinates and environmental metrics, transform those coordinates into PostGIS geometries with ST_MakePoint and ST_SetSRID, then write records through psycopg2 using parameterized INSERT … ON CONFLICT statements. The pipeline needs strict payload validation, explicit coordinate ordering (longitude before latitude), and idempotent write logic so that broker reconnects and duplicate telemetry cannot corrupt historical data, plus a spatial GiST index to keep downstream location queries fast.

Why the pipeline is harder than it looks

A raw MQTT stream offers no schema guarantees. Sensors from different manufacturers publish different field names, unit conventions, and timestamp formats. Coordinate order alone causes silent data corruption: ST_MakePoint(lat, lon) executes without error but places every reading at transposed coordinates, so a sensor in London lands in the Indian Ocean. Duplicate delivery (QoS 1 retransmission after a broker restart) produces duplicate rows unless the write path is designed to be idempotent. And opening a new database connection per message collapses throughput above a few messages per second.

Getting this right is the foundation for every downstream spatial query: bounding-box lookups, ST_DWithin radius searches, and the spatial CRS mapping performed on ingest, which reprojects data from local survey grids into a common coordinate system.

Figure: MQTT to PostGIS pipeline data flow. Four stages: the MQTT broker (QoS 1 publish) feeds the Python subscriber (paho-mqtt v2 on_message callback, loop_forever()), which validates and parses each payload (schema and bounds checks, timezone normalisation, lon/lat ordering) before writing a spatial UPSERT (ST_MakePoint, ON CONFLICT) to the PostGIS sensor_readings table. Invalid payloads are logged with log.warning and discarded.

Prerequisites & version compatibility

| Component | Minimum version | Notes |
|---|---|---|
| Python | 3.9+ | datetime.fromisoformat() parses explicit offsets such as +00:00; a trailing "Z" is only accepted natively from Python 3.11, so normalise it first on 3.9 and 3.10 |
| paho-mqtt | 2.0.0 | The client is constructed with CallbackAPIVersion.VERSION2; legacy v1 callback signatures raise TypeError when invoked |
| psycopg2-binary | 2.9.0 | PostgreSQL 10+ is required for GENERATED ALWAYS AS IDENTITY; 12+ recommended for JSONB performance |
| asyncpg | 0.28.0 | Optional; use instead of psycopg2 for async pipelines above 50 msgs/sec |
| PostGIS | 3.3+ | Stable ST_MakePoint behaviour, native type casting, improved GiST index vacuuming |
| CRS | EPSG:4326 (WGS 84) | Set explicitly with ST_SetSRID, because ST_MakePoint returns SRID 0; transform client-side with pyproj if sensors report UTM or local projections |

Timestamps must be timezone-aware before they reach the database, so the timestamp alignment and timezone normalization step has to run before (or inside) this pipeline. PostgreSQL interprets a naive timestamp written to a TIMESTAMPTZ column in the session's TimeZone setting, not in the sensor's local zone, which silently breaks time-range queries across sensor deployments in different time zones.

Step 1 β€” Define the PostGIS schema

Create a table that enforces spatial integrity and supports idempotent writes. The UNIQUE constraint on (sensor_id, recorded_at) guarantees that a reading retransmitted during a broker reconnect or QoS 1 redelivery cannot create a second row; paired with the ON CONFLICT upsert in Step 3, the duplicate resolves to the existing record.

CREATE EXTENSION IF NOT EXISTS postgis;

CREATE TABLE sensor_readings (
    id           BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    sensor_id    VARCHAR(50)     NOT NULL,
    recorded_at  TIMESTAMPTZ     NOT NULL,
    temperature  NUMERIC(5,2),
    humidity     NUMERIC(5,2),
    pm25         NUMERIC(7,3),
    location     GEOMETRY(Point, 4326) NOT NULL
);

-- Enforce idempotency: (sensor, timestamp) pairs must be unique
ALTER TABLE sensor_readings
    ADD CONSTRAINT uq_sensor_reading UNIQUE (sensor_id, recorded_at);

-- Spatial index for ST_DWithin and bounding-box queries
CREATE INDEX idx_sensor_readings_location
    ON sensor_readings USING GIST (location);

-- Temporal index for time-range slices
CREATE INDEX idx_sensor_readings_time
    ON sensor_readings (sensor_id, recorded_at DESC);

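The separate B-tree index matters in production: spatial queries typically filter by both geography and time window, and the planner cannot use the GiST index on location to satisfy a WHERE recorded_at > now() - interval '1 hour' clause. Because sensor_id is the leading column, the B-tree is most effective when the query also filters on sensor_id.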
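A query that combines both filters might look like the sketch below. It assumes a psycopg2-style cursor (anything with execute() and fetchall()); the helper name recent_readings_in_bbox and its bbox tuple layout are hypothetical. The && operator against ST_MakeEnvelope is the bounding-box test the GiST index can serve, while the recorded_at predicate trims the time window.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional, Sequence

# Bounding-box + time-window query against the Step 1 schema
RECENT_IN_BBOX_SQL = """
    SELECT sensor_id, recorded_at, temperature, humidity, pm25,
           ST_X(location) AS lon, ST_Y(location) AS lat
    FROM sensor_readings
    WHERE location && ST_MakeEnvelope(%s, %s, %s, %s, 4326)
      AND recorded_at > %s
    ORDER BY recorded_at DESC
"""


def recent_readings_in_bbox(
    cur: Any,
    bbox: tuple[float, float, float, float],
    window: timedelta,
    now: Optional[datetime] = None,
) -> Sequence[Any]:
    """Return readings inside bbox = (min_lon, min_lat, max_lon, max_lat)
    recorded after now - window. `cur` is any DB-API cursor (e.g. psycopg2)."""
    min_lon, min_lat, max_lon, max_lat = bbox
    if not (min_lon < max_lon and min_lat < max_lat):
        raise ValueError("bbox must be ordered (min_lon, min_lat, max_lon, max_lat)")
    # An aware UTC "now" keeps the TIMESTAMPTZ comparison independent of session TimeZone
    now = now or datetime.now(timezone.utc)
    # && against ST_MakeEnvelope is the GiST-indexable bounding-box test
    cur.execute(RECENT_IN_BBOX_SQL, (min_lon, min_lat, max_lon, max_lat, now - window))
    return cur.fetchall()
```

With psycopg2 you would call it inside "with conn.cursor() as cur:", for example recent_readings_in_bbox(cur, (-0.6, 51.2, 0.4, 51.8), timedelta(hours=1)), where the box is a rough, illustrative outline of Greater London.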

Step 2 β€” Implement the MQTT subscriber (v2 API)

paho-mqtt 2.0 made CallbackAPIVersion a required argument. Omitting it raises a TypeError at construction time, not at connect time, so a misconfigured daemon exits as soon as it starts; if your process supervisor discards startup output, that exit can look silent.

Subscribe to a hierarchical wildcard topic such as sensors/+/environmental to capture telemetry across all node IDs without managing per-sensor subscriptions. This matches the MQTT broker integration patterns used across environmental monitoring networks where sensors are deployed dynamically.

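import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribe here rather than once after connect(): with a clean session the
    # broker forgets subscriptions, so every automatic reconnect must re-subscribe.
    if reason_code.is_failure:
        return  # loop_forever() keeps retrying with the backoff configured below
    client.subscribe("sensors/+/environmental", qos=1)


client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    client_id="postgis_sync_daemon",
)

# Reconnect with exponential backoff: 1 s min, 60 s max
client.reconnect_delay_set(min_delay=1, max_delay=60)

client.on_connect = on_connect
client.on_message = on_message  # defined in Step 3

client.connect("mqtt.example.com", 1883, keepalive=60)
client.loop_forever()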

Set keepalive=60 for cellular links where TCP idle connections are dropped after 90 seconds by carrier NAT tables. Lower values increase broker load; 60 seconds is the field-tested default for LTE-M deployments.
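Because the + wildcard matches exactly one topic level, the node ID can be recovered from the topic itself and compared with the sensor_id inside the payload, which is one way to catch misconfigured firmware. A minimal sketch, assuming the sensors/<node_id>/environmental layout used above; sensor_id_from_topic and topic_matches_payload are hypothetical helpers, not part of paho-mqtt.

```python
def sensor_id_from_topic(
    topic: str, prefix: str = "sensors", suffix: str = "environmental"
) -> str:
    """Extract <node_id> from a topic shaped like sensors/<node_id>/environmental."""
    parts = topic.split("/")
    # '+' matches exactly one level, and that level may be empty ("sensors//environmental")
    if len(parts) != 3 or parts[0] != prefix or parts[2] != suffix or not parts[1]:
        raise ValueError(f"Unexpected topic layout: {topic!r}")
    return parts[1]


def topic_matches_payload(topic: str, payload_sensor_id: str) -> bool:
    """True when the node ID in the topic agrees with the sensor_id in the payload."""
    try:
        return sensor_id_from_topic(topic) == payload_sensor_id
    except ValueError:
        return False
```

Inside on_message, a False result from topic_matches_payload(msg.topic, record["sensor_id"]) could be logged as a warning before the write.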

Step 3 β€” Production-ready sync function

The complete, copy-pasteable implementation below handles payload validation, coordinate bounds checking, timezone normalisation, and idempotent writes in a single self-contained module. It maintains a persistent database connection and reconnects on OperationalError rather than opening a new connection per message.

"""
mqtt_to_postgis.py
Sync MQTT environmental telemetry to PostGIS.

Requirements:
    paho-mqtt>=2.0.0
    psycopg2-binary>=2.9.0

Usage:
    python mqtt_to_postgis.py
"""

from __future__ import annotations

import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any

import psycopg2
import psycopg2.extras
import paho.mqtt.client as mqtt

# ── Configuration ──────────────────────────────────────────────────────────────
MQTT_BROKER: str = "mqtt.example.com"
MQTT_PORT: int = 1883
MQTT_TOPIC: str = "sensors/+/environmental"
DB_DSN: str = (
    "dbname=env_data user=iot_user password=secure_pass "
    "host=postgis.example.com port=5432"
)
TABLE_NAME: str = "sensor_readings"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s – %(message)s",
)
logger = logging.getLogger("mqtt_to_postgis")

# ── Database ────────────────────────────────────────────────────────────────────

_conn: psycopg2.extensions.connection | None = None

UPSERT_SQL = f"""
    INSERT INTO {TABLE_NAME}
        (sensor_id, recorded_at, temperature, humidity, pm25, location)
    VALUES
        (%s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
    ON CONFLICT (sensor_id, recorded_at) DO UPDATE SET
        temperature = EXCLUDED.temperature,
        humidity    = EXCLUDED.humidity,
        pm25        = EXCLUDED.pm25,
        location    = EXCLUDED.location;
"""


def get_connection() -> psycopg2.extensions.connection:
    """Return the shared connection, reconnecting if necessary."""
    global _conn
    if _conn is None or _conn.closed:
        _conn = psycopg2.connect(DB_DSN)
    return _conn


def write_reading(record: dict[str, Any]) -> None:
    """
    Execute an idempotent spatial UPSERT for a single telemetry record.

    Args:
        record: Validated dict with keys sensor_id, recorded_at (timezone-aware
                datetime), temperature, humidity, pm25, lon, lat.

    Raises:
        psycopg2.Error: Propagated to the caller after logging; the daemon
                        continues running for subsequent messages.
    """
    global _conn  # required so the reset below affects the shared connection
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(
                UPSERT_SQL,
                (
                    record["sensor_id"],
                    record["recorded_at"],
                    record.get("temperature"),   # nullable: not all sensors report temperature
                    record.get("humidity"),
                    record.get("pm25"),
                    record["lon"],               # ST_MakePoint(x, y) -> (longitude, latitude)
                    record["lat"],
                ),
            )
        conn.commit()
    except psycopg2.OperationalError:
        logger.warning("Lost DB connection; will reconnect on next message")
        _conn = None
        raise
    except psycopg2.Error:
        # Any other error (e.g. a value overflowing NUMERIC(5,2)) aborts the
        # transaction; roll back so later writes on this connection still work.
        conn.rollback()
        raise


# ── Payload validation ──────────────────────────────────────────────────────────

REQUIRED_KEYS: frozenset[str] = frozenset(
    {"sensor_id", "timestamp", "lon", "lat"}
)


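def _to_float(raw: dict[str, Any], key: str, *, required: bool) -> float | None:
    """Coerce raw[key] to float; absent or JSON null yields None unless required."""
    value = raw.get(key)
    if value is None:
        if required:
            raise ValueError(f"{key} is null")
        return None
    try:
        return float(value)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"{key} is not numeric: {value!r}") from exc


def validate_payload(raw: Any) -> dict[str, Any]:
    """
    Validate and normalise a raw telemetry dict.

    Enforces: payload is a JSON object, required keys present, WGS 84
    coordinate bounds, ISO 8601 timestamp parseable, timezone made explicit
    (UTC if naive).

    Returns:
        Cleaned dict with ``recorded_at`` as a timezone-aware datetime
        and numeric fields coerced to float or None.

    Raises:
        ValueError: On any schema or bounds violation.
    """
    if not isinstance(raw, dict):
        raise ValueError(f"Expected a JSON object, got {type(raw).__name__}")

    missing = REQUIRED_KEYS - raw.keys()
    if missing:
        raise ValueError(f"Missing required keys: {missing}")

    lon = _to_float(raw, "lon", required=True)
    lat = _to_float(raw, "lat", required=True)
    if not (-180.0 <= lon <= 180.0):
        raise ValueError(f"Longitude {lon} out of WGS 84 range [-180, 180]")
    if not (-90.0 <= lat <= 90.0):
        raise ValueError(f"Latitude {lat} out of WGS 84 range [-90, 90]")

    ts_text = str(raw["timestamp"])
    if ts_text.endswith("Z"):
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11
        ts_text = ts_text[:-1] + "+00:00"
    ts = datetime.fromisoformat(ts_text)
    if ts.tzinfo is None:
        # Treat naive timestamps as UTC; log a warning so field deployments
        # can be corrected, because a silent UTC assumption hides firmware bugs.
        logger.warning(
            "Naive timestamp from sensor %s; assuming UTC", raw["sensor_id"]
        )
        ts = ts.replace(tzinfo=timezone.utc)

    return {
        "sensor_id":   str(raw["sensor_id"]),
        "recorded_at": ts,
        "lon":         lon,
        "lat":         lat,
        # Absent keys and JSON nulls (e.g. pm25 during warm-up) both become None
        "temperature": _to_float(raw, "temperature", required=False),
        "humidity":    _to_float(raw, "humidity", required=False),
        "pm25":        _to_float(raw, "pm25", required=False),
    }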


# ── MQTT callback ───────────────────────────────────────────────────────────────

def on_message(
    client: mqtt.Client,
    userdata: Any,
    msg: mqtt.MQTTMessage,
) -> None:
    """Process a single MQTT message: decode -> validate -> write."""
    try:
        raw = json.loads(msg.payload.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        logger.warning("Malformed payload on %s: %s", msg.topic, exc)
        return

    try:
        record = validate_payload(raw)
    except ValueError as exc:
        logger.warning("Invalid telemetry from %s: %s", msg.topic, exc)
        return

    try:
        write_reading(record)
        logger.info("Synced %s @ %s", record["sensor_id"], record["recorded_at"])
    except psycopg2.Error as exc:
        logger.error("DB write failed for %s: %s", record["sensor_id"], exc)


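def on_connect(
    client: mqtt.Client,
    userdata: Any,
    flags: Any,
    reason_code: Any,
    properties: Any,
) -> None:
    """Subscribe on every (re)connect; a clean session drops subscriptions on disconnect."""
    if reason_code.is_failure:
        logger.error("Broker refused connection: %s (will retry)", reason_code)
        return
    client.subscribe(MQTT_TOPIC, qos=1)
    logger.info("Subscribed to %s; sync daemon running", MQTT_TOPIC)


# ── Entry point ─────────────────────────────────────────────────────────────────

def main() -> None:
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id="postgis_sync_daemon",
    )
    client.reconnect_delay_set(min_delay=1, max_delay=60)
    client.on_connect = on_connect  # (re)subscribes after every automatic reconnect
    client.on_message = on_message

    try:
        client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
    except OSError as exc:
        logger.critical("Cannot connect to broker %s:%d: %s", MQTT_BROKER, MQTT_PORT, exc)
        sys.exit(1)

    client.loop_forever()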


if __name__ == "__main__":
    main()

Parameter tuning by sensor type

Different environmental metrics arrive at different rates and carry different null-rate profiles. Tune connection pooling and batch flushing based on aggregate message frequency, not individual sensor rate.

| Sensor / metric | Typical publish rate | Nullable fields | Tuning notes |
|---|---|---|---|
| Temperature / humidity (DHT22, SHT31) | 0.1–1 Hz | none | Low rate; synchronous psycopg2 is sufficient |
| PM2.5 (SDS011, Plantower PMS5003) | 1 Hz | pm25 during warm-up | Validate pm25 >= 0; negative values are sensor boot artefacts |
| Dissolved oxygen (Atlas Scientific) | 0.03–0.1 Hz | do_mgl if probe disconnected | Expect a 5–10 % null rate on field deployments; do not impute on ingest |
| Conductivity / TDS | 0.1 Hz | conductivity | Reject values > 100,000 µS/cm as sensor saturation artefacts |
| Multi-parameter sonde (YSI, In-Situ) | 1–5 Hz | multiple | Use execute_values batch inserts at 5 Hz; single-row UPSERT adds latency |
| GPS-tracked mobile sensor | 1 Hz | none | lon/lat change with every message, so GiST index write amplification is high; batch in 10-second windows |
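The validation rules in the table can be expressed as a small range check that runs after validate_payload. A minimal sketch; LIMITS and out_of_range_fields are hypothetical names, and conductivity is not a column in the Step 1 schema, so treat it as an example of extending the check. Null values are skipped rather than imputed, matching the table's advice.

```python
from typing import Any, Optional

# Plausibility limits taken from the tuning table. pm25 is in the schema;
# conductivity is a hypothetical extra payload key used for illustration.
LIMITS: dict[str, tuple[Optional[float], Optional[float]]] = {
    "pm25": (0.0, None),                # negative values are boot artefacts
    "conductivity": (None, 100_000.0),  # µS/cm; above this the probe is saturated
}


def out_of_range_fields(record: dict[str, Any]) -> list[str]:
    """Return the names of fields whose non-null values fall outside LIMITS."""
    bad = []
    for field, (low, high) in LIMITS.items():
        value = record.get(field)
        if value is None:
            continue  # nulls are expected (warm-up, disconnected probe); don't impute
        if (low is not None and value < low) or (high is not None and value > high):
            bad.append(field)
    return bad
```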

For multi-parameter sondes publishing at 5 Hz across 20 sensors (100 rows/second), replace the synchronous write path with psycopg2.extras.execute_values on a 1-second buffer, or switch to asyncpg with Connection.executemany. Both approaches cut round trips substantially: at that rate a 1-second buffer turns 100 single-row statements into one batch, roughly a 100× reduction.
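A minimal sketch of the 1-second buffer, assuming the sensor_readings schema from Step 1 and psycopg2.extras.execute_values; BatchBuffer, make_pg_flusher and the Row tuple layout are hypothetical names. Rows are keyed on (sensor_id, recorded_at) because PostgreSQL rejects an INSERT … ON CONFLICT DO UPDATE that would touch the same row twice in one statement, which would otherwise happen when a QoS 1 retransmission lands in the same batch.

```python
import time
from typing import Any, Callable

# One row per reading: (sensor_id, recorded_at, temperature, humidity, pm25, lon, lat)
Row = tuple[Any, ...]

BATCH_UPSERT_SQL = """
    INSERT INTO sensor_readings
        (sensor_id, recorded_at, temperature, humidity, pm25, location)
    VALUES %s
    ON CONFLICT (sensor_id, recorded_at) DO UPDATE SET
        temperature = EXCLUDED.temperature,
        humidity    = EXCLUDED.humidity,
        pm25        = EXCLUDED.pm25,
        location    = EXCLUDED.location
"""
# Per-row template: ST_MakePoint takes (lon, lat), matching the Row order above
ROW_TEMPLATE = "(%s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))"


class BatchBuffer:
    """Collect rows and hand them to flush_rows at most once per interval."""

    def __init__(
        self,
        flush_rows: Callable[[list[Row]], None],
        interval: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush_rows = flush_rows
        self._interval = interval
        self._clock = clock
        # Keyed on (sensor_id, recorded_at): a duplicate key inside one statement
        # fails with "ON CONFLICT DO UPDATE command cannot affect row a second time".
        self._rows: dict[tuple[Any, Any], Row] = {}
        self._last_flush = clock()

    def add(self, row: Row) -> None:
        self._rows[(row[0], row[1])] = row  # a later duplicate replaces the earlier one
        if self._clock() - self._last_flush >= self._interval:
            self.flush()

    def flush(self) -> None:
        if self._rows:
            # If flush_rows raises, the rows stay buffered for the next attempt
            self._flush_rows(list(self._rows.values()))
            self._rows.clear()
        self._last_flush = self._clock()


def make_pg_flusher(conn: Any) -> Callable[[list[Row]], None]:
    """Build a flush callback that writes each batch with execute_values."""
    # Imported here so BatchBuffer itself can be used and tested without psycopg2
    from psycopg2.extras import execute_values

    def flush_rows(rows: list[Row]) -> None:
        # "with conn" commits on success and rolls back on error
        with conn, conn.cursor() as cur:
            execute_values(cur, BATCH_UPSERT_SQL, rows, template=ROW_TEMPLATE)

    return flush_rows
```

Flushing is only checked when a row arrives, so in practice you would also call flush() from a periodic timer and on shutdown; otherwise the last partial batch waits for the next message.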

Verification & testing

Confirm the pipeline writes correct geometries before deploying to production. The following unit tests call the MQTT callback directly with mock messages and patch the database write, verifying the coordinate order that causes the most field incidents:

"""tests/test_mqtt_to_postgis.py"""

import pytest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch
from mqtt_to_postgis import validate_payload, on_message


class TestValidatePayload:
    BASE = {
        "sensor_id": "node-01",
        "timestamp": "2024-11-15T12:00:00+00:00",
        "lon": -0.1276,
        "lat": 51.5074,
        "temperature": 18.3,
        "humidity": 72.1,
    }

    def test_valid_payload_returns_cleaned_dict(self):
        result = validate_payload(self.BASE)
        assert result["lon"] == -0.1276
        assert result["lat"] == 51.5074
        assert isinstance(result["recorded_at"], datetime)
        assert result["recorded_at"].tzinfo is not None

    def test_swapped_coordinates_still_pass_bounds(self):
        # lat 51.5074 is a valid lon and lon -0.1276 is a valid lat, so a bounds
        # check alone cannot catch swapped coordinates. Verify your sensor firmware, not the code.
        bad = {**self.BASE, "lon": 51.5074, "lat": -0.1276}
        result = validate_payload(bad)
        assert result["lon"] == 51.5074  # passes bounds, but semantically wrong

    def test_longitude_out_of_range_raises(self):
        bad = {**self.BASE, "lon": 200.0}
        with pytest.raises(ValueError, match="Longitude"):
            validate_payload(bad)

    def test_missing_required_key_raises(self):
        bad = {k: v for k, v in self.BASE.items() if k != "sensor_id"}
        with pytest.raises(ValueError, match="Missing required keys"):
            validate_payload(bad)

    def test_naive_timestamp_gets_utc(self):
        naive = {**self.BASE, "timestamp": "2024-11-15T12:00:00"}
        result = validate_payload(naive)
        assert result["recorded_at"].tzinfo == timezone.utc


class TestOnMessage:
    def _make_msg(self, payload: bytes, topic: str = "sensors/node-01/environmental"):
        msg = MagicMock()
        msg.payload = payload
        msg.topic = topic
        return msg

    def test_valid_message_calls_write(self):
        payload = b'{"sensor_id":"n1","timestamp":"2024-11-15T12:00:00Z","lon":-0.1,"lat":51.5,"temperature":18.0}'
        with patch("mqtt_to_postgis.write_reading") as mock_write:
            on_message(MagicMock(), None, self._make_msg(payload))
            mock_write.assert_called_once()
            call_record = mock_write.call_args[0][0]
            assert call_record["lon"] == -0.1      # longitude written as x
            assert call_record["lat"] == 51.5      # latitude written as y

    def test_malformed_json_does_not_raise(self):
        on_message(MagicMock(), None, self._make_msg(b"not json"))  # must not propagate

Verify geometries are spatially correct in the database after the first real run:

-- Confirm the point lands in London, not the Indian Ocean
SELECT sensor_id, ST_AsText(location) AS wkt, ST_SRID(location) AS srid
FROM sensor_readings
WHERE sensor_id = 'node-01'
LIMIT 5;
-- Expected: POINT(-0.1276 51.5074), SRID 4326

Gotchas

Longitude and latitude are silently reversed. ST_MakePoint(lat, lon) writes valid geometry: PostGIS cannot tell that you meant the other order, so every spatial query returns wrong or empty result sets. A WGS 84 bounds check cannot catch the swap whenever the longitude also falls inside ±90, as it does across the UK and EU, so label your variables lon and lat explicitly and add a plausibility test that checks UK/EU/US sensors actually land in the expected region.
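A per-deployment plausibility check can catch what the bounds check misses. A minimal sketch, assuming you know a rough bounding box for each deployment; looks_swapped and LONDON_BBOX are illustrative, not values from the original pipeline.

```python
BBox = tuple[float, float, float, float]  # (min_lon, min_lat, max_lon, max_lat)

# Rough, illustrative box around Greater London
LONDON_BBOX: BBox = (-0.6, 51.2, 0.4, 51.8)


def _inside(lon: float, lat: float, bbox: BBox) -> bool:
    min_lon, min_lat, max_lon, max_lat = bbox
    return min_lon <= lon <= max_lon and min_lat <= lat <= max_lat


def looks_swapped(lon: float, lat: float, bbox: BBox) -> bool:
    """True when (lon, lat) misses the deployment box but (lat, lon) hits it."""
    return not _inside(lon, lat, bbox) and _inside(lat, lon, bbox)
```

A reading that is simply in the wrong place (outside the box either way) returns False, so this flags likely transpositions specifically rather than every out-of-area point.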

Naive timestamps written to TIMESTAMPTZ are silently reinterpreted. If a sensor’s firmware writes local time without a timezone offset and you skip the tzinfo is None check, PostgreSQL interprets each timestamp in the database session’s TimeZone setting rather than the sensor’s local zone, so readings shift by the difference between the two offsets and queries against a specific date range silently miss records.
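When the deployment's zone is known (hypothetically, from a sensor metadata table), localising naive timestamps explicitly is safer than assuming UTC. A minimal sketch; to_utc is a hypothetical helper. For named zones with daylight-saving rules, pass zoneinfo.ZoneInfo("Europe/London") (standard library, Python 3.9+) instead of a fixed offset.

```python
from datetime import datetime, timedelta, timezone, tzinfo


def to_utc(ts: datetime, deployment_tz: tzinfo) -> datetime:
    """Attach the deployment's zone to a naive timestamp, then convert to UTC.

    Aware timestamps keep their own offset; only naive ones are localised.
    """
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=deployment_tz)
    return ts.astimezone(timezone.utc)


# Example: firmware reports 12:00 local time from a site at UTC+1
reading = datetime.fromisoformat("2024-11-15T12:00:00")
print(to_utc(reading, timezone(timedelta(hours=1))))  # 2024-11-15 11:00:00+00:00
```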

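Opening a new psycopg2.connect() per message exhausts the PostgreSQL connection limit. The default max_connections is 100. A network of 20 sensors publishing at 1 Hz opens 20 connections per second, each with its own backend process and authentication round trip; any that are not closed promptly (for example, when an exception skips close()) accumulate until new connections fail with FATAL: sorry, too many clients already. Because the callback catches and logs database errors, this surfaces as message loss rather than a crash.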

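The ON CONFLICT target must match the columns of a unique constraint or index exactly (column order does not matter). If the constraint is on (sensor_id, recorded_at) but you write ON CONFLICT (sensor_id), PostgreSQL rejects the statement at runtime with "there is no unique or exclusion constraint matching the ON CONFLICT specification", which psycopg2 raises as ProgrammingError. Check the constraint columns with \d sensor_readings in psql before deploying, or name the constraint directly with ON CONFLICT ON CONSTRAINT uq_sensor_reading.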
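The same check can run automatically at daemon startup. A minimal sketch, assuming a psycopg2-style cursor; unique_constraints and has_conflict_target are hypothetical helpers built on the pg_constraint catalog and pg_get_constraintdef(). It only inspects UNIQUE constraints, so a PRIMARY KEY or a bare CREATE UNIQUE INDEX, both of which can also serve as a conflict target, would need an extra query.

```python
from typing import Any

UNIQUE_CONSTRAINTS_SQL = """
    SELECT conname, pg_get_constraintdef(oid)
    FROM pg_constraint
    WHERE conrelid = %s::regclass AND contype = 'u'
"""


def unique_constraints(cur: Any, table: str) -> dict[str, str]:
    """Map constraint name -> definition, e.g. 'UNIQUE (sensor_id, recorded_at)'."""
    cur.execute(UNIQUE_CONSTRAINTS_SQL, (table,))
    return dict(cur.fetchall())


def has_conflict_target(cur: Any, table: str, columns: list[str]) -> bool:
    """True if some UNIQUE constraint covers exactly `columns`, in any order."""
    wanted = {c.strip() for c in columns}
    for definition in unique_constraints(cur, table).values():
        # Take the column list between the parentheses of "UNIQUE (a, b)"
        inner = definition[definition.find("(") + 1 : definition.rfind(")")]
        if {c.strip() for c in inner.split(",")} == wanted:
            return True
    return False
```

Calling has_conflict_target(cur, "sensor_readings", ["sensor_id", "recorded_at"]) before loop_forever() turns a misconfigured conflict target into a startup failure instead of a stream of failed writes.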



Why does ST_MakePoint take longitude before latitude?

ST_MakePoint follows the mathematical (x, y) convention where x is the east-west axis (longitude) and y is the north-south axis (latitude). Most human-readable formats and many APIs list latitude first, so reversing the arguments silently transposes every point: queries return empty results and bounding-box checks never intersect real geometries. Always name your variables lon and lat, not x and y or bare index positions.

What QoS level should I use for environmental telemetry?

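Use QoS 1 (at-least-once delivery). QoS 0 drops packets on unstable cellular or LoRa links without any notification to the publisher. QoS 2 (exactly-once) adds significant handshake overhead that increases per-message latency without meaningful benefit when the database already handles duplicates via ON CONFLICT. The broker delivers each message at the lower of the publish QoS and the subscription QoS, so the sensors must also publish at QoS 1; subscribing at QoS 1 alone is not enough.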

Should I open a new database connection per MQTT message?

No. For moderate throughput (under 20 messages/second), maintain a single persistent psycopg2 connection and reconnect on OperationalError. Above 50 messages/second, switch to asyncpg with an asyncio event loop and a connection pool of 4–8 connections so concurrent inserts do not queue behind a single connection.

How do I handle sensors that report coordinates in UTM or a local projection?

Transform client-side with pyproj.Transformer before insertion. Calling ST_Transform on every INSERT adds server-side compute that accumulates under high-frequency streams. Pin the source EPSG code in your sensor metadata table so the transform function can look it up dynamically rather than hard-coding the projection per sensor type. See Python Scripts for On-the-Fly CRS Transformation During Ingest for a complete implementation.