
Sources → Bronze Ingestion

Sprint 2: continuous ingestion from an SMB share and an sFTP server into local Bronze storage, using prediction-based file discovery over 245k+ files.

01 Data sources (external)

SMB share (Z:\)

~245,000+ JSON sensor files, 2 apartments

sFTP / Meteo2

~70 weather CSV files (daily, Aug–Oct 2023)

02 Orchestrator: watcher.py (done)

ingestion/fast_flow/watcher.py: continuous loop that checks for new files every 60 seconds

1. Find baseline: walk to the newest YYYY/MM/DD/HH folder in Bronze and pick the latest file by parsed datetime, not alphabetically (the DD.MM.YYYY filename prefix breaks string sorting).
2. Predict next files: from the last timestamp + 1 minute, generate the expected filenames for both apartments.
3. Check with .exists(): instead of scanning all 245k files (~72 s), check whether each predicted file exists on the SMB share (~0.01 s).
4. If found: run bulk_to_bronze.py, then flatten_sensors.py, via subprocess.
5. If not found: wait 60 seconds and check again.
6. Nightly safety net: at midnight, do one full os.scandir pass to catch any missed files.
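A minimal Python sketch of this loop, for orientation only. It assumes the SMB share is a single flat folder (as the glob("*.json") timing suggests), that both scripts sit in the working directory, and that the nightly pass means running bulk_to_bronze.py --full. expected_names, new_data_available, run_pipeline, watch and the find_baseline callback are hypothetical names, not the real watcher.py API.

```python
import subprocess
import sys
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Callable, Optional

# Hypothetical: apartment names as they appear in the sensor filenames.
APARTMENTS = ["JimmyLoup", "JeremieVianin"]


def expected_names(last_ts: datetime) -> list[str]:
    """Predict the filenames for the minute after the newest Bronze file."""
    nxt = last_ts + timedelta(minutes=1)
    return [f"{nxt:%d.%m.%Y %H%M}_{apt}_received.json" for apt in APARTMENTS]


def new_data_available(smb_root: Path, last_ts: datetime) -> bool:
    """Two cheap .exists() calls instead of listing ~245k files."""
    return any((smb_root / name).exists() for name in expected_names(last_ts))


def run_pipeline(full: bool = False) -> bool:
    """Run bulk_to_bronze.py, then flatten_sensors.py; False on first failure."""
    commands = [
        [sys.executable, "bulk_to_bronze.py"] + (["--full"] if full else []),
        [sys.executable, "flatten_sensors.py"],
    ]
    for cmd in commands:
        if subprocess.run(cmd).returncode != 0:
            return False
    return True


def watch(
    smb_root: Path,
    find_baseline: Callable[[], datetime],
    interval_secs: int = 60,
    nightly_hour: int = 0,
) -> None:
    """Loop forever; find_baseline returns the newest timestamp in Bronze."""
    last_nightly: Optional[date] = None
    while True:
        now = datetime.now()
        if now.hour == nightly_hour and last_nightly != now.date():
            # Nightly safety net (assumed here to be bulk_to_bronze.py --full)
            run_pipeline(full=True)
            last_nightly = now.date()
        elif new_data_available(smb_root, find_baseline()):
            run_pipeline()
        time.sleep(interval_secs)
```

run_pipeline reports a failing script through its return value instead of raising, so one bad run does not stop the watcher.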

performance: file discovery

method                                 | time per check
sorted(glob("*.json")) over 245k files | ~72 s
os.scandir single pass (find max)      | ~40 s
Prediction + .exists()                 | ~0.01 s

configuration

variable      | default        | description
SMB_PATH      | Z:\            | Mounted SMB share with the sensor JSON files
BRONZE_ROOT   | storage\bronze | Local Bronze storage folder
INTERVAL_SECS | 60             | Seconds between checks
NIGHTLY_HOUR  | 0              | Hour of the nightly full scan (0 = midnight)

Usage:
python watcher.py
python watcher.py --scan   (force a full scan, run the pipeline once, then exit)
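A sketch of how the settings and the --scan flag might be loaded. The table gives names and defaults only; reading overrides from environment variables of the same name is an assumption, and WatcherConfig, load_config and parse_args are hypothetical.

```python
import argparse
import os
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class WatcherConfig:
    smb_path: Path
    bronze_root: Path
    interval_secs: int
    nightly_hour: int


def load_config(env: Mapping[str, str] = os.environ) -> WatcherConfig:
    """Defaults from the configuration table; env vars (assumed) override them."""
    return WatcherConfig(
        smb_path=Path(env.get("SMB_PATH", "Z:\\")),
        # storage\bronze on Windows; Path() picks the local separator
        bronze_root=Path(env.get("BRONZE_ROOT", str(Path("storage", "bronze")))),
        interval_secs=int(env.get("INTERVAL_SECS", "60")),
        nightly_hour=int(env.get("NIGHTLY_HOUR", "0")),
    )


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Watch the SMB share and feed Bronze.")
    parser.add_argument(
        "--scan",
        action="store_true",
        help="force a full scan, run the pipeline once, then exit",
    )
    return parser.parse_args(argv)
```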

03 Sensor ingestion: bulk_to_bronze.py (done)
Prediction mode (default)
1. Find the newest file in Bronze.
2. Starting from that timestamp + 1 minute, generate the expected filenames for both apartments.
3. Check whether each predicted file exists on SMB (.exists(), fast).
4. If it exists and is not in Bronze yet, queue it for copying.
5. Stop after 10 consecutive minutes with no files found (caught up).
6. Copy all queued files using 16 parallel threads (ThreadPoolExecutor).
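The prediction walk and the parallel copy can be sketched as below. exists_on_smb and in_bronze are hypothetical callbacks (in practice something like a Path.exists() check on each side), which keeps the walk testable without a share. The 10-minute stop and the 16 workers are the values listed above; the function names are illustrative only.

```python
import shutil
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable

APARTMENTS = ["JimmyLoup", "JeremieVianin"]


def predict_new_files(
    last_ts: datetime,
    exists_on_smb: Callable[[str], bool],
    in_bronze: Callable[[str], bool],
    max_empty_minutes: int = 10,
) -> list[str]:
    """Walk forward one minute at a time until max_empty_minutes in a row are empty."""
    queue: list[str] = []
    ts, empty = last_ts, 0
    while empty < max_empty_minutes:
        ts += timedelta(minutes=1)
        found = False
        for apt in APARTMENTS:
            name = f"{ts:%d.%m.%Y %H%M}_{apt}_received.json"
            if exists_on_smb(name):
                found = True
                if not in_bronze(name):
                    queue.append(name)  # on SMB but not copied yet
        empty = 0 if found else empty + 1
    return queue


def copy_all(jobs: list[tuple[Path, Path]], workers: int = 16) -> int:
    """Copy (source, destination) pairs in parallel; returns the number copied."""

    def copy_one(job: tuple[Path, Path]) -> None:
        src, dst = job
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() drains the iterator so worker exceptions are re-raised here
        list(pool.map(copy_one, jobs))
    return len(jobs)
```

Because the walk ends after 10 empty minutes, a sensor-side outage longer than that would presumably end it early; the nightly full scan is the step that covers such gaps.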

Full scan mode (--full flag)
1. Run os.scandir over the entire SMB share and collect all .json filenames.
2. Sort them.
3. Walk from newest to oldest.
4. For each file, check whether its Bronze destination exists.
5. Stop after 50 consecutive files that already exist, on the assumption that everything older is already in Bronze.
6. Copy the new files using 16 threads.
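A sketch of full scan mode, with hypothetical names and a flat share assumed. The steps above only say "sorts them"; this version sorts by parsed timestamp, since a plain string sort of DD.MM.YYYY names is not chronological.

```python
import os
from datetime import datetime
from typing import Callable

SUFFIX = "_received.json"


def list_sensor_files(smb_root: str) -> list[str]:
    """One os.scandir pass (stricter than *.json so parsing cannot fail)."""
    with os.scandir(smb_root) as entries:
        return [e.name for e in entries if e.is_file() and e.name.endswith(SUFFIX)]


def timestamp_key(name: str) -> tuple[datetime, str]:
    """Sort key: parsed timestamp first, apartment second (not a string sort)."""
    ts_part, apartment = name[: -len(SUFFIX)].rsplit("_", 1)
    return datetime.strptime(ts_part, "%d.%m.%Y %H%M"), apartment


def full_scan_new_files(
    names: list[str],
    in_bronze: Callable[[str], bool],
    stop_after: int = 50,
) -> list[str]:
    """Newest to oldest; stop after stop_after consecutive files already in Bronze."""
    new: list[str] = []
    streak = 0
    for name in sorted(names, key=timestamp_key, reverse=True):
        if in_bronze(name):
            streak += 1
            if streak >= stop_after:
                break  # assume everything older was copied earlier
        else:
            streak = 0
            new.append(name)
    return new
```

Assuming one file per apartment per minute, stop_after=50 corresponds to roughly 25 minutes of data already present in Bronze.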

Filename format: DD.MM.YYYY HHMM_{ApartmentName}_received.json

Apartment folders: JimmyLoup → bronze/jimmy/ · JeremieVianin → bronze/jeremie/
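The filename encodes everything the prediction step needs. Below is a small sketch for building and parsing these names. It also shows why a string sort picks the wrong "newest" file across a month boundary. make_filename, parse_filename and newest_file are hypothetical helpers.

```python
from datetime import datetime

TS_FORMAT = "%d.%m.%Y %H%M"  # "DD.MM.YYYY HHMM"
SUFFIX = "_received.json"


def make_filename(ts: datetime, apartment: str) -> str:
    """Expected sensor filename for one apartment and one minute."""
    return f"{ts.strftime(TS_FORMAT)}_{apartment}{SUFFIX}"


def parse_filename(name: str) -> tuple[datetime, str]:
    """Split a sensor filename into (timestamp, apartment name)."""
    if not name.endswith(SUFFIX):
        raise ValueError(f"not a sensor file: {name!r}")
    ts_part, apartment = name[: -len(SUFFIX)].rsplit("_", 1)
    return datetime.strptime(ts_part, TS_FORMAT), apartment


def newest_file(names: list[str]) -> str:
    """Pick the newest file by parsed timestamp; max(names) would compare DD first."""
    return max(names, key=lambda n: parse_filename(n)[0])


names = [
    "31.08.2023 2359_JimmyLoup_received.json",
    "01.09.2023 0000_JimmyLoup_received.json",
]
print(max(names))          # string order: the older 31.08 file wins
print(newest_file(names))  # parsed order: the 01.09 file
```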

performance

mode       | files on SMB | new files | time
Prediction | 246,000      | 2         | 0.4 s
Prediction | 246,000      | 1,245     | 20 s
Full scan  | 246,000      | 0         | ~80 s
Full scan  | 246,000      | 1,245     | ~85 s

bronze folder structure

storage/bronze/
├── jimmy/
│   ├── 2023/
│   │   ├── 08/
│   │   │   ├── 18/
│   │   │   │   ├── 10/
│   │   │   │   │   ├── 18.08.2023 1000_JimmyLoup_received.json
│   │   │   │   │   ├── 18.08.2023 1001_JimmyLoup_received.json
│   │   │   │   │   └── ...
│   │   │   │   ├── 11/
│   │   │   │   └── ...
│   │   │   └── ...
│   │   └── 09/
│   └── ...
└── jeremie/
    └── (same structure)
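Each file's destination follows from its own name. The sketch below assumes the hour folder comes from HHMM and that the original filename is kept unchanged, as in the tree above. bronze_path is a hypothetical helper, not necessarily how bulk_to_bronze.py builds paths.

```python
from datetime import datetime
from pathlib import Path

# Apartment name in the filename -> Bronze sub-folder, as listed above
APARTMENT_DIRS = {"JimmyLoup": "jimmy", "JeremieVianin": "jeremie"}
SUFFIX = "_received.json"


def bronze_path(bronze_root: Path, name: str) -> Path:
    """<bronze_root>/<apartment>/YYYY/MM/DD/HH/<original filename>."""
    ts_part, apartment = name[: -len(SUFFIX)].rsplit("_", 1)
    ts = datetime.strptime(ts_part, "%d.%m.%Y %H%M")
    return (
        bronze_root
        / APARTMENT_DIRS[apartment]
        / f"{ts:%Y}" / f"{ts:%m}" / f"{ts:%d}" / f"{ts:%H}"
        / name
    )
```

With this mapping, the "is it already in Bronze?" check used by both ingestion modes reduces to bronze_path(root, name).exists().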

04 Weather ingestion: weather_download.py (in review)

ingestion/slow_flow/weather_download.py — daily, sFTP → Bronze

1. Connect to the sFTP server using paramiko.
2. List all files in the remote directory with sftp.listdir().
3. For each .csv file, check whether it already exists in Bronze.
4. If not, download it into the matching date folder.
5. Close the connection in a finally block, so it is released even if a download fails.

Filename format: Pred_YYYY-MM-DD.csv → bronze/weather/YYYY/MM/DD/Pred_YYYY-MM-DD.csv
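A sketch of steps 2 to 4, combined with the date-folder mapping. sync_weather accepts any object with listdir(path) and get(remotepath, localpath), which paramiko's SFTPClient provides. Only names matching Pred_YYYY-MM-DD.csv are downloaded, because the date is needed to choose the folder. The function names are hypothetical.

```python
import posixpath
import re
from pathlib import Path
from typing import Optional

# Pred_YYYY-MM-DD.csv; the date picks the Bronze folder
WEATHER_RE = re.compile(r"^Pred_(\d{4})-(\d{2})-(\d{2})\.csv$")


def weather_bronze_path(bronze_root: Path, name: str) -> Optional[Path]:
    """bronze/weather/YYYY/MM/DD/<name>, or None for files that don't match."""
    m = WEATHER_RE.match(name)
    if m is None:
        return None
    year, month, day = m.groups()
    return bronze_root / "weather" / year / month / day / name


def sync_weather(sftp, remote_dir: str, bronze_root: Path) -> list[str]:
    """Download every Pred_*.csv not yet in Bronze; returns the new names.

    `sftp` is anything with listdir(path) and get(remote, local),
    e.g. a paramiko SFTPClient.
    """
    downloaded: list[str] = []
    for name in sorted(sftp.listdir(remote_dir)):
        dst = weather_bronze_path(bronze_root, name)
        if dst is None or dst.exists():
            continue  # not a forecast CSV, or already downloaded
        dst.parent.mkdir(parents=True, exist_ok=True)
        sftp.get(posixpath.join(remote_dir, name), str(dst))
        downloaded.append(name)
    return downloaded
```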

configuration

variable      | description
SFTP_HOST     | sFTP server hostname
SFTP_PORT     | Port (default: 22)
SFTP_USER     | Username
SFTP_PASSWORD | Password
SFTP_PATH     | Remote directory (e.g. /Meteo2)
BRONZE_ROOT   | Local Bronze storage folder
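Connecting and closing (steps 1 and 5) could look like the sketch below, assuming the variables above come from the environment. It uses paramiko's SSHClient with the system known_hosts file, so an unknown host key is rejected; the original script may connect differently. sftp_settings and with_sftp are hypothetical.

```python
import os
from collections.abc import Callable, Mapping
from typing import Any


def sftp_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read the sFTP variables from the table (assumed to be env vars)."""
    return {
        "host": env["SFTP_HOST"],
        "port": int(env.get("SFTP_PORT", "22")),
        "username": env["SFTP_USER"],
        "password": env["SFTP_PASSWORD"],
        "remote_dir": env["SFTP_PATH"],
    }


def with_sftp(settings: dict, job: Callable[[Any, str], Any]) -> Any:
    """Open an SFTP session, run job(sftp, remote_dir), always close both."""
    import paramiko  # third-party; imported lazily so sftp_settings works without it

    client = paramiko.SSHClient()
    client.load_system_host_keys()  # unknown host keys are rejected by default
    try:
        client.connect(
            settings["host"],
            port=settings["port"],
            username=settings["username"],
            password=settings["password"],
        )
        sftp = client.open_sftp()
        try:
            return job(sftp, settings["remote_dir"])
        finally:
            sftp.close()
    finally:
        client.close()
```

job receives the open SFTP client and SFTP_PATH; for example, with_sftp(sftp_settings(), lambda sftp, d: sftp.listdir(d)) lists the remote directory.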

05 Bronze layer: output (done)

folder          | contents                | layout
bronze/jimmy/   | ~123k JSON sensor files | YYYY/MM/DD/HH/
bronze/jeremie/ | ~123k JSON sensor files | YYYY/MM/DD/HH/
bronze/weather/ | ~70 CSV files (daily)   | YYYY/MM/DD/