Implementing Tumbling Windows for Air Quality Metrics
Tumbling windows partition a continuous IoT sensor stream into fixed, non-overlapping time intervals so that every reading belongs to exactly one aggregation bucket. For air quality pipelines this is the correct choice when you need regulatory-compliant 15-minute or 1-hour averages: each interval aligns deterministically with reporting schedules, eliminates double-counting across periods, and produces a single authoritative mean concentration value that feeds directly into EPA AQI conversion. In Python, the pattern is implemented via pd.Grouper with strict closed="left" boundaries or Polars group_by_dynamic, with spatial coordinates preserved alongside every aggregated record for downstream GIS joins.
Why Tumbling Windows — Not Rolling — for Regulatory Reporting
Windowed Aggregation for Time-Series covers three window shapes — tumbling, sliding, and session. Rolling windows are appropriate for smoothing sensor drift, but they create overlapping output records that cannot be submitted as distinct regulatory reporting periods. A 15-minute tumbling window starting at 08:00, 08:15, 08:30 generates three non-overlapping rows; a rolling window of the same width generates a row for every incoming reading and would report inflated sample counts to compliance systems.
The distinction matters most for PM2.5 and NO₂ because EPA breakpoints are defined against concentration averages over fixed intervals. Feeding a rolling average into the AQI formula produces a formally incorrect classification and will fail audit when compared against reference-method monitors.
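The difference in output shape between the two window types can be seen on a tiny synthetic series. This is a minimal sketch using pandas `resample` and `rolling`; the nine readings and their values are invented for illustration.

```python
import pandas as pd

# Nine synthetic PM2.5 readings at 5-minute spacing, starting at 08:00 UTC.
idx = pd.date_range("2024-05-01T08:00:00", periods=9, freq="5min", tz="UTC")
pm25 = pd.Series([10.0, 12.0, 14.0, 20.0, 22.0, 24.0, 30.0, 32.0, 34.0], index=idx)

# Tumbling: one row per 15-minute bucket, each reading counted exactly once.
tumbling = pm25.resample("15min", closed="left", label="left").mean()

# Rolling: one row per incoming reading, each averaging the trailing 15 minutes.
rolling = pm25.rolling("15min").mean()

print(len(tumbling), len(rolling))
```

The tumbling result has one row per reporting period (08:00, 08:15, 08:30), while the rolling result has as many rows as there are readings.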
Prerequisites
Before running this implementation, ensure your pipeline already handles timestamp alignment and timezone normalization — out-of-order or timezone-naïve timestamps will silently corrupt window boundaries. You also need:
- Python 3.11+
- pandas==2.2.x (pip install pandas==2.2.2)
- numpy==1.26.x (pip install numpy==1.26.4)
- A structured sensor payload with columns: timestamp (UTC, timezone-aware), sensor_id, lat, lon, pm25 (µg/m³), no2 (ppb)
If you are ingesting via MQTT, the MQTT broker integration cluster covers payload parsing and deserialization before you reach the windowing stage.
Step-by-Step Implementation
Step 1 — Ingest and Parse the Sensor Stream
In production, replace the synthetic data dict with a structured Kafka or MQTT consumer. The schema contract here — timezone-aware timestamps, float concentrations, and string sensor IDs — must be enforced at ingestion.
import numpy as np
import pandas as pd

# Simulated IoT air quality payload (replace with a Kafka or MQTT consumer in production).
# Each of the three sensors reports every 5 minutes, so a full 15-min window
# holds 3 readings per sensor.
timestamps = pd.date_range("2024-05-01T08:00:00", periods=40, freq="5min", tz="UTC")
data = {
    "timestamp": timestamps.repeat(3),  # one timestamp per sensor per 5-min tick
    "sensor_id": ["A1", "A2", "B1"] * 40,
    "lat": [34.0522, 34.0525, 34.0498] * 40,
    "lon": [-118.2437, -118.2440, -118.2415] * 40,
    "pm25": np.random.uniform(5, 45, 120),  # µg/m³
    "no2": np.random.uniform(10, 80, 120),  # ppb
}
df = pd.DataFrame(data)
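The schema contract described above could be enforced with a small validation step before windowing. The following is a sketch, not part of the original pipeline: the function name `validate_sensor_frame` and the `REQUIRED_COLUMNS` list are hypothetical, and it assumes the payload has already been loaded into a DataFrame.

```python
import pandas as pd

# Hypothetical column contract, taken from the prerequisites list.
REQUIRED_COLUMNS = ["timestamp", "sensor_id", "lat", "lon", "pm25", "no2"]


def validate_sensor_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with UTC timestamps, float measurements, and string IDs."""
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")

    # Reject timezone-naive (or non-datetime) timestamps instead of guessing a zone.
    if not isinstance(df["timestamp"].dtype, pd.DatetimeTZDtype):
        raise ValueError("timestamp must be timezone-aware")

    out = df.copy()
    out["timestamp"] = out["timestamp"].dt.tz_convert("UTC")
    out["sensor_id"] = out["sensor_id"].astype(str)
    for col in ("lat", "lon", "pm25", "no2"):
        out[col] = out[col].astype("float64")

    # Sort so that out-of-order arrivals cannot affect later steps.
    return out.sort_values(["sensor_id", "timestamp"]).reset_index(drop=True)
```

Calling `df = validate_sensor_frame(df)` right after ingestion fails fast on timezone-naive data rather than letting it reach the grouping step.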
Step 2 — Apply Tumbling Windows with pd.Grouper
pd.Grouper with freq, closed="left", and label="left" enforces strict non-overlapping boundaries. The label="left" setting names each window by its start timestamp, which maps directly to regulatory period notation.
WINDOW_SIZE = "15min"
MIN_READINGS_PER_WINDOW = 3  # 3 × 5-min readings expected per 15-min window

windowed = (
    df.groupby(
        [
            "sensor_id",
            pd.Grouper(key="timestamp", freq=WINDOW_SIZE, closed="left", label="left"),
        ]
    )
    .agg(
        pm25_mean=("pm25", "mean"),
        pm25_max=("pm25", "max"),
        no2_mean=("no2", "mean"),
        lat=("lat", "first"),  # fixed station: use "mean" for mobile sensors
        lon=("lon", "first"),
        reading_count=("pm25", "count"),
    )
    .reset_index()
    .rename(columns={"timestamp": "window_start"})
)
Step 3 — Filter Incomplete Windows
.dropna() is insufficient for regulatory compliance. Count readings per window and discard any interval that received fewer than expected — this guards against sensor dropouts without silently inflating incomplete means.
windowed = (
    windowed[windowed["reading_count"] >= MIN_READINGS_PER_WINDOW]
    .drop(columns="reading_count")
    .copy()
)
Step 4 — Vectorized AQI Conversion
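Row-wise .apply() is too slow at production scale (millions of windowed records per day). Use np.select to apply all breakpoint conditions in a single pass over the NumPy array. The breakpoints below reflect the EPA’s 2024 PM2.5 revision (effective May 2024). Only the four ranges up to 125.4 µg/m³ are tabulated here; higher concentrations fall through to a fixed sentinel value, so add the upper EPA ranges if you need computed values for extreme events.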
def vectorized_pm25_aqi(pm25_vals: np.ndarray) -> np.ndarray:
    """
    Compute PM2.5 AQI via EPA linear interpolation across breakpoints.

    Args:
        pm25_vals: 1-D array of window-averaged PM2.5 concentrations (µg/m³).
            EPA defines the PM2.5 AQI on 24-hr averages, so values computed
            from shorter windows are indicative only.
    Returns:
        Rounded AQI values as a float64 array (cast to int before display).
    """
    # (c_low, c_high, aqi_low, aqi_high): EPA 2024 PM2.5 breakpoints up to "Unhealthy"
    bp = [
        (0.0, 9.0, 0, 50),
        (9.1, 35.4, 51, 100),
        (35.5, 55.4, 101, 150),
        (55.5, 125.4, 151, 200),
    ]
    # Conditions are evaluated in order; np.select uses the first one that matches.
    conditions = [
        pm25_vals <= bp[0][1],
        (pm25_vals > bp[0][1]) & (pm25_vals <= bp[1][1]),
        (pm25_vals > bp[1][1]) & (pm25_vals <= bp[2][1]),
        (pm25_vals > bp[2][1]) & (pm25_vals <= bp[3][1]),
    ]
    # Linear interpolation within each breakpoint range.
    choices = [
        np.round(((b[3] - b[2]) / (b[1] - b[0])) * (pm25_vals - b[0]) + b[2])
        for b in bp
    ]
    # 300.0 is a sentinel for concentrations above 125.4 µg/m³, not a computed AQI.
    # AQI 300 is the top of the Very Unhealthy band; Hazardous starts at 301.
    return np.select(conditions, choices, default=300.0)


windowed["pm25_aqi"] = vectorized_pm25_aqi(windowed["pm25_mean"].values).astype(int)
Step 5 — Validate and Route Downstream
Print a sample and confirm output shape before routing to storage or alerting systems.
print(windowed.dtypes)
print(windowed.head(6))
# Route to PostGIS, Parquet, or alerting sink
# windowed.to_parquet("air_quality_windows.parquet", index=False)
Expected output columns: sensor_id, window_start, pm25_mean, pm25_max, no2_mean, lat, lon, pm25_aqi.
Parameter Tuning by Sensor Type
Regulatory interval requirements and sensor sampling rates vary. Use the table below to set WINDOW_SIZE and MIN_READINGS_PER_WINDOW for common deployments:
| Pollutant / sensor | Sampling rate | Recommended window | Min readings | Regulatory basis |
|---|---|---|---|---|
| PM2.5 (low-cost optical) | 1 Hz | 15 min | 600 | EPA real-time alerting |
| PM2.5 (FEM reference) | 1/min | 1 hour | 45 | EPA NAAQS 24-hr average |
| NO₂ (electrochemical) | 1/30 s | 1 hour | 100 | EPA 1-hr NAAQS |
| Dissolved oxygen (water) | 1/15 s | 1 hour | 200 | Clean Water Act monitoring |
| Temperature / humidity | 1/min | 5 min | 4 | Site-specific SOP |
| CO₂ (NDIR) | 1 Hz | 10 min | 500 | Indoor air quality standards |
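For mobile sensors (vehicle-mounted, drone), halve MIN_READINGS_PER_WINDOW to account for data loss during transmission gaps, and switch the coordinate aggregations from lat=("lat", "first") and lon=("lon", "first") to "mean" so that each window reports the centroid of the positions sampled during it.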
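A threshold for a deployment not listed in the table can be derived from the window length, the sampling interval, and a completeness fraction. The helper below is a hypothetical sketch: `min_readings` and its 0.75 default are assumptions made for illustration, and the table values above do not all follow a single fraction.

```python
import math

import pandas as pd


def min_readings(window: str, sampling_interval: str, completeness: float = 0.75) -> int:
    """Smallest reading count that covers `completeness` of a full window."""
    expected = pd.Timedelta(window) / pd.Timedelta(sampling_interval)
    # Subtract a tiny epsilon so float noise cannot push an exact value up by one.
    return math.ceil(expected * completeness - 1e-9)


# 5-min readings in a 15-min window: 3 expected, so all 3 are required at 75 %.
print(min_readings("15min", "5min"))
# 1-min readings in a 1-hour window: 60 expected, 45 required at 75 %.
print(min_readings("1h", "1min"))
```

For mobile sensors, passing a lower completeness value has the same effect as halving the threshold by hand.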
Verification and Testing
Run a unit test against a known synthetic dataset to confirm boundary alignment and AQI correctness:
import pandas as pd
import pytest


def test_tumbling_window_boundaries():
    rng = pd.date_range("2024-01-01T00:00:00", periods=6, freq="5min", tz="UTC")
    test_df = pd.DataFrame({
        "timestamp": rng,
        "sensor_id": ["S1"] * 6,
        "lat": [34.0] * 6,
        "lon": [-118.0] * 6,
        "pm25": [10.0, 10.0, 10.0, 20.0, 20.0, 20.0],
        "no2": [30.0] * 6,
    })
    result = (
        test_df.groupby(
            ["sensor_id", pd.Grouper(key="timestamp", freq="15min", closed="left", label="left")]
        )
        .agg(pm25_mean=("pm25", "mean"), reading_count=("pm25", "count"))
        .reset_index()
    )
    assert len(result) == 2, "Expected 2 windows from 6 readings at 5-min intervals"
    assert result.iloc[0]["pm25_mean"] == pytest.approx(10.0), "W1 mean should be 10.0"
    assert result.iloc[1]["pm25_mean"] == pytest.approx(20.0), "W2 mean should be 20.0"
    # Verify no reading crosses the 00:15 boundary
    assert (result["reading_count"] == 3).all(), "Each window must have exactly 3 readings"
Cross-validate aggregated means against raw readings for a single sensor manually. If pm25_aqi values seem high, confirm your breakpoint table matches the jurisdiction — WHO guidelines use different thresholds from EPA.
Gotchas
Timezone-naïve timestamps silently split windows. If df["timestamp"] has no timezone (tz=None), daylight saving transitions in local time will generate a spurious window at the clock change boundary. Always ingest with tz="UTC" and convert to local time only for display, not for grouping. The timestamp alignment and timezone normalization guide covers enforcement at the ingestion layer.
closed="right" with label="right" is a common misconfiguration. This names each window by its end timestamp, which looks correct in output but shifts the boundary by one reading, causing the first sample of each period to land in the previous window. Regulatory systems will reject submissions where window labels do not match the interval start.
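The boundary shift is easy to see by counting readings per window under both settings. This is a small illustration on six synthetic readings, using `Series.resample` for brevity; `pd.Grouper` accepts the same `closed` and `label` arguments.

```python
import pandas as pd

# Six synthetic readings at 00:00, 00:05, ..., 00:25 UTC.
idx = pd.date_range("2024-01-01T00:00:00", periods=6, freq="5min", tz="UTC")
readings = pd.Series(1, index=idx)

# Intervals [start, end), labelled by their start.
left = readings.resample("15min", closed="left", label="left").count()

# Intervals (start, end], labelled by their end.
right = readings.resample("15min", closed="right", label="right").count()

print(left)
print(right)
```

With closed="left" the six readings split evenly into two windows of three. With closed="right" the 00:00 reading falls into the interval ending at 00:00, so the counts become 1, 3, and 2 across three windows.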
Dropping trailing partial windows only by count can mask stalled sensors. A sensor that emits three readings within one minute and then goes silent passes the reading_count >= 3 filter for that window, yet the resulting mean represents about one minute of data, not 15. If data quality is critical, add a time-span check: take the earliest and latest reading timestamps within each group and require that their difference is at least the window length minus one sampling interval (10 minutes for 5-minute readings in a 15-minute window).
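One way to sketch such a span check is to carry the first and last reading time through the aggregation. The names `reading_time`, `first_reading`, `last_reading`, and `MIN_SPAN` below are hypothetical, and the 5-minute sampling interval is an assumption matching the synthetic data used earlier.

```python
import pandas as pd

WINDOW = pd.Timedelta("15min")
SAMPLING_INTERVAL = pd.Timedelta("5min")  # assumed sensor cadence
MIN_SPAN = WINDOW - SAMPLING_INTERVAL     # 10 minutes between first and last reading

base = pd.Timestamp("2024-01-01T00:00:00", tz="UTC")
df = pd.DataFrame({
    # S1 reports on schedule; S2 bursts three readings in 40 seconds, then stalls.
    "timestamp": [base, base + pd.Timedelta("5min"), base + pd.Timedelta("10min"),
                  base, base + pd.Timedelta("20s"), base + pd.Timedelta("40s")],
    "sensor_id": ["S1"] * 3 + ["S2"] * 3,
    "pm25": [10.0, 11.0, 12.0, 30.0, 30.0, 30.0],
})

# Copy the timestamp so it can be aggregated independently of the grouping key.
df = df.assign(reading_time=df["timestamp"])

windows = (
    df.groupby(["sensor_id", pd.Grouper(key="timestamp", freq="15min",
                                        closed="left", label="left")])
    .agg(
        pm25_mean=("pm25", "mean"),
        reading_count=("pm25", "count"),
        first_reading=("reading_time", "min"),
        last_reading=("reading_time", "max"),
    )
    .reset_index()
    .rename(columns={"timestamp": "window_start"})
)

# Keep windows that pass both the count check and the span check.
span = windows["last_reading"] - windows["first_reading"]
valid = windows[(windows["reading_count"] >= 3) & (span >= MIN_SPAN)]
print(valid[["sensor_id", "window_start", "pm25_mean"]])
```

Both sensors pass the count filter here, but only S1 survives the span check.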
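np.select default must be set explicitly for ultra-high concentrations. If a wildfire event pushes PM2.5 above 125.4 µg/m³, none of the four breakpoint conditions match. Without an explicit default, np.select returns 0, which would classify a dangerous reading as Good. The default=300.0 used here is a sentinel, not a computed value: AQI 300 is the top of the Very Unhealthy band and Hazardous begins at 301, so add the higher EPA breakpoint ranges if you need accurate values during extreme events.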
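The fallback behaviour can be checked in isolation. The snippet below is a reduced illustration with a single breakpoint range; using NaN as the explicit default is one possible alternative to a numeric sentinel, not something the pipeline above does.

```python
import numpy as np

pm25 = np.array([5.0, 200.0])  # second value is above every tabulated range

conditions = [pm25 <= 9.0]
choices = [np.round((50 / 9.0) * pm25)]

# No default given: unmatched elements silently become 0.
implicit = np.select(conditions, choices)

# Explicit default: unmatched elements are visibly marked.
explicit = np.select(conditions, choices, default=np.nan)

print(implicit, explicit)
```

A NaN default makes unmatched values fail loudly on an integer cast, which some teams may prefer over a plausible-looking number.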
Related
- Parent: Windowed Aggregation for Time-Series — tumbling, sliding, and session windows in a single cluster reference
- Real-Time Stream Processing & Spatial Analytics — the broader pipeline this page feeds into
- Timestamp Alignment & Timezone Normalization — prerequisite step before windowing
- Managing Python Memory Limits for Continuous Sensor Streams — scaling aggregation when window state grows large