Sources → Bronze Ingestion
Sprint 2 — Continuous ingestion from SMB and sFTP into local Bronze storage. Prediction-based file discovery on 245k+ files.
SMB share (Z:\): 245,000+ JSON sensor files from 2 apartments
sFTP (/Meteo2): ~70 daily weather CSV files (Aug–Oct 2023)
ingestion/fast_flow/watcher.py — continuous loop, checks every 60 seconds
1. Find baseline: walk into the newest YYYY/MM/DD/HH folder in Bronze and pick the newest file by parsed datetime, not alphabetically (the DD.MM.YYYY filename format breaks string sorting).
2. Predict next files: starting from the last timestamp + 1 minute, generate the expected filenames for both apartments.
3. Check with .exists(): instead of scanning all 245k files (~72 s), check whether each predicted file exists on the SMB share (~0.01 s).
4. If found: run bulk_to_bronze.py, then flatten_sensors.py, via subprocess.
5. If not found: wait 60 seconds and check again.
6. Nightly safety net: at midnight, run one full os.scandir pass to catch any missed files.
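The reason the baseline must compare parsed datetimes rather than strings fits in a few lines. This is a minimal sketch, not the code in watcher.py: parse_ts, newest_file and find_baseline are hypothetical helpers, and the sketch assumes the timestamp is everything before the first underscore and that the newest HH folder is not empty.

```python
from __future__ import annotations

from datetime import datetime
from pathlib import Path

# Sensor filenames look like "18.08.2023 1000_JimmyLoup_received.json".
TS_FORMAT = "%d.%m.%Y %H%M"


def parse_ts(filename: str) -> datetime:
    """Parse the timestamp that precedes the first underscore."""
    stamp = filename.split("_", 1)[0]
    return datetime.strptime(stamp, TS_FORMAT)


def newest_file(names: list[str]) -> str:
    """Pick the chronologically newest filename by comparing parsed datetimes."""
    return max(names, key=parse_ts)


def find_baseline(apartment_dir: Path) -> datetime | None:
    """Descend into the newest YYYY/MM/DD/HH folder and return its newest timestamp.

    Assumes the newest HH folder holds at least one file; returns None otherwise.
    """
    current = apartment_dir
    for _ in range(4):  # YYYY, MM, DD, HH
        subdirs = [p for p in current.iterdir() if p.is_dir() and p.name.isdigit()]
        if not subdirs:
            return None
        current = max(subdirs, key=lambda p: int(p.name))
    names = [p.name for p in current.glob("*.json")]
    return parse_ts(newest_file(names)) if names else None


names = [
    "31.08.2023 2359_JimmyLoup_received.json",
    "01.09.2023 0000_JimmyLoup_received.json",
]
print(max(names))          # 31.08.2023 2359_JimmyLoup_received.json (string order, wrong)
print(newest_file(names))  # 01.09.2023 0000_JimmyLoup_received.json (chronological, right)
```

A string comparison looks at the day digits first, so the last minute of August wins over the first minute of September; parsing first avoids that.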
performance: file discovery

| method | time per check |
|---|---|
| sorted(glob("*.json")) on 245k files | ~72 s |
| os.scandir single pass (find max) | ~40 s |
| Prediction + .exists() | ~0.01 s |
configuration

| variable | default | description |
|---|---|---|
| SMB_PATH | Z:\ | Mounted SMB share with the sensor JSON files |
| BRONZE_ROOT | storage\bronze | Local Bronze storage folder |
| INTERVAL_SECS | 60 | Seconds between checks |
| NIGHTLY_HOUR | 0 | Hour of the nightly full scan (0 = midnight) |
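The configuration values and the nightly safety net could be wired together roughly as below. This sketch assumes the four settings are read from environment variables (the table does not say how they are supplied); WatcherConfig, load_config, nightly_scan_due and run_forever are hypothetical names, and the two callables passed to run_forever stand in for the prediction check and the full scan.

```python
from __future__ import annotations

import os
import time
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Callable


@dataclass(frozen=True)
class WatcherConfig:
    smb_path: Path
    bronze_root: Path
    interval_secs: int
    nightly_hour: int


def load_config() -> WatcherConfig:
    """Read settings from the environment (an assumption), using the documented defaults."""
    return WatcherConfig(
        smb_path=Path(os.environ.get("SMB_PATH", "Z:\\")),
        bronze_root=Path(os.environ.get("BRONZE_ROOT", os.path.join("storage", "bronze"))),
        interval_secs=int(os.environ.get("INTERVAL_SECS", "60")),
        nightly_hour=int(os.environ.get("NIGHTLY_HOUR", "0")),
    )


def nightly_scan_due(now: datetime, last_full_scan: date | None, nightly_hour: int) -> bool:
    """True on the first check inside NIGHTLY_HOUR that has not already run today."""
    return now.hour == nightly_hour and last_full_scan != now.date()


def run_forever(cfg: WatcherConfig,
                check_predicted: Callable[[], None],
                full_scan: Callable[[], None]) -> None:
    """Check every INTERVAL_SECS; replace one check per night with a full scan."""
    last_full_scan: date | None = None
    while True:
        now = datetime.now()
        if nightly_scan_due(now, last_full_scan, cfg.nightly_hour):
            full_scan()
            last_full_scan = now.date()
        else:
            check_predicted()
        time.sleep(cfg.interval_secs)
```

With a 60-second interval there are about 60 checks inside the midnight hour; remembering the date of the last full scan makes only the first of them run it.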
Usage: python watcher.py runs the continuous loop; python watcher.py --scan forces a full scan, runs the pipeline, and exits.
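One possible shape for the command line and the subprocess hand-off is sketched below. Only the --scan flag and the two script names come from this page; parse_args, run_pipeline and the script_dir parameter are hypothetical, and running each script with the current interpreter is an assumption.

```python
from __future__ import annotations

import argparse
import subprocess
import sys
from pathlib import Path

# Downstream steps, run in this order after new files are found.
PIPELINE_SCRIPTS = ("bulk_to_bronze.py", "flatten_sensors.py")


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Watch the SMB share and ingest new sensor files into Bronze."
    )
    parser.add_argument(
        "--scan",
        action="store_true",
        help="force a full scan, run the pipeline, then exit",
    )
    return parser.parse_args(argv)


def run_pipeline(script_dir: Path) -> None:
    """Run each script with the current interpreter; check=True stops on the first failure."""
    for script in PIPELINE_SCRIPTS:
        subprocess.run([sys.executable, str(script_dir / script)], check=True)
```

With check=True, a failing bulk_to_bronze.py raises subprocess.CalledProcessError, so flatten_sensors.py never runs on a partial copy.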
Prediction mode
1. Find the newest file in Bronze.
2. From that timestamp + 1 minute, generate the expected filenames for both apartments.
3. Check whether each predicted file exists on SMB (.exists() is fast).
4. If it exists and is not in Bronze yet, queue it for copying.
5. Stop after 10 consecutive minutes with no files found (i.e. caught up).
6. Copy all queued files using 16 parallel threads (ThreadPoolExecutor).
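The prediction steps above might look like this in Python. It is a sketch under stated assumptions: the SMB share is treated as one flat directory of JSON files, a single baseline timestamp is shared by both apartments, and predicted_name, bronze_dest, predict_new_files and copy_all are hypothetical names, not the actual API of the ingestion scripts.

```python
from __future__ import annotations

import shutil
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path

# Apartment name in the filename -> Bronze subfolder.
APARTMENTS = {"JimmyLoup": "jimmy", "JeremieVianin": "jeremie"}


def predicted_name(ts: datetime, apartment: str) -> str:
    """Build a filename such as '18.08.2023 1001_JimmyLoup_received.json'."""
    return f"{ts:%d.%m.%Y %H%M}_{apartment}_received.json"


def bronze_dest(bronze_root: Path, ts: datetime, apartment: str) -> Path:
    """Bronze path partitioned as <apartment>/YYYY/MM/DD/HH/<filename>."""
    return (bronze_root / APARTMENTS[apartment] / f"{ts:%Y}" / f"{ts:%m}"
            / f"{ts:%d}" / f"{ts:%H}" / predicted_name(ts, apartment))


def predict_new_files(smb_path: Path, bronze_root: Path, baseline: datetime,
                      max_empty_minutes: int = 10) -> list[tuple[Path, Path]]:
    """Probe minute by minute with .exists() until 10 consecutive empty minutes."""
    queue: list[tuple[Path, Path]] = []
    ts, empty = baseline, 0
    while empty < max_empty_minutes:
        ts += timedelta(minutes=1)
        found = False
        for apartment in APARTMENTS:
            src = smb_path / predicted_name(ts, apartment)
            if not src.exists():
                continue
            found = True
            dst = bronze_dest(bronze_root, ts, apartment)
            if not dst.exists():  # on SMB but not yet in Bronze
                queue.append((src, dst))
        empty = 0 if found else empty + 1
    return queue


def copy_all(pairs: list[tuple[Path, Path]], workers: int = 16) -> None:
    """Copy queued files in parallel; list() re-raises any worker exception."""
    def copy_one(pair: tuple[Path, Path]) -> None:
        src, dst = pair
        dst.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(src, dst)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        list(pool.map(copy_one, pairs))
```

Each probe is a single existence check on the share rather than a 245k-entry listing, which is where the speed-up comes from. The 10-minute stop condition keeps a short gap in the sensor data from ending the catch-up early, at the cost of 20 extra .exists() calls (10 minutes × 2 apartments) per run.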
Full scan mode
1. Run os.scandir over the entire SMB share and collect all .json filenames.
2. Sort them.
3. Walk from newest to oldest.
4. For each file, check whether its Bronze destination exists.
5. Stop after 50 consecutive files that already exist; everything older is assumed to be in Bronze already.
6. Copy the new files with 16 threads.
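The full-scan walk can be sketched the same way. Assumptions: the share is a flat directory, sorting is done on parsed timestamps (the steps only say the names are sorted, and a plain string sort would misorder DD.MM.YYYY names), files that do not match the documented filename format are skipped, and parse_name, dest_for and full_scan are hypothetical helpers. The returned (source, destination) pairs are what the final step copies with 16 threads.

```python
from __future__ import annotations

import os
from datetime import datetime
from pathlib import Path

APARTMENTS = {"JimmyLoup": "jimmy", "JeremieVianin": "jeremie"}


def parse_name(name: str) -> tuple[datetime, str] | None:
    """Split 'DD.MM.YYYY HHMM_{ApartmentName}_received.json' into (timestamp, apartment)."""
    try:
        stamp, apartment, _rest = name.split("_", 2)
        return datetime.strptime(stamp, "%d.%m.%Y %H%M"), apartment
    except ValueError:
        return None


def dest_for(bronze_root: Path, ts: datetime, apartment: str, name: str) -> Path:
    return (bronze_root / APARTMENTS[apartment] / f"{ts:%Y}" / f"{ts:%m}"
            / f"{ts:%d}" / f"{ts:%H}" / name)


def full_scan(smb_path: Path, bronze_root: Path,
              stop_after: int = 50) -> list[tuple[Path, Path]]:
    """Walk newest to oldest; stop after `stop_after` consecutive files already in Bronze."""
    entries: list[tuple[datetime, str, str]] = []
    with os.scandir(smb_path) as it:  # one pass over the whole share
        for entry in it:
            if entry.is_file() and entry.name.endswith(".json"):
                parsed = parse_name(entry.name)
                if parsed and parsed[1] in APARTMENTS:
                    entries.append((parsed[0], parsed[1], entry.name))
    entries.sort()  # chronological: tuples compare by timestamp first

    queue: list[tuple[Path, Path]] = []
    already_present = 0
    for ts, apartment, name in reversed(entries):
        dst = dest_for(bronze_root, ts, apartment, name)
        if dst.exists():
            already_present += 1
            if already_present >= stop_after:
                break  # older files are assumed to be in Bronze already
        else:
            already_present = 0  # the run of existing files must be consecutive
            queue.append((smb_path / name, dst))
    return queue
```

The stop rule trades completeness for speed: a file missing from Bronze that sits more than 50 already-copied files back is not picked up by this walk.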
Filename format: DD.MM.YYYY HHMM_{ApartmentName}_received.json
Apartment folders: JimmyLoup → bronze/jimmy/, JeremieVianin → bronze/jeremie/
performance

| mode | files on SMB | new files | time |
|---|---|---|---|
| Prediction | 246,000 | 2 | 0.4 s |
| Prediction | 246,000 | 1,245 | 20 s |
| Full scan | 246,000 | 0 | ~80 s |
| Full scan | 246,000 | 1,245 | ~85 s |
bronze folder structure
storage/bronze/
├── jimmy/
│ ├── 2023/
│ │ ├── 08/
│ │ │ ├── 18/
│ │ │ │ ├── 10/
│ │ │ │ │ ├── 18.08.2023 1000_JimmyLoup_received.json
│ │ │ │ │ ├── 18.08.2023 1001_JimmyLoup_received.json
│ │ │ │ │ └── ...
│ │ │ │ ├── 11/
│ │ │ │ └── ...
│ │ │ └── ...
│ │ └── 09/
│ └── ...
└── jeremie/
└── (same structure)
ingestion/slow_flow/weather_download.py — daily, sFTP → Bronze
1. Connect to the sFTP server using paramiko.
2. List all files in the remote directory with sftp.listdir().
3. For each .csv file, check whether it already exists in Bronze.
4. If not, download it into the matching date folder.
5. Close the connection in a finally block so it is never leaked.
Filename format: Pred_YYYY-MM-DD.csv → bronze/weather/YYYY/MM/DD/Pred_YYYY-MM-DD.csv
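The weather steps could be sketched as follows. The paramiko calls used (Transport, SFTPClient.from_transport, listdir, get, close) exist, but the structure is an assumption: weather_dest, download_new and sync_weather are hypothetical names, remote files that do not match Pred_YYYY-MM-DD.csv are skipped, and this sketch performs no host-key verification, which a production client should add. Keeping the list-and-download logic in download_new, which accepts any object with listdir() and get(), lets it be tested without a server.

```python
from __future__ import annotations

import re
from pathlib import Path, PurePosixPath

# Pred_YYYY-MM-DD.csv -> bronze/weather/YYYY/MM/DD/Pred_YYYY-MM-DD.csv
PRED_RE = re.compile(r"^Pred_(\d{4})-(\d{2})-(\d{2})\.csv$")


def weather_dest(bronze_root: Path, filename: str) -> Path | None:
    match = PRED_RE.match(filename)
    if match is None:
        return None
    year, month, day = match.groups()
    return bronze_root / "weather" / year / month / day / filename


def download_new(sftp, remote_dir: str, bronze_root: Path) -> list[Path]:
    """Download every Pred_*.csv that is not in Bronze yet; return the new local paths."""
    downloaded: list[Path] = []
    for name in sftp.listdir(remote_dir):
        dst = weather_dest(bronze_root, name)
        if dst is None or dst.exists():
            continue
        dst.parent.mkdir(parents=True, exist_ok=True)
        sftp.get(str(PurePosixPath(remote_dir) / name), str(dst))
        downloaded.append(dst)
    return downloaded


def sync_weather(host: str, port: int, user: str, password: str,
                 remote_dir: str, bronze_root: Path) -> list[Path]:
    import paramiko  # imported lazily so the helpers above work without it

    transport = paramiko.Transport((host, port))
    try:
        transport.connect(username=user, password=password)  # no host-key check
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            return download_new(sftp, remote_dir, bronze_root)
        finally:
            sftp.close()
    finally:
        transport.close()  # always release the socket, even on errors
```

Because already-downloaded files are skipped, running the job twice on the same day is harmless; the second run downloads nothing.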
configuration

| variable | description |
|---|---|
| SFTP_HOST | sFTP server hostname |
| SFTP_PORT | Port (default: 22) |
| SFTP_USER | Username |
| SFTP_PASSWORD | Password |
| SFTP_PATH | Remote directory (e.g. /Meteo2) |
| BRONZE_ROOT | Local Bronze storage folder |
bronze summary

| folder | contents | layout |
|---|---|---|
| bronze/jimmy/ | ~123k JSON sensor files | YYYY/MM/DD/HH/ |
| bronze/jeremie/ | ~123k JSON sensor files | YYYY/MM/DD/HH/ |
| bronze/weather/ | ~70 CSV files (daily) | YYYY/MM/DD/ |