Cross-Calibrating PM2.5 Monitors with Linear Regression

Cross-calibrating PM2.5 monitors with linear regression maps low-cost optical sensor readings to reference-grade instrumentation by fitting a slope (m) and intercept (b) that together correct for manufacturing variance, laser aging, and humidity-driven Mie scattering. The result is a calibrated reading pm25_cal = m * pm25_raw + b that is spatially comparable across a distributed network. In Python the workflow uses pandas for strict temporal alignment, scikit-learn for model fitting, and vectorized numpy operations to apply coefficients across high-frequency telemetry without iterative loops — making it suitable for both batch recalibration scripts and live edge pipelines.

Why Linear Regression Fits PM2.5 Cross-Calibration

Optical particle counters (Plantower PMS5003, Sensirion SPS30, Nova Fitness SDS011) count laser scatter events and report a derived mass concentration. The relationship between raw scatter counts and true gravimetric mass is influenced by particle composition, size distribution, and relative humidity — but over typical urban operating ranges (5–200 µg/m³, RH < 70 %) this relationship is approximately linear. That linearity means a single (m, b) pair fitted against a co-located regulatory Federal Equivalent Method (FEM) monitor can correct systematic sensor bias with an R² regularly above 0.90 in practice.

This places linear regression at the foundation of broader Cross-Device Normalization Techniques: it is the simplest, most interpretable calibration transfer function and acts as the baseline against which more complex models (multiple regression with humidity/temperature features, polynomial expansions, random forests) are benchmarked. Because the coefficients are scalar, they are trivial to store, version, and apply in firmware or stream processors — a major operational advantage over black-box models.

The diagram below shows the four-stage data flow from raw co-located pairs to calibrated network output.

PM2.5 Linear Regression Calibration Data Flow Four-stage pipeline: raw co-located pairs enter temporal alignment (resample + inner join), then model fitting (OLS or RANSAC), coefficient validation (slope and intercept bounds check), and finally vectorized application across the full sensor network. Co-located pairs IoT node + FEM reference station Temporal alignment resample → 1 h mean inner join, dropna Model fitting OLS or RANSAC → m, b + metrics Network application pm25_raw × m + b vectorized, all nodes Stage 1 Stage 2 Stage 3 Stage 4

Prerequisites

Before running calibration, the upstream pipeline must already satisfy these requirements:

  • Python 3.9+ with pandas>=2.0, numpy>=1.24, scikit-learn>=1.3
  • A time-indexed IoT DataFrame with at minimum timestamp (UTC-aware) and raw_pm25 columns; humidity column (relative_humidity) optional but strongly recommended
  • Reference monitor data at a fixed reporting period (typically 1 hour) with a ref_pm25 column; timestamps must already be UTC-aware
  • Automated QC flags for missing readings applied upstream so that gap periods do not appear as zero-concentration intervals in the aggregation step
  • Sensor drift pre-screened using sensor drift correction algorithms — if a node is in active mechanical drift the calibration coefficients will be non-stationary and require more frequent retraining

Production-Ready Implementation

The function below handles all five pipeline stages: UTC enforcement, aggregation, saturation filtering, inner join, optional humidity exclusion, model training, and vectorized application. Drop it into any calibration service or scheduled job.

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression, RANSACRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from typing import Optional, Tuple, Dict, Any


def calibrate_pm25(
    iot_df: pd.DataFrame,
    ref_df: pd.DataFrame,
    use_ransac: bool = False,
    humidity_col: Optional[str] = None,
    rh_threshold: float = 70.0,
    min_pairs: int = 15,
) -> Tuple[Dict[str, Any], Any]:
    """
    Cross-calibrate low-cost PM2.5 sensors against a reference monitor.

    Parameters
    ----------
    iot_df : DataFrame
        Must contain 'timestamp' (UTC-aware or naive UTC) and 'raw_pm25'.
        If humidity_col is given, that column must also be present so it
        survives aggregation into the paired dataset.
    ref_df : DataFrame
        Must contain 'timestamp' and 'ref_pm25'. Already at 1-hour cadence
        (BAM or FEM hourly output).
    use_ransac : bool
        When True, wraps LinearRegression in RANSACRegressor for outlier
        robustness. Use when reference instrument has known transient errors.
    humidity_col : str or None
        Name of the relative-humidity column in iot_df. Rows with RH above
        rh_threshold are excluded from training to avoid hygroscopic inflation.
    rh_threshold : float
        Exclusion ceiling for relative humidity during model training (%).
        Default 70 % follows EPA co-location guidance.
    min_pairs : int
        Minimum coincident hourly pairs required for a stable fit.

    Returns
    -------
    metrics : dict
        Keys: r2, rmse, mae, slope (m), intercept (b), n_pairs.
    model : fitted sklearn estimator
    """
    iot = iot_df.copy()
    ref = ref_df.copy()

    # Stage 1 — Enforce UTC and set DatetimeIndex
    iot["timestamp"] = pd.to_datetime(iot["timestamp"], utc=True)
    ref["timestamp"] = pd.to_datetime(ref["timestamp"], utc=True)
    iot = iot.set_index("timestamp")
    ref = ref.set_index("timestamp")

    # Stage 2 — Aggregate IoT to 1-hour means
    # Include humidity in aggregation so it is available for post-join filtering.
    agg_cols = ["raw_pm25"]
    if humidity_col and humidity_col in iot.columns:
        agg_cols.append(humidity_col)
    iot_hourly = iot[agg_cols].resample("1h").mean()

    # Stage 3 — Saturation filter before join (optical sensors saturate ~500 µg/m³)
    iot_valid = iot_hourly[iot_hourly["raw_pm25"].between(0.0, 450.0)]

    # Stage 4 — Strict inner join; drop any remaining NaN in both target columns
    paired = pd.concat(
        [iot_valid, ref[["ref_pm25"]].reindex(iot_valid.index)],
        axis=1,
        join="inner",
    ).dropna(subset=["raw_pm25", "ref_pm25"])

    if len(paired) < min_pairs:
        raise ValueError(
            f"Only {len(paired)} coincident hourly pairs found; "
            f"need at least {min_pairs} for a stable calibration."
        )

    # Optional: exclude high-humidity periods during training
    if humidity_col and humidity_col in paired.columns:
        paired = paired[paired[humidity_col] <= rh_threshold]
        if len(paired) < min_pairs:
            raise ValueError(
                f"After excluding RH > {rh_threshold} %, only {len(paired)} pairs remain. "
                "Widen the threshold, collect more data, or add humidity as a predictor."
            )

    X = paired[["raw_pm25"]].values  # shape (n, 1)
    y = paired["ref_pm25"].values    # shape (n,)

    # Stage 3 — Fit model
    if use_ransac:
        model = RANSACRegressor(
            estimator=LinearRegression(),
            min_samples=0.8,       # 80 % inlier consensus per iteration
            residual_threshold=5.0,  # ±5 µg/m³ inlier band
            random_state=42,
        )
    else:
        model = LinearRegression()

    model.fit(X, y)

    # Extract inner estimator for coefficient access when RANSAC is used
    inner: LinearRegression = getattr(model, "estimator_", model)

    y_pred = model.predict(X)
    metrics: Dict[str, Any] = {
        "r2":        float(r2_score(y, y_pred)),
        "rmse":      float(np.sqrt(mean_squared_error(y, y_pred))),
        "mae":       float(mean_absolute_error(y, y_pred)),
        "slope":     float(inner.coef_[0]),       # m in pm25_cal = m*raw + b
        "intercept": float(inner.intercept_),     # b
        "n_pairs":   len(paired),
    }
    return metrics, model


def apply_calibration_vectorized(
    raw_pm25: "np.ndarray | pd.Series",
    slope: float,
    intercept: float,
) -> np.ndarray:
    """
    Apply a linear calibration (pm25_cal = m * raw + b) without Python loops.

    Negative raw values (sensor noise around zero) are clamped to 0.0 after
    calibration to avoid reporting negative mass concentrations.
    """
    calibrated = np.asarray(raw_pm25, dtype=float) * slope + intercept
    return np.maximum(calibrated, 0.0)

Parameter Tuning Guide

The table below gives recommended starting values for calibrate_pm25 by sensor type and deployment context.

Sensor / Context rh_threshold min_pairs Expected slope range Retrain cadence
Plantower PMS5003 — urban rooftop 70 % 30 0.7 – 1.1 Monthly
Sensirion SPS30 — traffic kerbside 65 % 30 0.8 – 1.3 Monthly
Nova Fitness SDS011 — indoor 60 % 20 0.6 – 1.0 Quarterly
Any sensor — coastal / tropical 55 % 45 0.5 – 1.4 Bi-weekly
Any sensor — wildfire smoke season 70 % 60 0.4 – 1.6 Weekly

Key thresholds:

  • Reject slope outside 0.4 – 1.6 and intercept outside ±25 µg/m³. Values beyond these bounds indicate co-location failure, fouled inlet, or reference instrument drift — not a valid calibration.
  • R² ≥ 0.85 is the production acceptance threshold for urban exposure mapping. R² < 0.75 signals unresolved environmental bias or poor sensor placement.
  • RMSE ≤ 5 µg/m³ is the target for regulatory-adjacent monitoring networks.

Verification & Testing

Run this unit test against synthetic data to confirm the implementation behaves correctly before connecting to production telemetry.

import pandas as pd
import numpy as np
import pytest
from your_module import calibrate_pm25, apply_calibration_vectorized


def _make_synthetic(n_hours: int = 60, noise_std: float = 3.0, seed: int = 0):
    """Generate co-located sensor + reference pairs with known m=1.15, b=-4.0."""
    rng = np.random.default_rng(seed)
    timestamps = pd.date_range("2024-01-01", periods=n_hours, freq="1h", tz="UTC")
    ref_true = rng.uniform(5, 150, size=n_hours)
    raw = (ref_true + 4.0) / 1.15 + rng.normal(0, noise_std, size=n_hours)
    iot_df = pd.DataFrame({"timestamp": timestamps, "raw_pm25": raw})
    ref_df = pd.DataFrame({"timestamp": timestamps, "ref_pm25": ref_true})
    return iot_df, ref_df


def test_calibrate_recovers_known_coefficients():
    iot_df, ref_df = _make_synthetic(n_hours=120, noise_std=1.0)
    metrics, model = calibrate_pm25(iot_df, ref_df)
    # Fitted slope should be close to the true value of 1.15
    assert abs(metrics["slope"] - 1.15) < 0.05, f"slope={metrics['slope']}"
    assert abs(metrics["intercept"] - (-4.0)) < 1.5, f"intercept={metrics['intercept']}"
    assert metrics["r2"] >= 0.95


def test_apply_vectorized_clamps_negatives():
    raw = np.array([0.0, 10.0, 50.0, -2.0])
    result = apply_calibration_vectorized(raw, slope=0.9, intercept=-6.0)
    assert result[0] == 0.0          # 0 * 0.9 - 6 = -6 → clamped to 0
    assert result[3] == 0.0          # negative raw → negative calibrated → clamped
    assert abs(result[2] - 39.0) < 0.01  # 50 * 0.9 - 6 = 39


def test_raises_on_insufficient_pairs():
    iot_df, ref_df = _make_synthetic(n_hours=10)
    with pytest.raises(ValueError, match="coincident hourly pairs"):
        calibrate_pm25(iot_df, ref_df)

Gotchas

1. Aggregation direction matters — never interpolate upward. Interpolating 1-minute IoT data to match hourly reference timestamps invents measurement points and artificially inflates R². Always aggregate downward (resample("1h").mean()). The inner join then discards hours where either stream is absent, preserving measurement independence.

2. Humidity must be included in the aggregation step, not joined later. A common mistake is filtering by humidity after the join using a separate DataFrame. If relative_humidity is not part of agg_cols when calling .resample().mean(), it will not be in iot_hourly and the post-join filter silently passes every row. Include it in agg_cols before resampling.

3. RANSAC’s estimator_ attribute only exists after fitting. Accessing model.coef_ on a RANSACRegressor raises AttributeError. Always use getattr(model, "estimator_", model) to retrieve the inner LinearRegression for coefficient extraction — the pattern shown above works for both OLS and RANSAC without branching.

4. Rolling recalibration without a version tag corrupts audit trails. When automated retraining fires (e.g., 7-day MAE > 15 % of reference mean), write new coefficients tagged with sensor_id, firmware_version, training_window_start, training_window_end, and r2 before pushing to the edge. Without versioned metadata, a degraded coefficient set has no rollback path and downstream spatial models silently break.