
Bronze → Silver ETL Workflow

Sprint 2 — Schema creation, sensor flattening, MySQL dimension import. 15M+ rows ingested into PostgreSQL with watermark-based incremental processing.

01 Schema creation: create_silver.py (done)

etl/bronze_to_silver/ — run once per environment

1. If DB_ADMIN_URL is set, connects as the admin user and checks whether the target database exists. If it does not, the script creates it and grants privileges to the app user.
2. Connects to the target database and runs CREATE TABLE IF NOT EXISTS for every Silver table, so the script is safe to re-run.
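Re-run safety in step 2 comes from IF NOT EXISTS: running the same DDL a second time is a no-op. The sketch below demonstrates that pattern under stated assumptions. It uses Python's built-in sqlite3 as a stand-in for PostgreSQL, and the column lists are simplified and hypothetical, because the real DDL lives in create_silver.py and is not reproduced here.

```python
import sqlite3

# Hypothetical, simplified DDL (SQLite flavour); the real PostgreSQL schema
# is defined in create_silver.py.
SILVER_DDL = [
    """CREATE TABLE IF NOT EXISTS etl_watermark (
           filename     TEXT PRIMARY KEY,
           processed_at TEXT
       )""",
    """CREATE TABLE IF NOT EXISTS sensor_events (
           apartment   TEXT,
           room        TEXT,
           sensor_type TEXT,
           field       TEXT,
           value       REAL,
           unit        TEXT,
           timestamp   TEXT,
           is_outlier  INTEGER,
           UNIQUE (apartment, room, sensor_type, field, timestamp)
       )""",
]


def create_silver(conn: sqlite3.Connection) -> None:
    """Run every CREATE TABLE IF NOT EXISTS statement; safe to call repeatedly."""
    with conn:  # commits on success, rolls back on error
        for statement in SILVER_DDL:
            conn.execute(statement)


def table_names(conn: sqlite3.Connection) -> list[str]:
    """List user tables, sorted by name."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_silver(conn)
    create_silver(conn)  # second run neither fails nor duplicates anything
    print(table_names(conn))
```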

tables created

table                purpose                                  size
sensor_events        Flattened sensor readings                15M+ rows
weather_clean        Standardized weather forecasts           ~70 days of data
apartment_metadata   Building/room/sensor reference data      small
di_errors_clean      Sensor error logs                        varies
etl_watermark        Tracks which files have been processed   varies

Config: DB_URL (app user connection) · DB_ADMIN_URL (admin connection for database creation)

02 Sensor flattening: flatten_sensors.py (done)

etl/bronze_to_silver/ — Bronze JSON → silver.sensor_events

1. Load watermark: reads silver.etl_watermark to find out which files have already been processed.
2. Find new files (newest first): walks the Bronze folders backwards in time and stops after 50 consecutive already-processed files.
3. Parallel processing: splits the new files into batches of 2,000, which are processed by 8 workers (ProcessPoolExecutor).
4. In each worker: opens the JSON file, parses its timestamp, and calls flatten() to extract all sensor readings.
5. Upsert: inserts into silver.sensor_events with ON CONFLICT DO UPDATE, deduplicating on the unique constraint.
6. Mark done: adds the processed filenames to the watermark table.
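Steps 2 and 3 can be sketched in plain Python as follows. This is an illustration under assumptions, not the code in flatten_sensors.py: the watermark is modelled as an in-memory set of filenames, the Bronze listing as an iterable that is already sorted newest first, and the names find_new_files and batches are hypothetical.

```python
from collections.abc import Iterable, Iterator, Sequence
from itertools import islice


def find_new_files(
    files_newest_first: Iterable[str],
    processed: set[str],
    stop_after: int = 50,
) -> list[str]:
    """Collect unprocessed files while walking from newest to oldest.

    Stops once `stop_after` consecutive files are already in the watermark,
    on the assumption that everything older was handled by an earlier run.
    """
    new_files: list[str] = []
    consecutive_seen = 0
    for name in files_newest_first:
        if name in processed:
            consecutive_seen += 1
            if consecutive_seen >= stop_after:
                break
        else:
            consecutive_seen = 0  # an unprocessed file resets the streak
            new_files.append(name)
    return new_files


def batches(items: Sequence[str], size: int = 2000) -> Iterator[list[str]]:
    """Yield consecutive chunks of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk
```

Each chunk could then be handed to `concurrent.futures.ProcessPoolExecutor(max_workers=8)` through `executor.map(process_batch, batches(new_files))`, where `process_batch` is a hypothetical worker that opens, parses, and flattens its files. Requiring a streak of processed files, rather than stopping at the first one, presumably tolerates files that land in Bronze slightly out of order.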

sensor extraction (6 categories)

JSON key       sensor_type   fields extracted
plugs          plug          power (W), total (Wh), temperature (°C)
doorsWindows   door/window   open (bool), battery (%)
motions        motion        motion (bool), light (lux), temperature (°C)
meteos.meteo   meteo         temperature_c, co2_ppm, humidity_pct, noise_db, pressure_hpa, battery
humidities     humidity      temperature (°C), humidity (%), battery (%)
consumptions   consumption   total_power, power1-3 (W), current1-3 (A), voltage1-3 (V)
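The Bronze JSON layout is not documented on this page. The sketch below therefore assumes a hypothetical shape in which each category key (or dotted path, such as meteos.meteo) maps room names to a dict of readings. Under that assumption, it illustrates the long-format idea behind flatten(): one output row per (room, sensor_type, field) reading. CATEGORIES, get_path, and the field/unit mapping for three of the six categories are illustrative assumptions, not the real flatten_sensors.py code. Room names are left raw here, because normalization is a separate step.

```python
from datetime import datetime
from typing import Any, Optional

FieldMap = dict[str, tuple[str, Optional[str]]]

# Hypothetical mapping: JSON key path -> (sensor_type, {raw key: (field, unit)}).
# Only three of the six categories are shown.
CATEGORIES: dict[str, tuple[str, FieldMap]] = {
    "plugs": ("plug", {
        "power": ("power", "W"),
        "total": ("total", "Wh"),
        "temperature": ("temperature_c", "°C"),
    }),
    "doorsWindows": ("door/window", {
        "open": ("open", None),
        "battery": ("battery", "%"),
    }),
    "meteos.meteo": ("meteo", {
        "temperature_c": ("temperature_c", "°C"),
        "co2_ppm": ("co2_ppm", "ppm"),
        "humidity_pct": ("humidity_pct", "%"),
    }),
}


def get_path(doc: dict[str, Any], dotted: str) -> Any:
    """Follow a dotted key path such as 'meteos.meteo'; None if any key is missing."""
    node: Any = doc
    for key in dotted.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def flatten(doc: dict[str, Any], apartment: str, ts: datetime) -> list[dict[str, Any]]:
    """Turn one (hypothetically shaped) Bronze document into long-format rows."""
    rows = []
    for path, (sensor_type, fields) in CATEGORIES.items():
        by_room = get_path(doc, path) or {}
        for room, readings in by_room.items():
            for raw_key, (field, unit) in fields.items():
                if readings.get(raw_key) is None:
                    continue  # missing or null reading: emit no row
                rows.append({
                    "apartment": apartment,
                    "room": room,
                    "sensor_type": sensor_type,
                    "field": field,
                    "value": float(readings[raw_key]),  # booleans become 1.0 / 0.0
                    "unit": unit,
                    "timestamp": ts,
                })
    return rows
```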

Room normalization: raw names such as Bhroom, Bdroom and Livingroom are mapped to Bathroom, Bedroom and Living Room.

Outlier flagging: values outside the bounds below are flagged with is_outlier = TRUE but are NOT removed.

outlier bounds

field           min     max
temperature_c   -20     60
humidity_pct    0       100
co2_ppm         300     5,000
noise_db        0       140
pressure_hpa    870     1,085
power           0       10,000
battery         0       100
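A minimal sketch of room normalization and outlier flagging, using only the three room mappings named in the text and the bounds from the table above. It assumes that bounds are inclusive at both ends, that unknown room names pass through unchanged, and that fields without bounds are never flagged. The real lookup in flatten_sensors.py may be longer or behave differently.

```python
from typing import Optional

# The three examples given in the text; the real mapping is likely longer.
ROOM_NAMES = {
    "Bhroom": "Bathroom",
    "Bdroom": "Bedroom",
    "Livingroom": "Living Room",
}

# Bounds from the outlier table, assumed inclusive on both ends.
OUTLIER_BOUNDS: dict[str, tuple[float, float]] = {
    "temperature_c": (-20, 60),
    "humidity_pct": (0, 100),
    "co2_ppm": (300, 5000),
    "noise_db": (0, 140),
    "pressure_hpa": (870, 1085),
    "power": (0, 10000),
    "battery": (0, 100),
}


def normalize_room(raw: str) -> str:
    """Map a raw room name to its display name; unknown names pass through."""
    return ROOM_NAMES.get(raw, raw)


def is_outlier(field: str, value: Optional[float]) -> bool:
    """True when a bounded field is outside its range; the row is kept either way."""
    bounds = OUTLIER_BOUNDS.get(field)
    if bounds is None or value is None:
        return False
    low, high = bounds
    return not (low <= value <= high)
```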

output schema: silver.sensor_events

column        type          description
id            BIGSERIAL     PRIMARY KEY
apartment     VARCHAR(20)   'jimmy' or 'jeremie'
room          VARCHAR(50)   'Living Room', 'Office', etc.
sensor_type   VARCHAR(20)   'plug', 'motion', 'meteo', etc.
field         VARCHAR(50)   'power', 'temperature_c', 'co2_ppm', etc.
value         FLOAT         the measured value
unit          VARCHAR(10)   'W', '°C', 'ppm', etc.
timestamp     TIMESTAMPTZ   when the reading was taken
is_outlier    BOOLEAN       TRUE if the value is outside the outlier bounds

Unique constraint: UNIQUE (apartment, room, sensor_type, field, timestamp)
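The upsert in step 5 relies on this unique constraint as its conflict target. The sketch below runs the same statement shape against SQLite, which has supported `INSERT ... ON CONFLICT (...) DO UPDATE` since version 3.24. Under PostgreSQL with psycopg2, the placeholders would be `%s` instead of `?`. The choice of columns to overwrite (value, unit, is_outlier) is an assumption; flatten_sensors.py may update a different set.

```python
import sqlite3

# SQLite stand-in for silver.sensor_events (no BIGSERIAL id column here).
SENSOR_EVENTS_DDL = """
CREATE TABLE IF NOT EXISTS sensor_events (
    apartment TEXT, room TEXT, sensor_type TEXT, field TEXT,
    value REAL, unit TEXT, timestamp TEXT, is_outlier INTEGER,
    UNIQUE (apartment, room, sensor_type, field, timestamp)
)
"""

UPSERT_SQL = """
INSERT INTO sensor_events
    (apartment, room, sensor_type, field, value, unit, timestamp, is_outlier)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (apartment, room, sensor_type, field, timestamp)
DO UPDATE SET
    value = excluded.value,
    unit = excluded.unit,
    is_outlier = excluded.is_outlier
"""


def upsert_events(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Insert rows, overwriting any existing row that has the same natural key."""
    with conn:  # one transaction per call
        conn.executemany(UPSERT_SQL, rows)
```

On PostgreSQL, a single multi-row `INSERT ... ON CONFLICT DO UPDATE` fails with "ON CONFLICT DO UPDATE command cannot affect row a second time" if the same key appears twice within one statement. Batched inserts should therefore be deduplicated before they are sent.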

performance

scenario                      files     rows       time
First run (full backfill)     245,000   15M+       ~3.5 hours
Incremental (after watcher)   2–20      100–1,500  2–4 seconds
Nothing new                   0         0          1–2 seconds
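As a rough sanity check derived only from the first-run row above, where both the row count and the duration are approximate and 15M is a lower bound:

```latex
\begin{aligned}
t &\approx 3.5\ \text{h} = 12\,600\ \text{s} \\
\text{files per second} &\approx 245\,000 / 12\,600 \approx 19 \\
\text{rows per second} &\gtrsim 15\,000\,000 / 12\,600 \approx 1\,190 \\
\text{rows per file} &\gtrsim 15\,000\,000 / 245\,000 \approx 61
\end{aligned}
```

If the endpoints of the incremental row correspond (2 files for 100 rows, 20 files for 1,500), they give 50 to 75 rows per file, which is in the same range.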

03 MySQL dimension import: import_mysql_to_silver.py (done)

etl/bronze_to_silver/ — MySQL (pidb) → Silver dimension tables

1. Connects to both MySQL (source: pidb @ 10.130.25.152:3306) and PostgreSQL (target).
2. For each table in the mapping, reads all rows from MySQL.
3. Drops and recreates the Silver table with every column typed as TEXT, so that values of any MySQL type can be inserted without conversion errors.
4. Inserts all rows.

tables imported (10)

MySQL              Silver                description                                   rows
buildings          dim_buildings         Apartment metadata, location, building year   small
buildingtype       dim_building_types    Maison / Appartement lookup                   small
rooms              dim_rooms             Room details, sensor counts, orientation, m²  ~20
sensors            dim_sensors           Sensor IPs mapped to rooms                    ~30
devices            dim_devices           Appliances per room (fridge, washer, etc.)    ~40
profilereference   ref_energy_profiles   Reference energy consumption kWh/yr by type   small
profile            ref_power_snapshots   Power consumption snapshots over time         varies
parameters         ref_parameters        Threshold configs per building                small
parameterstype     ref_parameters_type   Parameter type lookup                         small
dierrors           log_sensor_errors     Sensor error logs (null values, failures)     varies
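The loop in steps 2 to 4 can be sketched as a generic full-refresh copy between two DB-API 2.0 connections. The sketch assumes that values are converted with `str()` (NULL stays NULL). It also assumes that table names come only from the fixed mapping and are therefore interpolated directly. The placeholder style is a parameter because psycopg2 uses `%s` and sqlite3 uses `?`. copy_table is a hypothetical name, and the real script's conversion rules may differ (for example, for binary columns).

```python
import sqlite3
from typing import Any

# MySQL -> Silver mapping from the table above; unlisted tables are skipped.
TABLE_MAPPING = {
    "buildings": "dim_buildings",
    "buildingtype": "dim_building_types",
    "rooms": "dim_rooms",
    "sensors": "dim_sensors",
    "devices": "dim_devices",
    "profilereference": "ref_energy_profiles",
    "profile": "ref_power_snapshots",
    "parameters": "ref_parameters",
    "parameterstype": "ref_parameters_type",
    "dierrors": "log_sensor_errors",
}


def copy_table(src_cur: Any, src_table: str, dst_conn: Any, dst_table: str,
               placeholder: str = "%s") -> int:
    """Full refresh of one table: drop, recreate with TEXT columns, insert all rows."""
    src_cur.execute(f"SELECT * FROM {src_table}")
    columns = [desc[0] for desc in src_cur.description]
    rows = [
        tuple(None if v is None else str(v) for v in row)  # everything becomes TEXT
        for row in src_cur.fetchall()
    ]
    col_defs = ", ".join(f'"{c}" TEXT' for c in columns)
    col_list = ", ".join(f'"{c}"' for c in columns)
    marks = ", ".join([placeholder] * len(columns))

    dst_cur = dst_conn.cursor()
    dst_cur.execute(f'DROP TABLE IF EXISTS "{dst_table}"')
    dst_cur.execute(f'CREATE TABLE "{dst_table}" ({col_defs})')
    dst_cur.executemany(f'INSERT INTO "{dst_table}" ({col_list}) VALUES ({marks})', rows)
    dst_conn.commit()
    return len(rows)


if __name__ == "__main__":
    # Demo with two in-memory SQLite databases standing in for MySQL and PostgreSQL.
    src = sqlite3.connect(":memory:")
    src.execute("CREATE TABLE rooms (id INTEGER, name TEXT)")
    src.execute("INSERT INTO rooms VALUES (1, 'Office')")
    dst = sqlite3.connect(":memory:")
    print(copy_table(src.cursor(), "rooms", dst, TABLE_MAPPING["rooms"], placeholder="?"))
```

With psycopg2, the drop, create, and insert all run inside one transaction that is committed at the end. Because DDL is transactional in PostgreSQL, a failed insert would also roll back the drop.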

skipped tables

MySQL table                           reason
users                                 Personal data under GDPR (names, emails, passwords, phone numbers)
actions, achievements, badges         Gamification, not relevant for analytics
events, eventsgeneric, eventsignore   App-generated, not raw sensor data
categories                            Only useful together with the events table
userrelationships                     App configuration

04 Weather cleaning: clean_weather.py (in review)

etl/bronze_to_silver/ — Bronze weather CSV → silver.weather_clean

1. Validates that the CSV has the expected columns (Time, Value, Prediction, Site, Measurement, Unit).
2. Parses timestamps as UTC and drops rows dated before WEATHER_MIN_YEAR.
3. Normalizes site names, converts Prediction to numeric, and removes duplicates.
4. Maps 4 measurements: PRED_T_2M_ctrl → temperature_c, PRED_RELHUM_2M_ctrl → humidity_pct, PRED_TOT_PREC_ctrl → precipitation_mm, PRED_GLOB_ctrl → radiation_wm2.
5. Replaces the sentinel value -99999.0 with NULL and pivots from long to wide format.
6. Flags outliers: temperature_c outside -50..60, humidity_pct outside 0..100, precipitation_mm outside 0..500, radiation_wm2 outside 0..1,500.
7. Upserts into silver.weather_clean with ON CONFLICT (timestamp, site).
8. Tracks processed files in silver.weather_watermark and skips files that were already processed.
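Steps 1 to 6 can be sketched with the standard library alone. This is a hedged illustration, not clean_weather.py. It assumes:

- the CSV is already in memory and Time is ISO 8601;
- Prediction holds the forecast value;
- site normalization is just whitespace stripping (hypothetical);
- duplicates resolve last-wins;
- a single is_outlier flag marks a wide row when any field is out of bounds.

The database write (step 7) and watermarking (step 8) are omitted.

```python
import csv
import io
import math
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Optional

REQUIRED_COLUMNS = {"Time", "Value", "Prediction", "Site", "Measurement", "Unit"}
MEASUREMENTS = {
    "PRED_T_2M_ctrl": "temperature_c",
    "PRED_RELHUM_2M_ctrl": "humidity_pct",
    "PRED_TOT_PREC_ctrl": "precipitation_mm",
    "PRED_GLOB_ctrl": "radiation_wm2",
}
BOUNDS = {
    "temperature_c": (-50, 60),
    "humidity_pct": (0, 100),
    "precipitation_mm": (0, 500),
    "radiation_wm2": (0, 1500),
}
SENTINEL = -99999.0


def to_number(text: str) -> Optional[float]:
    """Coerce to float; unparseable values, NaN and the sentinel become None (NULL)."""
    try:
        value = float(text)
    except (TypeError, ValueError):
        return None
    return None if math.isnan(value) or value == SENTINEL else value


def clean_weather(csv_text: str, min_year: int) -> list[dict[str, Any]]:
    reader = csv.DictReader(io.StringIO(csv_text))
    missing = REQUIRED_COLUMNS - set(reader.fieldnames or [])
    if missing:
        raise ValueError(f"missing columns: {sorted(missing)}")

    # (timestamp, site) -> {field: value}; a repeated key overwrites (last wins).
    wide: dict[tuple[datetime, str], dict[str, Optional[float]]] = defaultdict(dict)
    for rec in reader:
        field = MEASUREMENTS.get(rec["Measurement"])
        if field is None:
            continue  # not one of the four mapped measurements
        ts = datetime.fromisoformat(rec["Time"])
        ts = ts.replace(tzinfo=timezone.utc) if ts.tzinfo is None else ts.astimezone(timezone.utc)
        if ts.year < min_year:
            continue
        site = rec["Site"].strip()  # hypothetical normalization
        wide[(ts, site)][field] = to_number(rec["Prediction"])

    rows = []
    for (ts, site), values in sorted(wide.items()):
        row: dict[str, Any] = {"timestamp": ts, "site": site, "is_outlier": False}
        for field, (low, high) in BOUNDS.items():
            value = values.get(field)
            row[field] = value  # fields absent for this key stay None
            if value is not None and not (low <= value <= high):
                row["is_outlier"] = True
        rows.append(row)
    return rows
```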

Usage: python etl/bronze_to_silver/clean_weather.py

05 Silver layer: output (15M+ rows)

silver.sensor_events

15M+ rows of full-resolution sensor data

Key columns: apartment, room, sensor_type, field, value, timestamp

silver.dim_* / ref_* / log_*

10 dimension tables imported from MySQL, plus weather data

Covers buildings, rooms, sensors, devices, parameters and sensor errors

First run: create_silver.py → import_mysql_to_silver.py → bulk_to_bronze.py --full → flatten_sensors.py

Ongoing: watcher.py (leave it running; it handles both ingestion and flattening)
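To script the first-run order, one option is a small runner that executes each step in sequence and stops at the first failure. The sketch assumes that every script can be invoked as `python <script>` from the current working directory. Actual paths differ per script (bulk_to_bronze.py's location is not given on this page) and would need adjusting. first_run_commands and run_first_run are hypothetical helpers.

```python
import subprocess
import sys

# Step order from the "First run" line; script paths are assumptions.
FIRST_RUN = [
    ["create_silver.py"],
    ["import_mysql_to_silver.py"],
    ["bulk_to_bronze.py", "--full"],
    ["flatten_sensors.py"],
]


def first_run_commands(python: str = sys.executable) -> list[list[str]]:
    """Build the command line for each first-run step, in order."""
    return [[python, *step] for step in FIRST_RUN]


def run_first_run(dry_run: bool = False) -> list[list[str]]:
    """Run each step in order; check=True raises CalledProcessError on failure."""
    commands = first_run_commands()
    for cmd in commands:
        print(" ".join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)
    return commands
```

After the first run completes, watcher.py would be started separately and left running.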