Building a Local SQLite Fallback Buffer for Remote Sensors

Building a local SQLite fallback buffer decouples data acquisition from network reliability. When cellular links drop, satellite windows close, or cloud APIs throttle requests, a disk-backed queue keeps those outages from losing data: each reading is persisted as soon as it is captured and replayed once connectivity is restored. SQLite’s write-ahead log (WAL) mode keeps the database consistent across crashes and power cuts with minimal write overhead, which is the right trade-off for ARM-class field hardware. The pattern integrates directly into Fallback Buffering & Offline Caching pipelines, adding an explicit synchronization state machine on top of raw storage so every reading can be audited, retried, or quarantined without manual intervention.

Why WAL Mode and Explicit State Tracking Matter

SQLite’s default journal mode (DELETE) uses a rollback journal: on every commit it copies the original pages into a separate -journal file, overwrites those pages in place in the main database file, and then deletes the journal. On SD cards and eMMC modules common in field gateways, that means repeated in-place writes to the same sectors plus a file create and delete for every transaction, which accelerates wear. WAL mode instead appends all modifications to a separate -wal file, leaving the main database file untouched until a checkpoint copies the changes back. Readers do not block writers and writers do not block readers. After a sudden power cut, SQLite detects and discards any incompletely written WAL frames on the next open, so the database comes back in a consistent state without manual recovery.
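The snapshot behaviour is easy to confirm on the target device before trusting it in the field. A minimal sketch (the readings table and the demo_wal_snapshot name are illustrative): a reader opens a transaction, a writer with a zero lock timeout commits a row anyway, and the reader keeps seeing its original snapshot until its own transaction ends.

```python
import sqlite3
from typing import Tuple


def demo_wal_snapshot(db_path: str) -> Tuple[int, int]:
    """Commit a write while a reader holds an open read transaction (WAL mode).

    Returns (rows the reader sees inside its transaction, rows it sees afterwards).
    """
    setup = sqlite3.connect(db_path)
    setup.execute("PRAGMA journal_mode=WAL;")  # persistent: stored in the database file
    setup.execute("CREATE TABLE IF NOT EXISTS readings (v REAL)")
    setup.commit()
    setup.close()

    reader = sqlite3.connect(db_path, isolation_level=None)  # manual BEGIN/COMMIT
    writer = sqlite3.connect(db_path, timeout=0)  # would raise at once if it had to wait
    try:
        reader.execute("BEGIN")
        # The first read fixes the reader's snapshot
        reader.execute("SELECT count(*) FROM readings").fetchall()
        writer.execute("INSERT INTO readings VALUES (21.5)")
        writer.commit()  # succeeds even though the reader's transaction is still open
        inside = reader.execute("SELECT count(*) FROM readings").fetchall()[0][0]
        reader.execute("COMMIT")
        after = reader.execute("SELECT count(*) FROM readings").fetchall()[0][0]
        return inside, after
    finally:
        reader.close()
        writer.close()
```

On a WAL database this returns (0, 1): the reader was never blocked, the writer never waited, and the new row only became visible once the reader started a fresh transaction.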

State tracking — the pending → synced or pending → failed transitions stored on each row — is what separates a reliable replay queue from a simple log file. Without it, a restarted edge process has no way to know which records already reached the upstream endpoint and which were in-flight when the network dropped. Explicit states, combined with idempotency keys on the ingestion side, make the sync loop safe to restart at any point.
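The idempotency half of that contract lives on the receiving side. A minimal sketch, assuming a hypothetical receiver that stores incoming GeoJSON features in its own SQLite table and derives a key from sensor_id plus timestamp_utc (the buffer in this article does not send a dedicated key, so that derivation, and the names idempotency_key and ingest, are assumptions):

```python
import hashlib
import json
import sqlite3
from typing import Any, Dict, List


def idempotency_key(feature: Dict[str, Any]) -> str:
    """Hypothetical key: SHA-256 of sensor_id and timestamp_utc, identical on every replay."""
    props = feature["properties"]
    raw = f"{props['sensor_id']}|{props['timestamp_utc']}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def ingest(conn: sqlite3.Connection, features: List[Dict[str, Any]]) -> int:
    """Store a batch on the receiving side, skipping features already ingested.

    Returns the number of newly stored features, so a replayed batch returns 0.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ingested ("
        "idem_key TEXT PRIMARY KEY, feature TEXT NOT NULL)"
    )
    before = conn.total_changes
    with conn:  # one transaction per batch
        # OR IGNORE turns a duplicate key into a no-op instead of an error
        conn.executemany(
            "INSERT OR IGNORE INTO ingested (idem_key, feature) VALUES (?, ?)",
            [(idempotency_key(f), json.dumps(f)) for f in features],
        )
    return conn.total_changes - before
```

If the edge process crashes after the upstream accepted a batch but before the rows were marked synced, the same batch is sent again on restart; with a unique key the second delivery stores nothing, so at-least-once delivery from the buffer becomes effectively exactly-once storage upstream.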

The diagram below shows the full lifecycle of a reading from sensor acquisition through confirmed ingestion:

Figure: SQLite Fallback Buffer State Machine. A sensor reading is enqueued as pending, sent in a batch during a sync attempt, and ends either synced (upstream returned 200 OK) or in the dead-letter state (failed); errors on transient failures loop back to pending with a backoff retry.
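The transitions in the diagram fit in a small pure function, which makes the retry policy testable without SQLite or a network. A sketch, assuming the three status strings used by the implementation below; Status and next_status are illustrative names, not part of the buffer class:

```python
from enum import Enum
from typing import Tuple


class Status(str, Enum):
    PENDING = "pending"
    SYNCED = "synced"
    FAILED = "failed"


def next_status(
    current: Status, upload_ok: bool, retry_count: int, retry_limit: int
) -> Tuple[Status, int]:
    """Return (new_status, new_retry_count) after one sync attempt.

    Only pending records are eligible for an attempt; synced and failed are
    terminal as far as the sync loop is concerned.
    """
    if current is not Status.PENDING:
        raise ValueError(f"cannot sync a record in state {current.value!r}")
    if upload_ok:
        return Status.SYNCED, retry_count
    retry_count += 1
    # Same rule as the buffer's UPDATE: dead-letter once the limit is reached
    if retry_count >= retry_limit:
        return Status.FAILED, retry_count
    return Status.PENDING, retry_count
```

With retry_limit=3, three consecutive failed attempts walk a record from pending to failed with retry_count 3, which is exactly what the unit tests later in this article check against the SQLite implementation.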

Production-Ready Implementation

The class below handles initialization, enqueueing, batched GeoJSON transmission, retry accounting, and WAL checkpointing. It depends only on Python’s standard library and requests; pin the requests version noted beside its import before deploying.

import json
import logging
import random
import sqlite3
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional, Tuple

import requests  # requests==2.31.0

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)


class SQLiteSensorBuffer:
    """
    Write-ahead-log SQLite queue for environmental sensor readings.

    Records transition: pending -> synced (on successful POST)
                     or pending -> failed (after retry_limit exhausted).
    All coordinates are stored as WGS84 (EPSG:4326) REAL columns and
    serialised into RFC 7946 GeoJSON FeatureCollections for upload.

    Args:
        db_path:       Path to the SQLite file. Use persistent storage, not tmpfs,
                       and not ":memory:": every operation opens its own connection,
                       and each ":memory:" connection is a separate, empty database.
        max_batch:     Maximum records per sync request (tune to endpoint rate limits).
        retry_limit:   Mark record 'failed' after this many consecutive upload errors.
        base_delay:    Base seconds for exponential backoff after failed sync cycles.
        checkpoint_every: Run a PASSIVE WAL checkpoint every N sync cycles.
    """

    # Seconds to wait on another connection's lock before sqlite3 raises
    # OperationalError("database is locked")
    LOCK_TIMEOUT_S = 10.0

    def __init__(
        self,
        db_path: str = "sensor_buffer.db",
        max_batch: int = 50,
        retry_limit: int = 5,
        base_delay: float = 2.0,
        checkpoint_every: int = 10,
    ) -> None:
        self.db_path = db_path
        self.max_batch = max_batch
        self.retry_limit = retry_limit
        self.base_delay = base_delay
        self.checkpoint_every = checkpoint_every
        self._sync_cycle: int = 0
        self._init_db()

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------

    @contextmanager
    def _connect(self) -> Iterator[sqlite3.Connection]:
        """
        Yield a short-lived connection: commit on success, roll back on error,
        always close. synchronous and cache_size are per-connection settings,
        so they are applied on every open rather than once at startup.
        """
        conn = sqlite3.connect(self.db_path, timeout=self.LOCK_TIMEOUT_S)
        try:
            # NORMAL with WAL: fsync only at checkpoints. A power cut can roll back
            # the most recent commits but cannot corrupt the database.
            conn.execute("PRAGMA synchronous=NORMAL;")
            # Negative value = size in KiB: ~2 MB page cache to cut I/O on flash
            conn.execute("PRAGMA cache_size=-2000;")
            with conn:  # commits on success, rolls back if the body raises
                yield conn
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create schema and configure WAL mode on first run."""
        with self._connect() as conn:
            # journal_mode is persistent: once set, later connections open in WAL mode
            conn.execute("PRAGMA journal_mode=WAL;")
            conn.execute("""
                CREATE TABLE IF NOT EXISTS sensor_readings (
                    id             INTEGER PRIMARY KEY AUTOINCREMENT,
                    sensor_id      TEXT    NOT NULL,
                    timestamp_utc  TEXT    NOT NULL,
                    latitude       REAL    NOT NULL,
                    longitude      REAL    NOT NULL,
                    payload        TEXT    NOT NULL,
                    status         TEXT    NOT NULL DEFAULT 'pending',
                    retry_count    INTEGER NOT NULL DEFAULT 0,
                    created_at     TEXT    NOT NULL DEFAULT (datetime('now'))
                );
            """)
            # Composite index for the hot query path:
            # WHERE status = 'pending' ORDER BY created_at, id
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_status_created "
                "ON sensor_readings(status, created_at);"
            )

    # ------------------------------------------------------------------
    # Write path
    # ------------------------------------------------------------------

    def enqueue(
        self,
        sensor_id: str,
        lat: float,
        lon: float,
        payload: Dict[str, Any],
    ) -> None:
        """
        Persist a single sensor reading to the pending queue.

        Timestamps are captured in UTC at call time; never rely on the
        caller supplying a pre-formatted string to avoid local-time bugs.
        """
        ts = datetime.now(timezone.utc).isoformat()
        with self._connect() as conn:
            conn.execute(
                "INSERT INTO sensor_readings "
                "(sensor_id, timestamp_utc, latitude, longitude, payload) "
                "VALUES (?, ?, ?, ?, ?)",
                (sensor_id, ts, lat, lon, json.dumps(payload)),
            )

    # ------------------------------------------------------------------
    # Read + sync path
    # ------------------------------------------------------------------

    def _fetch_pending(self) -> List[Tuple]:
        with self._connect() as conn:
            cur = conn.execute(
                "SELECT id, sensor_id, timestamp_utc, latitude, longitude, payload "
                "FROM sensor_readings "
                "WHERE status = 'pending' "
                # created_at has 1-second resolution; id breaks ties in insert order
                "ORDER BY created_at, id "
                "LIMIT ?",
                (self.max_batch,),
            )
            return cur.fetchall()

    def sync_to_endpoint(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        timeout: int = 30,
    ) -> bool:
        """
        Fetch a pending batch, serialise as GeoJSON, POST to url.

        Returns True if the batch was accepted (or nothing was pending) and
        False on an HTTP error or network failure. On failure, retry counters
        are incremented; records that reach retry_limit transition to 'failed'.
        """
        records = self._fetch_pending()
        if not records:
            logger.info("No pending records to sync.")
            return True

        features = []
        ids_to_update: List[int] = []

        for rid, sid, ts, lat, lon, raw_payload in records:
            ids_to_update.append(rid)
            props = {**json.loads(raw_payload), "sensor_id": sid, "timestamp_utc": ts}
            features.append({
                "type": "Feature",
                # GeoJSON spec (RFC 7946): coordinates are [longitude, latitude]
                "geometry": {"type": "Point", "coordinates": [lon, lat]},
                "properties": props,
            })

        body = {"type": "FeatureCollection", "features": features}

        try:
            resp = requests.post(url, json=body, headers=headers, timeout=timeout)
            resp.raise_for_status()
        except requests.RequestException as exc:
            logger.warning("Sync failed (%s). Incrementing retry counters.", exc)
            self._increment_retries(ids_to_update)
            synced = False
        else:
            self._mark_synced(ids_to_update)
            logger.info("Synced %d records.", len(ids_to_update))
            synced = True

        # Periodic WAL checkpoint to bound -wal file growth
        self._sync_cycle += 1
        if self._sync_cycle % self.checkpoint_every == 0:
            self._checkpoint()
        return synced

    def _mark_synced(self, ids: List[int]) -> None:
        placeholders = ",".join("?" * len(ids))
        with self._connect() as conn:
            conn.execute(
                f"UPDATE sensor_readings SET status = 'synced' WHERE id IN ({placeholders})",
                ids,
            )

    def _increment_retries(self, ids: List[int]) -> None:
        """
        Increment retry_count. Transition to 'failed' when retry_limit is reached.
        Records remain 'pending' while retries remain; run_sync_loop backs off
        exponentially after failed cycles so an unavailable endpoint is not hammered.
        """
        placeholders = ",".join("?" * len(ids))
        with self._connect() as conn:
            conn.execute(
                f"""
                UPDATE sensor_readings
                SET retry_count = retry_count + 1,
                    status = CASE
                        WHEN retry_count + 1 >= ? THEN 'failed'
                        ELSE 'pending'
                    END
                WHERE id IN ({placeholders})
                """,
                [self.retry_limit] + ids,
            )

    def _checkpoint(self) -> None:
        """PASSIVE checkpoint: copy WAL frames into the main file without waiting on other connections."""
        with self._connect() as conn:
            conn.execute("PRAGMA wal_checkpoint(PASSIVE);")

    # ------------------------------------------------------------------
    # Continuous sync loop
    # ------------------------------------------------------------------

    def run_sync_loop(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        poll_interval: int = 60,
    ) -> None:
        """
        Poll and sync indefinitely. Uses exponential backoff with jitter after
        consecutive failures to avoid thundering-herd effects when multiple
        gateways reconnect to the same upstream endpoint simultaneously.
        """
        consecutive_failures = 0
        logger.info("Starting sync loop (poll_interval=%ds).", poll_interval)

        while True:
            try:
                ok = self.sync_to_endpoint(url, headers)
            except Exception as exc:  # e.g. sqlite3.OperationalError: database is locked
                logger.error("Unhandled error during sync: %s", exc)
                ok = False

            if ok:
                consecutive_failures = 0
                time.sleep(poll_interval)
                continue

            consecutive_failures += 1
            # Exponential backoff, capped at 5 minutes (exponent capped to avoid
            # float overflow) and never shorter than the normal poll interval
            backoff = min(self.base_delay * 2 ** min(consecutive_failures, 16), 300.0)
            delay = max(float(poll_interval), backoff) + random.uniform(0, 1)
            logger.warning(
                "%d consecutive failed cycle(s). Retrying in %.1fs.",
                consecutive_failures,
                delay,
            )
            time.sleep(delay)
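Records in the failed state are quarantined, not deleted, so they can be returned to the queue once the upstream problem is fixed. Because retry_limit counts failed upload attempts rather than elapsed time, a long outage can push an entire backlog into failed. A minimal maintenance sketch, assuming the sensor_readings schema above; requeue_failed is a hypothetical helper, not a method of SQLiteSensorBuffer:

```python
import sqlite3
from contextlib import closing
from typing import Optional, Tuple


def requeue_failed(db_path: str, sensor_id: Optional[str] = None) -> int:
    """Move dead-lettered rows back to 'pending' and reset their retry counters.

    Limits the requeue to one sensor when sensor_id is given. Returns the
    number of rows requeued.
    """
    sql = (
        "UPDATE sensor_readings SET status = 'pending', retry_count = 0 "
        "WHERE status = 'failed'"
    )
    params: Tuple = ()
    if sensor_id is not None:
        sql += " AND sensor_id = ?"
        params = (sensor_id,)
    # closing() guarantees the connection is closed; the inner `conn` commits
    with closing(sqlite3.connect(db_path, timeout=10)) as conn, conn:
        return conn.execute(sql, params).rowcount
```

Resetting retry_count gives each requeued record a full set of attempts; run it only after the cause of the failures has been confirmed, otherwise the same rows simply cycle back into failed.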

Parameter Tuning Guide

The right values for max_batch, retry_limit, and poll_interval depend on sensor type, sampling frequency, and link characteristics. The table below gives production-tested starting points.

| Sensor type | Sampling frequency | max_batch | retry_limit | poll_interval (s) | Notes |
|---|---|---|---|---|---|
| Air temperature / humidity | 1 reading / 5 min | 25 | 5 | 120 | Low volume; generous retries suit daily maintenance windows |
| PM2.5 particulate | 1 reading / 1 min | 50 | 5 | 60 | Matches typical gateway rate limits of 50 req/min |
| Dissolved oxygen | 1 reading / 15 min | 20 | 7 | 300 | Longer poll on low-bandwidth satellite links |
| Electrical conductivity | 1 reading / 10 min | 30 | 5 | 180 | Batch size keeps payloads under 32 KB per POST |
| Multi-parameter sonde (4+ sensors) | 1 reading / 1 min | 40 | 6 | 60 | Larger payload per record; reduce batch if endpoint times out |
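The interaction between max_batch and poll_interval is easiest to check with backlog arithmetic. A rough sketch, assuming readings arrive at a steady rate, one batch of up to max_batch records is sent per poll cycle, and no uploads fail while the backlog drains; backlog_drain_seconds is a hypothetical helper:

```python
import math


def backlog_drain_seconds(
    outage_s: float,
    sample_interval_s: float,
    max_batch: int,
    poll_interval_s: float,
) -> float:
    """Estimate how long the sync loop needs to clear a backlog built during an outage."""
    backlog = outage_s / sample_interval_s
    # Readings that keep arriving during each cycle eat into the batch capacity
    arrivals_per_cycle = poll_interval_s / sample_interval_s
    net_per_cycle = max_batch - arrivals_per_cycle
    if net_per_cycle <= 0:
        return math.inf  # the queue grows at least as fast as one batch per cycle drains it
    cycles = math.ceil(backlog / net_per_cycle)
    return cycles * poll_interval_s
```

With the PM2.5 row (one reading every 60 s, max_batch=50, poll_interval=60), a 24-hour outage leaves 1,440 readings and clears in 30 cycles, or 30 minutes. A 1 Hz sensor on the same settings never catches up, because 60 new readings arrive per 60 s cycle against a batch of 50, so high-rate sensors need a larger batch or a shorter poll.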

For sensors using MQTT broker integration, set poll_interval to match the broker’s keep-alive timeout so the sync thread does not race with MQTT reconnection logic.

Verification and Testing

Before deploying to a field gateway, run the unit tests below against a throwaway database file. They confirm that enqueue, sync, and retry state transitions all behave correctly without touching the network. The tests use a file in a temporary directory rather than ":memory:": the buffer opens a new connection for every operation, and each ":memory:" connection is a separate, empty database, so the table created during initialisation would not be visible to enqueue().

import os
import shutil
import sqlite3
import tempfile
import unittest
from unittest.mock import MagicMock, patch

import requests

# SQLiteSensorBuffer must be defined in, or imported into, this module.


class TestSQLiteSensorBuffer(unittest.TestCase):

    def setUp(self):
        # Real database file in a fresh temporary directory, discarded after each test
        self.tmpdir = tempfile.mkdtemp()
        self.buf = SQLiteSensorBuffer(
            db_path=os.path.join(self.tmpdir, "buffer.db"), max_batch=10, retry_limit=3
        )

    def tearDown(self):
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def test_enqueue_creates_pending_record(self):
        self.buf.enqueue("sensor-001", 51.5, -0.1, {"temp_c": 22.4})
        with sqlite3.connect(self.buf.db_path) as conn:
            row = conn.execute(
                "SELECT status, retry_count FROM sensor_readings WHERE sensor_id = 'sensor-001'"
            ).fetchone()
        self.assertEqual(row[0], "pending")
        self.assertEqual(row[1], 0)

    @patch("requests.post")
    def test_successful_sync_marks_synced(self, mock_post):
        mock_post.return_value = MagicMock(status_code=200, raise_for_status=lambda: None)
        self.buf.enqueue("sensor-002", 48.8, 2.3, {"pm25": 12.1})
        self.buf.sync_to_endpoint("https://example.invalid/ingest")
        with sqlite3.connect(self.buf.db_path) as conn:
            status = conn.execute(
                "SELECT status FROM sensor_readings WHERE sensor_id = 'sensor-002'"
            ).fetchone()[0]
        self.assertEqual(status, "synced")

    @patch("requests.post", side_effect=requests.RequestException("timeout"))
    def test_retries_exhaust_to_failed(self, _mock):
        self.buf.enqueue("sensor-003", 35.6, 139.7, {"do_mg_l": 7.8})
        for _ in range(self.buf.retry_limit):
            self.buf.sync_to_endpoint("https://example.invalid/ingest")
        with sqlite3.connect(self.buf.db_path) as conn:
            row = conn.execute(
                "SELECT status, retry_count FROM sensor_readings WHERE sensor_id = 'sensor-003'"
            ).fetchone()
        self.assertEqual(row[0], "failed")
        self.assertEqual(row[1], self.buf.retry_limit)


if __name__ == "__main__":
    unittest.main()

Run the suite before each firmware or container image update to catch schema regressions early.

Gotchas

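WAL file growing unbounded. SQLite checkpoints automatically once the -wal file passes 1000 pages (PRAGMA wal_autocheckpoint), but a checkpoint cannot finish, and the WAL cannot be rewound, while another connection is still reading an older snapshot. Even a successful checkpoint does not shrink the file: SQLite reuses it from the start, so it stays at its high-water size. If some process keeps a read transaction open, or wal_autocheckpoint has been set to 0, the -wal file can grow until it fills the partition; on an 8 GB SD card with a 1 Hz sensor that can take only days. The checkpoint_every parameter triggers an explicit PASSIVE checkpoint every N sync cycles; set it to 10 for normal operation and 2 during initial commissioning to bound growth during burn-in. To give back space already consumed, run PRAGMA wal_checkpoint(TRUNCATE) during a quiet period or set PRAGMA journal_size_limit.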
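A sketch of reclaiming that space, assuming a quiet period in which no other connection is mid-transaction; truncate_wal and demo_truncate are hypothetical helpers, and the demo disables auto-checkpointing only to simulate a starved checkpointer. The pragma returns one row of (busy, WAL frames, frames checkpointed); busy is 1 when another connection blocked the checkpoint, in which case the WAL is not truncated.

```python
import os
import sqlite3
from typing import Tuple


def truncate_wal(conn: sqlite3.Connection) -> Tuple[int, int, int]:
    """Checkpoint all WAL frames into the main file, then truncate the -wal file to 0 bytes."""
    conn.commit()  # make sure this connection has no open write transaction
    busy, log_frames, checkpointed = conn.execute(
        "PRAGMA wal_checkpoint(TRUNCATE);"
    ).fetchone()
    return busy, log_frames, checkpointed


def demo_truncate(db_path: str) -> Tuple[int, int, int]:
    """Grow a WAL with auto-checkpointing off, then truncate it.

    Returns (WAL bytes before, busy flag, WAL bytes after).
    """
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA wal_autocheckpoint=0;")  # simulate a starved checkpointer
        conn.execute("CREATE TABLE IF NOT EXISTS readings (payload TEXT)")
        for _ in range(200):
            conn.execute("INSERT INTO readings VALUES (?)", ("x" * 200,))
            conn.commit()  # one transaction per reading, like enqueue()
        wal_path = db_path + "-wal"
        before = os.path.getsize(wal_path)
        busy, _, _ = truncate_wal(conn)
        after = os.path.getsize(wal_path)
        return before, busy, after
    finally:
        conn.close()
```

A TRUNCATE checkpoint waits for writers and readers like a FULL checkpoint does, so schedule it away from the acquisition loop's busiest periods rather than on every sync cycle.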

GeoJSON coordinate order inversion. RFC 7946 mandates [longitude, latitude], the opposite of the (lat, lon) tuples many sensor libraries return. The implementation passes [lon, lat] explicitly. Swapping them silently transposes every reading rather than raising an error: a London sensor at latitude 51.5, longitude -0.1 lands at latitude -0.1, longitude 51.5, in the Indian Ocean, and downstream spatial CRS mapping and GIS joins produce nonsense results. Only readings whose true longitude exceeds ±90° end up with an out-of-range latitude that a strict parser might reject.
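A cheap guard is to build every Point through one helper that takes latitude and longitude as keyword-only arguments and range-checks them. A sketch, assuming readings arrive as separate lat and lon values; point_feature is a hypothetical helper. The range check only catches a swap when the value landing in the latitude slot exceeds ±90°, so the keyword-only signature does most of the work:

```python
import math
from typing import Any, Dict


def point_feature(*, lat: float, lon: float, properties: Dict[str, Any]) -> Dict[str, Any]:
    """Build an RFC 7946 Point Feature; coordinates are emitted as [lon, lat]."""
    if not (math.isfinite(lat) and math.isfinite(lon)):
        raise ValueError("coordinates must be finite numbers")
    if not -90.0 <= lat <= 90.0:
        raise ValueError(f"latitude {lat} out of range; were lat and lon swapped?")
    if not -180.0 <= lon <= 180.0:
        raise ValueError(f"longitude {lon} out of range")
    return {
        "type": "Feature",
        # RFC 7946 order: longitude first
        "geometry": {"type": "Point", "coordinates": [lon, lat]},
        "properties": dict(properties),
    }
```

Calling point_feature(lat=51.5, lon=-0.1, properties={}) yields coordinates [-0.1, 51.5]; passing Tokyo's values the wrong way round (lat=139.7, lon=35.6) raises ValueError instead of silently plotting a point off the map.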

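Lock contention on restart. SQLite takes its file locks when a statement runs, not inside sqlite3.connect(), and the operating system releases the locks of a process that crashes. Contention after a restart therefore comes from a process that is still alive, such as the acquisition loop or a sync instance that has not exited yet, holding the write lock. Python’s sqlite3 waits for the connection’s timeout (5 seconds by default) and then raises sqlite3.OperationalError: database is locked. Pass an explicit value, for example sqlite3.connect(self.db_path, timeout=10), and treat the error as a failed cycle to retry rather than a fatal crash.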
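The failure mode is easy to reproduce deliberately. A minimal sketch in which a second connection holds the write lock with BEGIN IMMEDIATE, standing in for a still-running process; try_write and demo_lock_contention are hypothetical helpers, and the readings table is illustrative:

```python
import sqlite3
from typing import Tuple


def try_write(db_path: str, lock_timeout_s: float) -> bool:
    """Attempt one insert; return False if the write lock stays busy past the timeout."""
    conn = sqlite3.connect(db_path, timeout=lock_timeout_s)
    try:
        conn.execute("INSERT INTO readings (v) VALUES (1.0)")
        conn.commit()
        return True
    except sqlite3.OperationalError as exc:
        if "locked" not in str(exc):
            raise  # a different operational problem: do not hide it
        return False
    finally:
        conn.close()  # also discards the failed, uncommitted transaction


def demo_lock_contention(db_path: str) -> Tuple[bool, bool]:
    """Return (write result while another connection holds the lock, result after release)."""
    setup = sqlite3.connect(db_path)
    setup.execute("PRAGMA journal_mode=WAL;")
    setup.execute("CREATE TABLE IF NOT EXISTS readings (v REAL)")
    setup.commit()
    setup.close()

    holder = sqlite3.connect(db_path, isolation_level=None)
    try:
        holder.execute("BEGIN IMMEDIATE")  # take the write lock and keep it
        while_held = try_write(db_path, lock_timeout_s=0.2)
        holder.execute("COMMIT")
    finally:
        holder.close()
    after_release = try_write(db_path, lock_timeout_s=0.2)
    return while_held, after_release
```

The first write gives up after roughly 0.2 s with "database is locked" instead of hanging, and the second succeeds once the holder commits; the same pattern lets a sync cycle back off and retry rather than crash.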

Timestamp drift from naive datetimes. Calling datetime.now() without timezone.utc produces a naive datetime that reflects the device’s local clock and DST state. When a gateway moves between timezones or DST transitions, naive timestamps create silent ordering inversions. The implementation enforces datetime.now(timezone.utc) consistently; for additional coverage on detecting drift that has already propagated into a stream, see handling timezone drift in high-frequency IoT streams.
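If readings ever arrive with timestamps from another source, such as a sensor’s own clock or a parsed message payload, normalise them before storage. A sketch, assuming incoming values are already datetime objects; to_utc_iso is a hypothetical helper that rejects naive values outright instead of guessing their offset:

```python
from datetime import datetime, timezone


def to_utc_iso(dt: datetime) -> str:
    """Normalise an aware datetime to a fixed-width UTC ISO 8601 string.

    Naive datetimes are rejected because their offset is unknown.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError("naive datetime: attach a timezone before storing it")
    # timespec="microseconds" keeps every string the same width, so stored
    # values sort lexicographically in chronological order
    return dt.astimezone(timezone.utc).isoformat(timespec="microseconds")
```

For example, 02:30 at a UTC-5 offset becomes "2024-03-10T07:30:00.000000+00:00", while datetime(2024, 3, 10, 2, 30) without tzinfo raises ValueError rather than being stored with an ambiguous local time.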