REST API Polling & Batch Ingestion for Environmental IoT Data

Without a disciplined polling strategy, environmental monitoring pipelines quietly accumulate the worst class of data quality problem: silently missing records. Government air quality portals enforce strict daily quotas; legacy weather-station APIs return partial pages when their underlying databases lock during high-concurrency periods; vendor hydrological sensors expire OAuth tokens mid-run and return HTML error pages instead of JSON. When any of these failures goes undetected (no retry, no validation, no audit trail), downstream stages such as spatial CRS mapping on ingest and timestamp alignment and timezone normalisation operate on an incomplete foundation, producing spatial joins and trend analyses with invisible gaps. Done correctly, REST API polling and batch ingestion is the controlled gate that prevents all of this: deterministic retrieval, schema enforcement at the boundary, idempotent persistence, and an audit trail that lets you prove exactly which records entered the system and when.


Figure: REST API polling pipeline (five-stage data flow). Sensor REST API endpoint → rate-limit & retry layer (tenacity) → pagination & schema validation (pydantic) → spatial & temporal normalisation → PostGIS / GeoParquet store; invalid records → dead-letter queue.

Prerequisites & Architecture Baseline

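Confirm each dependency before wiring up the pipeline, and pin versions explicitly. pydantic v1 and v2 differ in method names (parse_obj became model_validate) and in coercion rules (v1 silently truncates 1.5 to 1 for an int field, v2 rejects it), so a version mismatch can either break validation outright or silently change what gets through.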

Library | Pinned version | Role
requests | ==2.31.0 | HTTP session, connection pooling
tenacity | ==8.2.3 | Declarative retry orchestration
pydantic | ==2.6.4 | Payload schema enforcement
geopandas | ==0.14.3 | Spatial DataFrame construction
shapely | ==2.0.3 | Geometry objects and validation
pyproj | ==3.6.1 | CRS transformation and bounds checking
pandas | ==2.2.1 | Timestamp normalisation
sqlalchemy | ==2.0.29 | PostGIS connection pooling (optional; GeoDataFrame.to_postgis also needs geoalchemy2 and a PostgreSQL driver such as psycopg2)

Upstream pipeline steps that must be complete before polling:

  • API credentials provisioned (Bearer token, API key, or mutual-TLS certificate in place)
  • Target schema created in your geospatial store, including a (station_id, observed_at) unique constraint for idempotent upserts
  • Network topology documented for your IoT Sensor Data Ingestion & Spatial Synchronization deployment, so you know which endpoints serve which sensor networks

Data schema requirements: Each API response must expose at minimum station_id (string), observed_at (ISO-8601 or epoch integer), lat/lon (WGS84 decimal degrees), and at least one sensor reading. APIs that expose coordinates only through a separate station-metadata endpoint require a two-phase fetch: retrieve station locations first, then join them onto each observation by station_id before validation.
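The join step of that two-phase fetch is small enough to sketch. This assumes, hypothetically, that the metadata endpoint returns a list of dicts carrying station_id, lat, and lon, and that observations share the station_id key. Observations with no matching metadata are returned separately so they can be quarantined rather than silently dropped.

```python
from typing import Any, Dict, List, Tuple


def attach_station_coordinates(
    observations: List[Dict[str, Any]],
    station_metadata: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Phase 2 of a two-phase fetch: copy lat/lon from metadata onto observations.

    Hypothetical payload shape: both lists share a 'station_id' key and metadata
    rows carry WGS84 'lat' and 'lon'. Returns (joined, unmatched).
    """
    # Index metadata once so each observation lookup is O(1).
    coords = {m["station_id"]: (m["lat"], m["lon"]) for m in station_metadata}

    joined: List[Dict[str, Any]] = []
    unmatched: List[Dict[str, Any]] = []
    for obs in observations:
        location = coords.get(obs.get("station_id"))
        if location is None:
            unmatched.append(obs)  # no metadata: quarantine instead of guessing
            continue
        lat, lon = location
        joined.append({**obs, "lat": lat, "lon": lon})  # originals stay untouched
    return joined, unmatched
```

Because station metadata usually changes far less often than observations, it can typically be fetched once per run and reused across poll cycles.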

Step-by-Step Ingestion Workflow

Step 1: Establish Polling Cadence and Rate-Limit Enforcement

Align your poll interval to the sensor’s native reporting cadence: typically 5 minutes for urban air quality networks, 15–60 minutes for rural weather stations, and 24 hours for archival hydrological gauges. Polling faster than the source update interval delivers no new data but still consumes daily quota.
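Checking a cadence against a quota is simple arithmetic: stations × polls per day × pages per poll. The sketch below computes that figure, derives the shortest interval that fits a daily quota, and adds a client-side minimum-interval throttle for rate-limit enforcement. The function names and the 80% headroom default (a reserve for retries and backfills) are illustrative assumptions, not values taken from any provider.

```python
import math
import threading
import time


def daily_requests(stations: int, interval_minutes: float, pages_per_poll: int = 1) -> int:
    """Requests consumed per day when every station is polled on a fixed cadence."""
    polls_per_day = math.ceil(24 * 60 / interval_minutes)
    return stations * polls_per_day * pages_per_poll


def min_safe_interval(
    stations: int, daily_quota: int, pages_per_poll: int = 1, headroom: float = 0.8
) -> float:
    """Shortest poll interval in minutes that keeps usage within headroom × quota."""
    usable = daily_quota * headroom  # illustrative reserve for retries and backfills
    return 24 * 60 * stations * pages_per_poll / usable


class MinIntervalThrottle:
    """Blocks so that successive wait() calls are at least min_interval seconds apart."""

    def __init__(self, min_interval: float) -> None:
        self.min_interval = min_interval
        self._next_allowed = 0.0
        self._lock = threading.Lock()  # shared safely across worker threads

    def wait(self) -> None:
        with self._lock:
            now = time.monotonic()
            delay = self._next_allowed - now
            if delay > 0:
                time.sleep(delay)  # holding the lock makes other workers queue behind us
                now = time.monotonic()
            self._next_allowed = now + self.min_interval
```

For example, 50 stations polled every 5 minutes with one page per poll cost daily_requests(50, 5) = 14,400 requests, which overruns a 10,000-request daily quota; min_safe_interval(50, 10_000) suggests about 9 minutes instead. Sharing one MinIntervalThrottle across the workers that hit a single provider spaces their requests evenly.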

Always store the timestamp of the last successfully ingested record in a persistent cursor file or your target database. Use it as the start_time parameter on the next poll cycle. This cursor-based approach is idempotent: if the process crashes mid-run, it restarts from the last confirmed commit rather than re-fetching the entire window.

# Python 3.11 | standard library only
import json
from pathlib import Path
from datetime import datetime, timezone, timedelta

CURSOR_FILE = Path("/var/lib/iot-ingest/last_polled.json")

def load_cursor(station_id: str, default_lookback_hours: int = 1) -> datetime:
    """Return the last successfully polled timestamp for this station.

    Falls back to (now - default_lookback_hours) when no cursor exists,
    which provides a safe warm-start on first deployment.
    """
    if CURSOR_FILE.exists():
        data = json.loads(CURSOR_FILE.read_text())
        if station_id in data:
            return datetime.fromisoformat(data[station_id])
    return datetime.now(timezone.utc) - timedelta(hours=default_lookback_hours)

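def save_cursor(station_id: str, ts: datetime) -> None:
    """Persist the cursor atomically: write a temp file, then rename it over the old one.

    A crash mid-write leaves the previous cursor intact instead of a truncated file.
    """
    data = json.loads(CURSOR_FILE.read_text()) if CURSOR_FILE.exists() else {}
    data[station_id] = ts.isoformat()
    CURSOR_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp = CURSOR_FILE.with_name(CURSOR_FILE.name + ".tmp")
    tmp.write_text(json.dumps(data))
    tmp.replace(CURSOR_FILE)  # os.replace semantics: atomic on POSIX within one filesystem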

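Cost: each load_cursor or save_cursor call reads (and save_cursor rewrites) the whole JSON file, so it is O(S) in the number of monitored stations S, which stays cheap for networks of a few hundred stations. Space: one small JSON file, bounded by S.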

Step 2: Build a Resilient HTTP Session with Retry Orchestration

Wrap requests.Session with tenacity retry logic keyed on transient network errors, 429 responses, and 5xx responses. Do not retry other 4xx client errors (invalid parameters, expired tokens): these need corrected parameters or refreshed credentials, not automatic retries.

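# requests==2.31.0 | tenacity==8.2.3
import logging
from typing import Optional

import requests
from requests.adapters import HTTPAdapter
from tenacity import (
    retry, stop_after_attempt, wait_exponential_jitter,
    retry_if_exception_type, before_sleep_log
)

logger = logging.getLogger(__name__)


class TransientHTTPError(Exception):
    """Raised for 429 and 5xx responses so that tenacity retries them."""

    def __init__(self, message: str, retry_after: Optional[float] = None):
        super().__init__(message)
        self.retry_after = retry_after  # seconds from Retry-After, if the server sent one


RETRYABLE_EXCEPTIONS = (
    requests.exceptions.ConnectionError,
    requests.exceptions.Timeout,
    requests.exceptions.ChunkedEncodingError,
    TransientHTTPError,
)

_jittered_backoff = wait_exponential_jitter(initial=2, max=60, jitter=5)


def wait_retry_after_or_backoff(retry_state) -> float:
    """Honour Retry-After when present; otherwise fall back to jittered backoff."""
    exc = retry_state.outcome.exception()
    if isinstance(exc, TransientHTTPError) and exc.retry_after is not None:
        return exc.retry_after
    return _jittered_backoff(retry_state)


def build_session(api_key: str) -> requests.Session:
    """Create a pooled session with auth headers.

    requests has no session-wide timeout, so get_with_retry passes
    timeout=(5, 20) on every call: 5 s connect to fail fast on DNS/TCP
    failures, 20 s read as a buffer for slow government endpoints.
    """
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {api_key}",
        "Accept": "application/json",
        "User-Agent": "EnvironmentalIoT-Ingest/1.0",
    })
    # Mount adapters with connection pool sizing
    adapter = HTTPAdapter(
        pool_connections=4,
        pool_maxsize=10,
        max_retries=0,  # tenacity owns retries; urllib3 retries disabled
    )
    session.mount("https://", adapter)
    return session


@retry(
    stop=stop_after_attempt(6),
    wait=wait_retry_after_or_backoff,
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
def get_with_retry(
    session: requests.Session,
    url: str,
    params: dict,
) -> dict:
    """Fetch one page, retrying network errors, 429 and 5xx responses.

    Other 4xx responses (bad parameters, 401/403) raise HTTPError at once.
    After the final attempt the last exception is re-raised to the caller.
    """
    response = session.get(url, params=params, timeout=(5, 20))
    if response.status_code == 429 or response.status_code >= 500:
        header = response.headers.get("Retry-After", "").strip()
        # Only the delay-seconds form is parsed; an HTTP-date falls back to backoff.
        retry_after = float(header) if header.isdigit() else None
        logger.warning("HTTP %d; Retry-After=%s", response.status_code, header or "absent")
        raise TransientHTTPError(f"HTTP {response.status_code}", retry_after=retry_after)
    response.raise_for_status()
    return response.json()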

Parameter notes: wait_exponential_jitter adds randomised jitter to prevent thundering-herd conditions when multiple ingestion workers target the same provider. pool_maxsize=10 bounds how many keep-alive connections the adapter retains per host for reuse across multi-station runs; it is not a hard concurrency cap (see Connection pool sizing below).
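HTTP (RFC 9110) allows Retry-After to carry either a delay in seconds or an HTTP-date, and get_with_retry above only understands the delay-seconds form. The standard-library sketch below handles both and could stand in for the header parsing inside get_with_retry. The function name and the one-hour cap are assumptions; the cap keeps a misconfigured server from stalling a worker indefinitely.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    header: Optional[str], now: Optional[datetime] = None, cap: float = 3600.0
) -> Optional[float]:
    """Return the Retry-After delay in seconds, or None if absent or unparseable.

    Accepts delay-seconds ("120") and HTTP-date ("Wed, 21 Oct 2015 07:28:00 GMT").
    Dates in the past clamp to 0; every result is capped at `cap` seconds.
    """
    if not header:
        return None
    header = header.strip()
    if header.isdigit():
        return min(float(header), cap)
    try:
        when = parsedate_to_datetime(header)
    except (TypeError, ValueError):  # TypeError on older Pythons, ValueError on 3.10+
        return None
    if when.tzinfo is None:  # HTTP-dates are defined as GMT
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return min(max((when - now).total_seconds(), 0.0), cap)
```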

Step 3: Fetch, Paginate, and Validate Batch Payloads

Implement cursor-based or offset/limit pagination depending on what the API supports. Cursor pagination (timestamp or opaque token) is strongly preferred: offset pagination goes stale if records are inserted or deleted between requests, producing duplicates or skips.

The diagram below shows the branching logic: when a next_cursor field is present in the response envelope, strip the time-window parameters and follow the cursor; when absent, check whether the page was full and fall back to offset arithmetic.

Figure: Pagination decision flow for paginate_endpoint. Fetch page from API → validate each record with pydantic (invalid records → dead-letter queue) → is next_cursor present? Yes: use the cursor and drop the time-window params. No: was the page full? Yes: increment offset and fetch again; no: done (last page).

Validate every record with a Pydantic model before it touches your geospatial store. Discard or route malformed records to a dead-letter queue with offline buffering rather than failing the entire batch β€” this ensures one rogue reading does not abort a 10,000-record poll window.

# pydantic==2.6.4 | requests==2.31.0
import logging
from datetime import datetime
from typing import Iterator, Optional

import requests
from pydantic import BaseModel, Field, ValidationError, model_validator

logger = logging.getLogger(__name__)

class SensorReading(BaseModel):
    station_id: str
    observed_at: datetime  # pydantic parses ISO-8601 strings and Unix epoch numbers
    temperature_c: Optional[float] = Field(default=None, ge=-90, le=60)
    humidity_pct: Optional[float] = Field(default=None, ge=0, le=100)
    pm25_ugm3: Optional[float] = Field(default=None, ge=0, le=2000)
    lat: float = Field(ge=-90, le=90)
    lon: float = Field(ge=-180, le=180)

    @model_validator(mode="after")
    def require_at_least_one_reading(self) -> "SensorReading":
        readings = [self.temperature_c, self.humidity_pct, self.pm25_ugm3]
        if all(v is None for v in readings):
            raise ValueError("Record contains no sensor readings")
        return self

def paginate_endpoint(
    session: requests.Session,
    base_url: str,
    station_id: str,
    start_time: datetime,
    end_time: datetime,
    page_size: int = 500,
    max_pages: int = 10_000,
) -> Iterator[SensorReading]:
    """Yield validated SensorReading objects for the given time window.

    Uses cursor-based pagination via the 'next_cursor' field in the
    response envelope. Falls back to offset pagination if 'next_cursor'
    is absent. max_pages guards against an API that ignores the offset
    parameter and keeps returning the same full page.

    Time complexity: O(N) where N is total records in window.
    Space: O(page_size); one page in memory at a time.
    """
    params = {
        "station_id": station_id,
        "start_time": start_time.isoformat(),
        "end_time": end_time.isoformat(),
        "limit": page_size,
    }
    rejected = 0

    for _ in range(max_pages):
        payload = get_with_retry(session, f"{base_url}/observations", params)
        records = payload.get("results", [])

        for raw in records:
            try:
                yield SensorReading.model_validate(raw)
            except ValidationError as exc:  # also wraps errors raised in model_validator
                logger.warning("Rejected record station=%s: %s", station_id, exc)
                rejected += 1
                # Route to dead-letter queue in production
                # dlq.send(raw, reason=str(exc))

        next_cursor = payload.get("next_cursor")
        if next_cursor:
            # The cursor encodes the position, so drop the time-window params
            params = {**params, "cursor": next_cursor}
            params.pop("start_time", None)
            params.pop("end_time", None)
        elif len(records) < page_size:
            break  # Short page: this was the last page
        else:
            params["offset"] = params.get("offset", 0) + page_size  # offset fallback
    else:
        raise RuntimeError(f"Pagination for station {station_id} exceeded max_pages={max_pages}")

    if rejected:
        logger.warning("Batch complete: %d records rejected and quarantined", rejected)
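The dlq.send call is left abstract in the source. A minimal file-backed stand-in might look like the sketch below, assuming a local JSON Lines file is an acceptable quarantine; a message queue or an object-store prefix would be the usual production equivalent. The class and its send/replay methods are hypothetical, though send matches the commented-out call signature.

```python
import json
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterator


class JsonlDeadLetterQueue:
    """Hypothetical append-only quarantine: one JSON object per rejected record."""

    def __init__(self, path: Path) -> None:
        self.path = Path(path)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()  # serialise writers within one process

    def send(self, raw: Dict[str, Any], reason: str, station_id: str = "") -> None:
        entry = {
            "quarantined_at": datetime.now(timezone.utc).isoformat(),
            "station_id": station_id,
            "reason": reason,
            "payload": raw,  # keep the original record untouched for replay
        }
        line = json.dumps(entry, default=str)  # default=str tolerates odd payload types
        with self._lock, self.path.open("a", encoding="utf-8") as fh:
            fh.write(line + "\n")

    def replay(self) -> Iterator[Dict[str, Any]]:
        """Yield quarantined entries, e.g. to re-validate them after a schema fix."""
        if not self.path.exists():
            return
        with self.path.open(encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    yield json.loads(line)
```

Keeping the raw payload rather than a parsed form means a later schema fix can replay every quarantined record through SensorReading without re-polling the API.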

Step 4: Normalize Timestamps to UTC and Coordinates to EPSG:4326

Environmental datasets mix UTC, local civil timezones, epoch integers, and ISO-8601 strings with and without timezone offsets. Perform timezone normalisation immediately on ingest, before any join or aggregation. Similarly, all geometries must land in EPSG:4326 at the boundary; downstream analytical layers apply their own projections.

This connects directly to the timestamp alignment and timezone normalisation patterns discussed in the sibling guide: the field-level normalisation here feeds the batch-level alignment strategies described there.
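Field-level normalisation often has to cope with several timestamp representations in one payload. The standard-library sketch below is one way to do it; it assumes epoch values are in seconds and that ISO strings without an offset are local time in a caller-supplied zone (UTC by default, matching the to_utc helper below). The function name is illustrative.

```python
from datetime import datetime, timezone, tzinfo
from typing import Union


def parse_observed_at(value: Union[str, int, float], naive_tz: tzinfo = timezone.utc) -> datetime:
    """Normalise an ISO-8601 string or Unix epoch value to timezone-aware UTC.

    Assumptions: numbers (and all-digit strings) are epoch seconds; ISO strings
    without an offset are local time in naive_tz. Anything else raises ValueError.
    """
    if isinstance(value, bool):  # bool is an int subclass; never a valid timestamp
        raise ValueError("boolean is not a timestamp")
    if isinstance(value, (int, float)):
        return datetime.fromtimestamp(value, tz=timezone.utc)
    if not isinstance(value, str):
        raise ValueError(f"unsupported timestamp type: {type(value).__name__}")
    text = value.strip()
    if text.isdigit():
        return datetime.fromtimestamp(int(text), tz=timezone.utc)
    if text.endswith("Z"):  # fromisoformat only accepts "Z" from Python 3.11
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)  # raises ValueError on malformed input
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=naive_tz)
    return parsed.astimezone(timezone.utc)
```

Making naive_tz explicit forces a decision per source: treating naive local timestamps as UTC shifts every reading by the station's offset without raising any error.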

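# pandas==2.2.1 | geopandas==0.14.3 | shapely==2.0.3 | pyproj==3.6.1
from datetime import datetime, timezone
from functools import lru_cache
from typing import List
import geopandas as gpd
import pandas as pd
from shapely.geometry import Point
from pyproj import Transformer

@lru_cache(maxsize=None)
def transformer_to_4326(source_crs: str) -> Transformer:
    """Cached transformer for sources that report projected coordinates.

    Building a Transformer is expensive: create one per source CRS, never per record.
    Usage: lon, lat = transformer_to_4326("EPSG:27700").transform(easting, northing)
    Not needed for the lat/lon payloads below, which are already WGS84.
    """
    return Transformer.from_crs(source_crs, "EPSG:4326", always_xy=True)

# Bounding box for a sanity check; defaults to the full WGS84 range, tighten per region
WGS84_BOUNDS = {"lat": (-90, 90), "lon": (-180, 180)}

def to_utc(ts: datetime) -> datetime:
    """Coerce any datetime to timezone-aware UTC.

    Naive values are assumed to already be UTC. If a source reports naive
    local time, attach its real zone before calling this.
    """
    if ts.tzinfo is None:
        return ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc)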

def build_geodataframe(readings: List[SensorReading]) -> gpd.GeoDataFrame:
    """Convert validated readings to an EPSG:4326 GeoDataFrame sorted by observed_at.

    Drops records outside WGS84_BOUNDS. With the default full-globe bounds this
    only repeats SensorReading's own checks; tighten them to your deployment region
    to catch gross GPS errors. With the default bounds, null-island (0, 0) points
    and most lat/lon swaps pass this filter.
    Returns an empty GeoDataFrame if all records are filtered.
    """
    rows = []
    for r in readings:
        lat, lon = r.lat, r.lon
        if not (WGS84_BOUNDS["lat"][0] <= lat <= WGS84_BOUNDS["lat"][1]):
            logger.warning("Out-of-bounds lat=%.4f for station %s", lat, r.station_id)
            continue
        if not (WGS84_BOUNDS["lon"][0] <= lon <= WGS84_BOUNDS["lon"][1]):
            logger.warning("Out-of-bounds lon=%.4f for station %s", lon, r.station_id)
            continue
        row = r.model_dump()
        row["observed_at"] = to_utc(r.observed_at)
        row["geometry"] = Point(lon, lat)
        rows.append(row)

    if not rows:
        return gpd.GeoDataFrame(
            columns=["station_id", "observed_at", "geometry"], geometry="geometry", crs="EPSG:4326"
        )

    gdf = gpd.GeoDataFrame(rows, geometry="geometry", crs="EPSG:4326")
    gdf = gdf.sort_values("observed_at").reset_index(drop=True)
    return gdf

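Edge case: Some government APIs return lat and lon swapped. A heuristic check (if abs(lat) > 90 but abs(lon) <= 90, swap them and log the correction) only catches swaps where the true longitude lies beyond ±90°; for stations near the prime meridian both swapped values stay in range, so those need the reference-location filter described under Failure Modes. Because SensorReading rejects abs(lat) > 90, the swap must run before validation, not inside build_geodataframe.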
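One place to run the swap is a pydantic v2 model validator in mode="before", which sees the raw payload before the Field bounds are enforced. The sketch uses a cut-down, hypothetical model; in practice the same validator would sit on SensorReading itself.

```python
import logging
from typing import Any

from pydantic import BaseModel, Field, model_validator

logger = logging.getLogger(__name__)


class SwapTolerantLocation(BaseModel):
    """Hypothetical cut-down model showing the lat/lon swap heuristic."""

    station_id: str
    lat: float = Field(ge=-90, le=90)
    lon: float = Field(ge=-180, le=180)

    @model_validator(mode="before")
    @classmethod
    def fix_swapped_coordinates(cls, data: Any) -> Any:
        # Runs on the raw input, before the ge/le bounds above are checked.
        if isinstance(data, dict):
            try:
                lat, lon = float(data["lat"]), float(data["lon"])
            except (KeyError, TypeError, ValueError):
                return data  # let normal validation report the problem
            if abs(lat) > 90 and abs(lon) <= 90:
                logger.warning("Swapping lat/lon for station %s", data.get("station_id"))
                data = {**data, "lat": lon, "lon": lat}
        return data
```

Records where both values are out of range are left alone, so they still fail validation and reach the dead-letter queue.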

Step 5: Persist Validated Records to a Geospatial Data Store

Batch-write with idempotent upsert semantics. Always include ingested_at (server-side insertion time), batch_id, and source_api_version in the persisted schema for lineage tracking. This lineage metadata is what makes it possible to audit which records came from which poll run when you later need to backfill or replay.

# geopandas==0.14.3 | sqlalchemy==2.0.29
import uuid
from datetime import datetime, timezone
import geopandas as gpd
from sqlalchemy import create_engine, text

def persist_to_postgis(
    gdf: gpd.GeoDataFrame,
    dsn: str,
    table: str = "environmental_telemetry",
    api_version: str = "v2",
) -> int:
    """Upsert validated records into PostGIS; returns count of new rows inserted.

    Uses ON CONFLICT DO NOTHING keyed on (station_id, observed_at).
    Re-running the same poll window is therefore safe and idempotent.
    """
    if gdf.empty:
        return 0

    batch_id = str(uuid.uuid4())
    gdf = gdf.copy()
    gdf["ingested_at"] = datetime.now(timezone.utc)
    gdf["batch_id"] = batch_id
    gdf["source_api_version"] = api_version

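    engine = create_engine(dsn, pool_pre_ping=True)  # in production, create once and reuse

    # to_postgis does not support ON CONFLICT, so stage the batch in a scratch
    # table and merge it with a single INSERT ... SELECT.
    temp_table = f"_tmp_{batch_id.replace('-', '')[:12]}"
    gdf.to_postgis(temp_table, engine, if_exists="replace", index=False)

    # List columns explicitly: SELECT * silently depends on identical column order.
    # table and temp_table are interpolated into SQL, so pass only trusted names.
    cols = ", ".join(f'"{c}"' for c in gdf.columns)
    try:
        with engine.begin() as conn:
            result = conn.execute(text(f"""
                INSERT INTO {table} ({cols})
                    SELECT {cols} FROM {temp_table}
                ON CONFLICT (station_id, observed_at) DO NOTHING
            """))
            return result.rowcount
    finally:
        # Drop the scratch table even if the merge fails
        with engine.begin() as conn:
            conn.execute(text(f"DROP TABLE IF EXISTS {temp_table}"))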

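For GeoParquet (object storage): Replace persist_to_postgis with gdf.to_parquet(path), which requires pyarrow. WKB is already the default geometry encoding; the geometry_encoding keyword was added in geopandas 1.0, so it is not a documented option under the pinned 0.14.3. Partition by (station_id, date) for efficient predicate pushdown in DuckDB or Spark.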
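The (station_id, date) layout can be planned independently of the writer. The standard-library sketch below assumes rows arrive as dicts with a timezone-aware observed_at (for example gdf.to_dict("records") after build_geodataframe) and a per-run batch_id like the one persist_to_postgis generates; the function name is hypothetical. Each group could then be written with gpd.GeoDataFrame(group, geometry="geometry", crs="EPSG:4326").to_parquet(root / path). The key=value directory names follow the Hive convention that DuckDB (hive_partitioning) and Spark recognise as partitions.

```python
import re
from collections import defaultdict
from datetime import timezone
from pathlib import PurePosixPath
from typing import Any, Dict, List


def plan_partitions(
    rows: List[Dict[str, Any]], batch_id: str
) -> Dict[PurePosixPath, List[Dict[str, Any]]]:
    """Group rows into Hive-style station_id=.../date=.../part-<batch_id>.parquet files.

    The date key comes from observed_at in UTC, so a partition never mixes two
    calendar days. observed_at must be timezone-aware. Station IDs are sanitised
    because they become path segments.
    """
    plan: Dict[PurePosixPath, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        station = re.sub(r"[^A-Za-z0-9_.-]", "_", str(row["station_id"]))
        day = row["observed_at"].astimezone(timezone.utc).date().isoformat()
        path = PurePosixPath(f"station_id={station}", f"date={day}", f"part-{batch_id}.parquet")
        plan[path].append(row)
    return dict(plan)
```

Embedding the batch_id in each file name keeps reruns from overwriting earlier part files, which also fits the write-then-compact approach for large backfills.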

Configuration & Tuning

The right parameters depend heavily on sensor type and API tier. These values are production-tested starting points; always profile against your specific provider’s rate-limit policy.

Sensor type | Poll interval | Page size | Retry attempts | Max backoff
Urban air quality (PM2.5, NO2) | 5 min | 200 | 5 | 60 s
Rural weather station (temp, humidity, wind) | 15 min | 500 | 6 | 120 s
Hydrological gauge (water level, conductivity) | 60 min | 1,000 | 4 | 180 s
Satellite-derived land surface temperature | 24 h | 2,000 | 3 | 300 s
Wildfire smoke sensor (PM2.5 burst mode) | 1 min | 100 | 8 | 30 s

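Connection pool sizing: Set pool_maxsize to the number of concurrent station workers you run. With requests' default pool_block=False, extra concurrent requests neither queue nor raise: urllib3 opens additional connections and discards them afterwards (logging "Connection pool is full, discarding connection"), so you pay repeated TCP and TLS handshakes. Pass pool_block=True to HTTPAdapter if workers should instead wait for a free connection.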

Pydantic field bounds: Tighten the ge/le constraints per sensor type. PM2.5 above 500 µg/m³ is physically possible during wildfires but should trigger a quality flag rather than a hard validation failure. Use @model_validator to emit a warning and set a qc_flag field rather than outright rejecting the record.
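A sketch of that soft-limit pattern in pydantic v2, using a hypothetical cut-down model with a qc_flag field. The 500 µg/m³ soft threshold comes from the paragraph above; the hard 2000 µg/m³ bound from SensorReading stays a Field constraint, so only physically implausible values are rejected.

```python
import logging
from typing import Optional

from pydantic import BaseModel, Field, model_validator

logger = logging.getLogger(__name__)

PM25_SOFT_LIMIT = 500.0  # above this: keep the record but flag it for review


class FlaggedPM25Reading(BaseModel):
    """Hypothetical model: hard bounds reject, soft bounds only flag."""

    station_id: str
    pm25_ugm3: float = Field(ge=0, le=2000)  # hard limit: reject outright
    qc_flag: Optional[str] = None  # hypothetical quality-control column

    @model_validator(mode="after")
    def flag_extreme_pm25(self) -> "FlaggedPM25Reading":
        if self.pm25_ugm3 > PM25_SOFT_LIMIT:
            logger.warning(
                "PM2.5=%.1f above soft limit at station %s", self.pm25_ugm3, self.station_id
            )
            self.qc_flag = "pm25_above_soft_limit"
        return self
```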

Validation β€” Confirming the Pipeline Worked

After a poll cycle completes, run these checks before advancing the cursor:

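1. Row-count reconciliation: The API’s response envelope often includes a total_count field. Compare it against the number of records actually received across all pages, counting rejected records as well as valid ones. A shortfall indicates a dropped page (pagination bug or network error); a surplus suggests duplicated pages.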

def assert_completeness(expected_total: int, received: int, tolerance: float = 0.01) -> None:
    """Raise if received differs from expected_total by more than tolerance (1% by default)."""
    gap = abs(expected_total - received)
    if gap / max(expected_total, 1) > tolerance:
        raise ValueError(
            f"Completeness check failed: expected {expected_total}, received {received}"
        )

2. Spatial distribution check: Plot or aggregate the ingested GeoDataFrame by bounding-box quadrant. A sudden shift in centroid location signals a lat/lon swap or a station being reassigned to a new physical location.
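A standard-library sketch of the quadrant aggregation: count records per quadrant around a fixed centre (for example the centre of the network's bounding box, computed once rather than per batch, so a shift cannot drag the centre with it) and flag quadrants whose share moved between batches. The 10-percentage-point threshold is an arbitrary assumption to tune per network.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple

Coord = Tuple[float, float]  # (lat, lon)


def quadrant_shares(points: Iterable[Coord], centre: Coord) -> Dict[str, float]:
    """Share of points in each quadrant (NE/NW/SE/SW) around a fixed centre."""
    counts: Counter = Counter()
    for lat, lon in points:
        ns = "N" if lat >= centre[0] else "S"
        ew = "E" if lon >= centre[1] else "W"
        counts[ns + ew] += 1
    total = sum(counts.values()) or 1  # avoid dividing by zero on an empty batch
    return {q: counts.get(q, 0) / total for q in ("NE", "NW", "SE", "SW")}


def shifted_quadrants(
    previous: Iterable[Coord], current: Iterable[Coord], centre: Coord, threshold: float = 0.10
) -> List[str]:
    """Quadrants whose share of records changed by more than threshold between batches."""
    before = quadrant_shares(previous, centre)
    after = quadrant_shares(current, centre)
    return [q for q in before if abs(after[q] - before[q]) > threshold]
```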

3. Temporal gap detection: After persisting, query the target table for gaps larger than twice the nominal reporting interval. Genuine outages produce single gaps; systematic polling bugs produce regularly-spaced gaps.

# pandas==2.2.1
def detect_temporal_gaps(
    gdf: gpd.GeoDataFrame,
    station_id: str,
    expected_interval_minutes: int = 15,
) -> pd.DataFrame:
    """Return rows where the gap to the previous observation exceeds 2× the interval."""
    station = gdf[gdf["station_id"] == station_id].sort_values("observed_at")
    station = station.copy()
    station["gap_min"] = station["observed_at"].diff().dt.total_seconds() / 60
    return station[station["gap_min"] > 2 * expected_interval_minutes][
        ["station_id", "observed_at", "gap_min"]
    ]
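The detected gaps can then be tested for the regular spacing that points at a polling bug rather than a genuine outage. This heuristic sketch measures the coefficient of variation (standard deviation divided by mean) of the spacing between consecutive gaps; the 0.1 threshold and three-gap minimum are assumptions to tune, not established cut-offs. It accepts any list of datetimes, for example the observed_at column returned by detect_temporal_gaps converted with .tolist().

```python
from datetime import datetime
from statistics import mean, pstdev
from typing import List


def gaps_look_periodic(gap_times: List[datetime], max_cv: float = 0.1, min_gaps: int = 3) -> bool:
    """Heuristic: True if gaps recur at a near-constant spacing.

    A low coefficient of variation of the spacing suggests a scheduler or
    pagination bug; scattered gaps suggest one-off outages.
    """
    if len(gap_times) < min_gaps:
        return False  # too few gaps to call anything a pattern
    ordered = sorted(gap_times)
    spacings = [(b - a).total_seconds() for a, b in zip(ordered, ordered[1:])]
    avg = mean(spacings)
    if avg <= 0:
        return False
    return pstdev(spacings) / avg <= max_cv
```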

4. Schema version check: Log the source_api_version on every batch. Unexpected version changes are the earliest indicator that an API provider has modified their response schema, which will surface as Pydantic validation failures in the next run.
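Beyond logging the declared version, a cheap way to catch unannounced schema changes is to fingerprint the set of field names in each batch and diff it against the previous run. A sketch with hypothetical helper names; where the previous field list is stored (for example alongside the cursor) is left to the caller.

```python
import hashlib
import json
from typing import Any, Dict, Iterable, List


def field_names(records: Iterable[Dict[str, Any]]) -> List[str]:
    """Sorted union of top-level field names seen in a batch of raw records."""
    return sorted({key for rec in records for key in rec})


def schema_fingerprint(names: List[str]) -> str:
    """Short, order-independent hash of a field-name list, cheap to log per batch."""
    return hashlib.sha256(json.dumps(sorted(names)).encode("utf-8")).hexdigest()[:16]


def schema_changes(previous: List[str], current: List[str]) -> Dict[str, List[str]]:
    """Fields added or removed since the previous batch (empty lists mean no change)."""
    return {
        "added": sorted(set(current) - set(previous)),
        "removed": sorted(set(previous) - set(current)),
    }
```

Logging the fingerprint next to source_api_version makes a silent provider-side rename show up as a fingerprint change in the same run, before Pydantic rejections accumulate.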

Failure Modes & Edge Cases

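Pagination drift with offset/limit: If the source database inserts or deletes records between page requests, rows shift across page boundaries: inserts ahead of the current offset re-deliver rows as duplicates, and deletes (for example retention purges) cause skips. Symptom: row-count reconciliation shows received differing from expected with no network errors logged. Fix: switch to cursor/timestamp pagination, or fetch pages in reverse-chronological order so new insertions land only on the already-received first page; the resulting duplicates are absorbed by the ON CONFLICT upsert, and the new rows are picked up on the next cycle.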

Expired credentials mid-run: OAuth2 Bearer tokens typically expire in 1–24 hours, so a long historical backfill will hit a 401 mid-run. Fix: implement a TokenRefresher context manager that detects 401 responses, refreshes the token, retries the request once, and propagates the error if refresh fails. Do not retry 401 indefinitely: repeated failed authentication can trigger account lockouts.
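A minimal sketch of the refresh-once logic, written as a plain class rather than the context manager described above and kept transport-agnostic. fetch_token and send are hypothetical callables: fetch_token() returns a new token or raises, and send(token) performs the request and returns an object with a status_code. The sketch is not thread-safe; concurrent workers would need a lock around the refresh.

```python
from typing import Any, Callable


class TokenRefresher:
    """Retries a request exactly once after a 401, using a freshly fetched token."""

    def __init__(self, fetch_token: Callable[[], str]) -> None:
        self._fetch_token = fetch_token
        self.token = fetch_token()

    def request(self, send: Callable[[str], Any]) -> Any:
        response = send(self.token)
        if getattr(response, "status_code", None) != 401:
            return response
        # One refresh, one retry. If fetch_token raises, that error propagates.
        self.token = self._fetch_token()
        response = send(self.token)
        if getattr(response, "status_code", None) == 401:
            # Still unauthorised: stop instead of looping toward an account lockout.
            raise PermissionError("HTTP 401 persists after token refresh")
        return response
```

With requests, send could be a lambda such as lambda tok: session.get(url, params=params, headers={"Authorization": f"Bearer {tok}"}, timeout=(5, 20)); per-request headers take precedence over the session's.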

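Heterogeneous timestamp formats across stations: A single API endpoint can return "2024-03-15T09:00:00Z" for one station and 1710493200 (Unix epoch) for another. pd.to_datetime handles each form, but needs utc=True for the strings and unit="s" for the epoch integers, so parse the two groups separately: without unit="s", bare integers are read as nanoseconds and land in January 1970 instead of failing. In pandas 2.x, a single call over strings in mixed formats raises unless you pass format="mixed", while errors="coerce" silently turns anything unparsed into NaT; keep the default errors="raise" during development.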

Memory pressure on high-frequency wildfire sensor backfills: Fetching 12 months of 1-minute PM2.5 data for 200 sensors in a single run means roughly 105 million records (525,600 minutes per year × 200 sensors). Keep page_size at or below 1,000, process one station at a time, and stream directly to Parquet without materialising the full dataset in memory. gdf.to_parquet has no append mode, so write each page to a uniquely named part file and compact offline.

GPS drift artefacts during device cold-starts: Some sensor firmware reports (0.0, 0.0) β€” null island β€” for 30–90 seconds after power-on before GPS lock is acquired. These records pass coordinate bounds checks. Add a secondary filter: reject any record whose coordinates differ by more than 1 km from the station’s known reference location.
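A sketch of that secondary filter using the haversine great-circle distance. It assumes a hypothetical mapping from station_id to surveyed (lat, lon) reference coordinates; the 1 km threshold comes from the text, and 6,371 km is the conventional mean Earth radius. Stations missing from the mapping are rejected so they can be quarantined.

```python
import math
from typing import Dict, Tuple

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres between two WGS84 points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(min(1.0, math.sqrt(a)))  # clamp float overshoot


def near_reference(
    station_id: str,
    lat: float,
    lon: float,
    references: Dict[str, Tuple[float, float]],
    max_km: float = 1.0,
) -> bool:
    """False for records too far from the station's surveyed location (e.g. null island)."""
    ref = references.get(station_id)
    if ref is None:
        return False  # unknown station: quarantine rather than accept
    return haversine_km(lat, lon, ref[0], ref[1]) <= max_km
```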

Integration

REST API polling is the authoritative data entry point for sensor networks that do not support push protocols. Once records land in your geospatial store, they flow into:

  • Spatial CRS Mapping on Ingest: the EPSG:4326 geometries produced by this pipeline become the input for downstream projection and spatial join operations against land-use, elevation, or administrative boundary layers.

  • Timestamp Alignment and Timezone Normalisation: the UTC-normalised observed_at column enables multi-station temporal joins; the detailed alignment patterns for handling irregular cadences and clock-drift correction are covered there.

  • MQTT Broker Integration for Environmental Sensors: for vendors that expose both a REST archive and an MQTT live feed, the REST pipeline handles historical backfill while MQTT delivers real-time updates. The two paths converge in the same geospatial store, with the REST records serving as the authoritative reconciliation baseline.

  • Kafka Stream Synchronisation Workflows: polled batches can be published to compacted Kafka topics keyed on (station_id, observed_at), so re-published records eventually collapse to one value per key, giving downstream consumers replayable, deduplicated data and schema-evolution tracking without querying the source API directly.

  • Fallback Buffering and Offline Caching: when an API endpoint becomes unreachable for an extended period, the dead-letter queue patterns described here feed directly into the offline-cache strategies that preserve data during network partitions.

FAQ

How often should I poll a government air quality API?

Match your interval to the sensor’s native reporting cadence β€” typically 5–60 minutes for air quality stations. Always cache the last-seen record timestamp and use it as a cursor. Polling more frequently than the update interval wastes daily quota and triggers 429 responses without delivering new data.

How do I make batch ingestion idempotent?

Assign each record a deterministic ID derived from (station_id, observed_at). Use INSERT ... ON CONFLICT DO NOTHING in PostGIS, keyed on that composite. Re-running the same poll window skips duplicates without corrupting the time series.
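One way to derive that deterministic ID is a name-based UUID (uuid5) over a canonical string of the composite key. The namespace UUID below is an arbitrary, hypothetical constant: what matters is that it never changes, and that observed_at is normalised to UTC first, otherwise the same instant written with two different offsets would yield two IDs.

```python
import uuid
from datetime import datetime, timezone

# Hypothetical project namespace: generate once, then keep it fixed forever.
READING_NAMESPACE = uuid.UUID("6f1c1f0e-8a4b-4c57-9a52-3c1d2e7b9f10")


def reading_id(station_id: str, observed_at: datetime) -> uuid.UUID:
    """Deterministic ID for (station_id, observed_at); same inputs, same ID."""
    if observed_at.tzinfo is None:
        raise ValueError("observed_at must be timezone-aware")
    key = f"{station_id}|{observed_at.astimezone(timezone.utc).isoformat()}"
    return uuid.uuid5(READING_NAMESPACE, key)
```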

What pagination style do environmental APIs typically use?

Most government portals use offset/limit pagination. Vendor APIs increasingly prefer cursor-based pagination keyed on an ISO-8601 timestamp or an opaque page token. Prefer cursor pagination when available: offset pagination becomes inconsistent if records are inserted or deleted between requests.

How should I handle a 429 during a long polling run?

Read the Retry-After header and honour it exactly. tenacity's wait_fixed takes its delay when the decorator is built, so it cannot follow a per-response header; instead, pass a custom wait callable that returns the Retry-After value when present and falls back to exponential backoff with jitter when it is absent. Never hardcode a sleep interval: provider limits vary widely.

When should I switch from REST polling to MQTT or Kafka?

Switch when latency matters more than auditability. REST polling adds up to one full polling interval of lag on top of the provider's own publication delay; for storm alerts or air quality exceedance notifications, sub-second delivery via MQTT broker integration is necessary. Retain REST polling as the authoritative historical backfill path even after introducing a streaming channel.