Canvis en la Distribució de Dades i Monitoratge
- Introducció
- Per què les dades canvien?
- Tipus de drift
- Detecció de drift amb mètodes estadístics
- Monitoratge de qualitat de dades
- Monitoratge en producció
- Observabilitat (Observability)
- Establint alertes
- Resum
Introducció
Quan un model de machine learning arriba a producció, el nostre treball no s’acaba: tot just comença una nova fase. El món real és dinàmic, i les dades que el model veu en producció poden ser molt diferents de les que va veure durant l’entrenament. Aquest fenomen s’anomena canvi en la distribució de dades (data distribution shift o simplement drift).
En aquest capítol explorarem els diferents tipus de drift, com detectar-los, i com establir un sistema de monitoratge que ens permeti saber si el nostre model segueix funcionant correctament.
Per què les dades canvien?
Imaginem que hem entrenat un model per predir quins productes comprarà un client. El model va funcionar molt bé durant les proves, però després d’uns mesos en producció, les prediccions ja no són tan bones. Què ha passat?
Les causes poden ser múltiples:
- Canvis en el comportament dels usuaris: Nous hàbits de consum, modes, temporades
- Canvis en el mercat: Nous competidors, canvis de preus, situació econòmica
- Canvis en la recollida de dades: Actualitzacions en l’aplicació, nous camps, errors en sensors
- Canvis en la població: Nous segments de clients, expansió a nous mercats
El problema és que el model va aprendre patrons d’unes dades concretes, i quan aquestes dades canvien, el model pot quedar obsolet.
Tipus de drift
No tots els canvis en les dades són iguals. Distingim tres tipus principals:
Covariate Shift (Canvi en les covariables)
El covariate shift es produeix quan la distribució de les variables d’entrada (features) canvia, però la relació entre les entrades i la sortida es manté.
Exemple: Un model per detectar correu brossa entrenat amb correus d’oficina. Quan l’empresa comença a rebre més correus de màrqueting (nova distribució d’entrades), el model pot fallar perquè mai havia vist aquest tipus de correus, tot i que la definició de “brossa” no ha canviat.
Com detectar-ho: Comparar la distribució de les features entre entrenament i producció.
Label Shift (Canvi en les etiquetes)
El label shift es produeix quan la distribució de les sortides (etiquetes) canvia.
Exemple: Un model de diagnòstic mèdic entrenat amb dades on el 5% dels pacients tenien la malaltia. Si en producció aquest percentatge puja al 20% (per exemple, durant un brot), el model pot tenir problemes perquè els seus llindars de decisió estaven calibrats per a una prevalença diferent.
Entrenament: 95% sans, 5% malalts
Producció: 80% sans, 20% malalts ← El model pot subestimar casos positius
Com detectar-ho: Monitoritzar la distribució de les prediccions (si no tenim etiquetes reals) o de les etiquetes reals (si les obtenim amb retard).
Concept Drift (Canvi de concepte)
El concept drift és el més problemàtic: la relació entre les entrades i la sortida canvia.
Exemple: Un model per predir si un client pagarà un préstec, entrenat abans d’una crisi econòmica. Després de la crisi, clients que abans eren bons pagadors ara no poden pagar. Les mateixes features (ingressos, edat, historial) ara porten a resultats diferents.
Abans: Ingressos alts + Historial bo → Alta probabilitat de pagar
Després: Ingressos alts + Historial bo → Probabilitat incerta (crisi!)
Com detectar-ho: Només es pot detectar amb certesa si tenim les etiquetes reals de producció. Altrament, podem inferir-ho si detectem degradació en les mètriques del model.
Detecció de drift amb mètodes estadístics
Per detectar drift, comparem distribucions: les dades d’entrenament (o d’un període de referència) contra les dades actuals de producció. Hi ha diversos mètodes estadístics per fer-ho.
Tests de dues mostres (Two-sample tests)
Aquests tests responen a la pregunta: “Aquestes dues mostres provenen de la mateixa distribució?”
Test de Kolmogorov-Smirnov (KS)
El test KS mesura la màxima diferència entre les funcions de distribució acumulada de dues mostres. És útil per a variables numèriques contínues.
from scipy import stats
def detect_drift_ks(reference_data, current_data, features, alpha: float = 0.05) -> list[str]:
"""Detect drift using Kolmogorov-Smirnov test."""
alerts = []
for feature_name in features:
statistic, p_value = stats.ks_2samp(
reference_data[feature_name],
current_data[feature_name]
)
if p_value < alpha:
alerts.append(f"Drift detected in {feature_name} (p={p_value:.4f})")
return alerts
Interpretació: Un p-valor baix (típicament < 0.05) indica que les dues mostres probablement no vénen de la mateixa distribució.
Test Chi-quadrat (χ²)
Per a variables categòriques, el test chi-quadrat compara les freqüències observades amb les esperades.
from scipy import stats
# Comptar freqüències
reference_counts = reference_data['categoria'].value_counts()
current_counts = current_data['categoria'].value_counts()
# Assegurar que tenim les mateixes categories
all_categories = set(reference_counts.index) | set(current_counts.index)
ref_freq = [reference_counts.get(c, 0) for c in all_categories]
cur_freq = [current_counts.get(c, 0) for c in all_categories]
# Test chi-quadrat
statistic, p_value = stats.chisquare(cur_freq, ref_freq)
Population Stability Index (PSI)
El PSI és una mètrica molt utilitzada en la indústria financera per mesurar quant ha canviat una distribució.
import numpy as np
def calculate_psi(reference, current, bins: int = 10, smoothing: float = 1e-4) -> float:
"""
Calculate Population Stability Index.
Returns PSI value: < 0.1 insignificant, 0.1-0.2 investigate, > 0.2 action required.
"""
# Crear bins basats en la distribució de referència
breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
breakpoints[0] = -np.inf
breakpoints[-1] = np.inf
# Calcular proporcions per cada bin
ref_counts = np.histogram(reference, bins=breakpoints)[0]
cur_counts = np.histogram(current, bins=breakpoints)[0]
ref_pct = ref_counts / len(reference)
cur_pct = cur_counts / len(current)
# Evitar divisió per zero
ref_pct = np.where(ref_pct == 0, smoothing, ref_pct)
cur_pct = np.where(cur_pct == 0, smoothing, cur_pct)
# Calcular PSI
psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
return psi
Interpretació del PSI:
- PSI < 0.1: Canvi insignificant
- 0.1 ≤ PSI < 0.2: Canvi moderat, investigar
- PSI ≥ 0.2: Canvi significatiu, acció requerida
Finestres de temps
Per aplicar aquests tests en producció, necessitem definir finestres de temps (time windows):
- Finestra de referència: Dades d’entrenament o d’un període estable
- Finestra actual: Dades recents de producció (últimes hores, dies, o setmanes)
Monitoratge de qualitat de dades
La detecció de drift és essencial, però no és l’única cosa que hem de monitoritzar. Les dades poden tenir problemes de qualitat que afecten el model sense que hi hagi drift estadístic.
Tipus de problemes de qualitat
1. Integritat de les dades
- Valors nuls: percentatge de missing values per feature
- Duplicats: registres repetits
- Violacions d’esquema: tipus de dades incorrectes, columnes inesperades
2. Qualitat de les features
- Outliers: valors fora del rang esperat
- Violacions de rang: valors fora dels límits coneguts
- Canvis de cardinalitat: noves categories en features categòriques
3. Qualitat de les prediccions
- Distribució de confiança: les prediccions són menys segures?
- Calibratge: les probabilitats predites coincideixen amb les freqüències reals?
Exemple de validació
import numpy as np
import pandas as pd
def check_data_quality(df: pd.DataFrame, reference_stats: dict) -> list[str]:
"""Comprova problemes de qualitat de dades."""
issues = []
# Valors nuls
null_rate = df.isnull().mean()
if (null_rate > 0.05).any():
issues.append(f"Taxa de nuls alta: {null_rate[null_rate > 0.05].to_dict()}")
# Outliers (mètode z-score)
for col in df.select_dtypes(include=[np.number]).columns:
z_scores = np.abs((df[col] - df[col].mean()) / df[col].std())
outlier_rate = (z_scores > 3).mean()
if outlier_rate > 0.05:
issues.append(f"Outliers en {col}: {outlier_rate:.2%}")
# Noves categories
for col in df.select_dtypes(include=['object', 'category']).columns:
known_cats = set(reference_stats['categories'][col])
new_cats = set(df[col].unique()) - known_cats
if new_cats:
issues.append(f"Noves categories en {col}: {new_cats}")
return issues
Integració amb el pipeline:
És important validar les dades abans d’usar-les per entrenar:
import logging
logger = logging.getLogger(__name__)
def validate_and_prepare_data(raw_data, reference_stats):
"""Pipeline de validació i preparació."""
# 1. Validar qualitat
quality_issues = check_data_quality(raw_data, reference_stats)
if quality_issues:
logger.warning(f"Problemes de qualitat detectats: {quality_issues}")
# Decidir si continuar o abortar
if critical_issues(quality_issues):
raise ValueError("Dades amb problemes crítics")
# 2. Preprocessar
processed_data = preprocess(raw_data)
return processed_data
Diferència clau: El drift mesura canvis en distribucions; la qualitat mesura problemes en les dades mateixes.
Monitoratge en producció
Detectar drift és només una part del monitoratge. Un sistema de monitoratge complet ha de vigilar múltiples aspectes.
Emmagatzematge de prediccions per monitoratge
Abans de poder monitoritzar el nostre model, necessitem guardar les prediccions que fem en producció. Sense aquest registre, no podem detectar drift, calcular mètriques, ni analitzar el comportament del model.
Què cal guardar?
Per a cada predicció, hauríem de registrar:
Essencial:
- Timestamp: Quan s’ha fet la predicció
- Input features: Les dades d’entrada (sense informació sensible!)
- Prediction: La sortida del model
- Request ID: Identificador únic de la petició (opcional però útil)
Recomanat:
- Model version: Quina versió del model ha fet la predicció
- Confidence/probability: Si el model retorna probabilitats
- Latency: Temps que ha trigat la predicció
Opcional:
- User ID: Si és rellevant i permet privacitat
- Context: Informació addicional del context de la petició
On guardar les prediccions?
Tenim diverses opcions, depenent de l’escala i complexitat:
1. Fitxers de log (més simple)
Per a volums baixos-mitjans, logs estructurats en JSON són suficients:
import logging
import json
from datetime import datetime
# Configurar logger per prediccions
pred_logger = logging.getLogger('predictions')
pred_logger.setLevel(logging.INFO)
# Handler per escriure a fitxer
handler = logging.FileHandler('logs/predictions.jsonl') # .jsonl = JSON Lines
pred_logger.addHandler(handler)
def log_prediction(inputs: dict, prediction: float, model_version: str, latency: float):
"""Registra una predicció en format JSON."""
log_entry = {
'timestamp': datetime.now().isoformat(),
'inputs': inputs,
'prediction': float(prediction),
'model_version': model_version,
'latency_ms': latency * 1000
}
pred_logger.info(json.dumps(log_entry))
# Ús a l'API
@app.post("/predict")
def predict(data: InputData):
start = time.time()
prediction = model.predict([data.features])[0]
latency = time.time() - start
# Registrar predicció
log_prediction(
inputs=data.dict(),
prediction=prediction,
model_version='1.2.0',
latency=latency
)
return {"prediction": float(prediction)}
Això genera un fitxer predictions.jsonl amb una línia per predicció:
{"timestamp": "2024-01-15T10:30:00", "inputs": {"age": 30, "income": 50000}, "prediction": 0.85, "model_version": "1.2.0", "latency_ms": 45.2}
{"timestamp": "2024-01-15T10:31:15", "inputs": {"age": 45, "income": 75000}, "prediction": 0.92, "model_version": "1.2.0", "latency_ms": 38.7}
Avantatges: Simple, no requereix base de dades, fàcil d’analitzar Desavantatges: Menys eficient per a grans volums, queries més lentes
2. Base de dades SQLite (mitjà volum)
Per a més volum, una base de dades local és millor:
import sqlite3
from datetime import datetime
def init_predictions_db(db_path: str = 'predictions.db'):
"""Inicialitza la base de dades de prediccions."""
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS predictions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
input_age REAL,
input_income REAL,
prediction REAL NOT NULL,
probability REAL,
model_version TEXT,
latency_ms REAL
)
''')
# Índex per timestamp (per queries ràpides)
cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON predictions(timestamp)')
conn.commit()
def save_prediction(inputs: dict, prediction: float, model_version: str, latency: float):
"""Guarda predicció a SQLite."""
with sqlite3.connect('predictions.db') as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO predictions (timestamp, input_age, input_income, prediction, model_version, latency_ms)
VALUES (?, ?, ?, ?, ?, ?)
''', (
datetime.now().isoformat(),
inputs.get('age'),
inputs.get('income'),
prediction,
model_version,
latency * 1000
))
conn.commit()
Avantatges: Queries SQL ràpides, bon rendiment, backups fàcils Desavantatges: Limitat a un sol servidor (no distribuït)
3. PostgreSQL (alt volum, producció)
Per a sistemes amb alt tràfic, una base de dades robusta:
import psycopg2
from datetime import datetime
def save_prediction_postgres(inputs: dict, prediction: float, model_version: str):
"""Guarda predicció a PostgreSQL."""
with psycopg2.connect("postgresql://user:pass@localhost/mldb") as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO predictions (timestamp, inputs, prediction, model_version)
VALUES (%s, %s, %s, %s)
''', (
datetime.now(),
json.dumps(inputs), # Guardar inputs com JSONB
prediction,
model_version
))
conn.commit()
Avantatges: Escalable, suporta concurrència, queries complexes Desavantatges: Més complex de configurar i mantenir
Consideracions de privacitat i seguretat
⚠️ Molt important: No guardar dades sensibles sense anonimitzar!
def anonymize_inputs(inputs: dict) -> dict:
"""Elimina o anonimitza dades sensibles."""
safe_inputs = inputs.copy()
# Eliminar camps sensibles
sensitive_fields = ['email', 'phone', 'ssn', 'credit_card']
for field in sensitive_fields:
if field in safe_inputs:
safe_inputs[field] = '***REDACTED***'
# Hash d'identificadors si cal mantenir-los
if 'user_id' in safe_inputs:
safe_inputs['user_id'] = hashlib.sha256(
str(safe_inputs['user_id']).encode()
).hexdigest()[:16]
return safe_inputs
# Usar abans de guardar
safe_inputs = anonymize_inputs(raw_inputs)
log_prediction(safe_inputs, prediction, model_version, latency)
Integració completa amb FastAPI
Exemple complet d’API amb logging:
from fastapi import FastAPI
import joblib
import json
import time
import logging
app = FastAPI()
# Configurar logging
pred_logger = logging.getLogger('predictions')
pred_logger.setLevel(logging.INFO)
handler = logging.FileHandler('logs/predictions.jsonl')
pred_logger.addHandler(handler)
# Carregar model
model = joblib.load('models/current_model.pkl')
with open('models/current_metadata.json') as f:
metadata = json.load(f)
@app.post("/predict")
def predict(data: InputData):
"""Endpoint de predicció amb logging."""
start_time = time.time()
# Predicció
features = [data.age, data.income, data.score]
prediction = model.predict([features])[0]
probability = model.predict_proba([features])[0] if hasattr(model, 'predict_proba') else None
latency = time.time() - start_time
# Logging
log_entry = {
'timestamp': datetime.now().isoformat(),
'inputs': {
'age': data.age,
'income': data.income,
'score': data.score
},
'prediction': float(prediction),
'probability': float(probability[1]) if probability is not None else None,
'model_version': metadata['version'],
'latency_ms': latency * 1000
}
pred_logger.info(json.dumps(log_entry))
return {
'prediction': float(prediction),
'probability': float(probability[1]) if probability is not None else None,
'model_version': metadata['version']
}
Llegir i analitzar logs per monitoratge
Un cop tenim logs, podem analitzar-los per detectar drift:
import pandas as pd
import json
def load_predictions_from_logs(log_file: str, hours: int = 24) -> pd.DataFrame:
"""
Carrega prediccions dels logs de les últimes N hores.
Returns:
DataFrame amb columnes: timestamp, inputs, prediction, model_version, etc.
"""
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(hours=hours)
predictions = []
with open(log_file, 'r') as f:
for line in f:
try:
entry = json.loads(line.strip())
timestamp = datetime.fromisoformat(entry['timestamp'])
if timestamp >= cutoff:
# Flattenar inputs en columnes separades
row = {
'timestamp': timestamp,
'prediction': entry['prediction'],
'model_version': entry.get('model_version'),
'latency_ms': entry.get('latency_ms'),
**entry['inputs'] # Unpack inputs com a columnes
}
predictions.append(row)
except (json.JSONDecodeError, KeyError) as e:
continue # Skip línies malformades
return pd.DataFrame(predictions)
# Ús
recent_predictions = load_predictions_from_logs('logs/predictions.jsonl', hours=24)
print(f"Prediccions últimes 24h: {len(recent_predictions)}")
print(f"Latència mitjana: {recent_predictions['latency_ms'].mean():.1f}ms")
print(f"Distribució prediccions:\n{recent_predictions['prediction'].value_counts()}")
Resum: estratègia de logging
Per començar (projectes petits):
- ✅ Logs JSON en fitxers (
predictions.jsonl) - ✅ Guardar: timestamp, inputs, prediction, model_version
- ✅ Analitzar amb scripts Python/pandas periòdicament
Per escalar (projectes mitjans):
- ✅ SQLite per emmagatzematge més eficient
- ✅ Índexs per timestamps per queries ràpides
- ✅ Scripts automatitzats (cron) per analitzar diàriament
Per producció (projectes grans):
- ✅ PostgreSQL o base de dades distribuïda
- ✅ Anonymització automàtica de dades sensibles
- ✅ Backups automàtics i retenció de dades
- ✅ Monitoratge en temps real amb alertes
Amb prediccions guardades, podem començar a detectar drift i monitoritzar el model!
Què monitoritzar?
1. Mètriques del model (Model metrics)
Si tenim accés a les etiquetes reals (encara que sigui amb retard), podem calcular les mètriques tradicionals:
- Accuracy, precision, recall, F1 per classificació
- MSE, MAE, R² per regressió
import logging
from datetime import datetime
# Configurar named logger
logger = logging.getLogger(__name__)
handler = logging.FileHandler('model_metrics.log')
handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def log_prediction(prediction, actual=None):
"""Registra cada predicció i, si està disponible, el valor real."""
log_data = {
'prediction': prediction,
'actual': actual,
'correct': (prediction == actual) if actual is not None else None
}
logger.info(f"Prediction: {log_data}")
2. Distribució de les prediccions
Fins i tot sense etiquetes, podem monitoritzar la distribució de les prediccions:
- Per classificació: Proporció de cada classe predita
- Per regressió: Mitjana, mediana, rang de les prediccions
import numpy as np
import logging
logger = logging.getLogger(__name__)
def log_prediction_distribution(predictions: np.ndarray, window_name: str) -> None:
"""Registra estadístiques de les prediccions."""
stats = {
'window': window_name,
'count': len(predictions),
'mean': float(np.mean(predictions)),
'std': float(np.std(predictions)),
'min': float(np.min(predictions)),
'max': float(np.max(predictions))
}
logger.info(f"Prediction stats: {stats}")
Si la distribució de prediccions canvia dràsticament, pot ser senyal de drift.
3. Distribució de les features
Monitoritzar les features d’entrada ens permet detectar covariate shift:
def check_feature_drift(reference_stats, current_data, threshold: float = 3.0) -> list[str]:
"""Compara estadístiques de features actuals amb la referència."""
alerts = []
for feature in reference_stats.keys():
ref_mean = reference_stats[feature]['mean']
ref_std = reference_stats[feature]['std']
current_mean = current_data[feature].mean()
# Alerta si la mitjana actual està a més de N desviacions
deviation = abs(current_mean - ref_mean) / ref_std if ref_std > 0 else 0
if deviation > threshold:
alerts.append(f"Feature '{feature}' deviates {deviation:.2f}σ from reference")
return alerts
4. Mètriques operacionals
A més del model, hem de monitoritzar el sistema:
- Latència: Temps de resposta de l’API
- Throughput: Nombre de peticions per segon
- Errors: Taxa d’errors (5xx, timeouts)
- Recursos: Ús de CPU, memòria, disc
import time
import logging
logger = logging.getLogger(__name__)
def predict_with_monitoring(model, data):
"""Fa una predicció i registra mètriques operacionals."""
start_time = time.time()
try:
prediction = model.predict(data)
latency = time.time() - start_time
logger.info(f"Prediction successful, latency: {latency:.3f}s")
return prediction
except Exception as e:
logger.error(f"Prediction failed: {str(e)}")
raise
Arquitectura de monitoratge simple
Per al nostre context (scripts Python, sense dependències complexes), podem implementar un sistema de monitoratge basat en:
- Logging estructurat: Registrar tot en fitxers de log amb format consistent
- Scripts d’anàlisi: Scripts que processen els logs periòdicament
- Visualització simple: Gràfics amb matplotlib o similar
Exemple d’script d’anàlisi
import json
from datetime import datetime, timedelta
import numpy as np
import matplotlib.pyplot as plt
def analyze_logs(log_file: str, hours: int = 24):
"""Analitza logs JSON estructurats de les últimes N hores."""
cutoff = datetime.now() - timedelta(hours=hours)
latencies = []
errors = 0
with open(log_file, 'r') as f:
for line in f:
try:
# Parsejar log JSON
log_entry = json.loads(line)
timestamp = datetime.fromisoformat(log_entry['timestamp'])
if timestamp < cutoff:
continue
# Processar segons el tipus de log
if log_entry.get('level') == 'INFO' and 'latency' in log_entry.get('message', ''):
# Extreure latència del missatge o camps extra
latency = float(log_entry['message'].split('latency: ')[1].split('s')[0])
latencies.append(latency)
elif log_entry.get('level') == 'ERROR':
errors += 1
except (json.JSONDecodeError, KeyError, ValueError):
continue
# Generar informe
if latencies:
print(f"Període: últimes {hours} hores")
print(f"Total prediccions: {len(latencies)}")
print(f"Errors: {errors}")
print(f"Latència mitjana: {np.mean(latencies):.3f}s")
print(f"Latència p95: {np.percentile(latencies, 95):.3f}s")
# Gràfic de latències
plt.figure(figsize=(10, 4))
plt.hist(latencies, bins=50)
plt.xlabel('Latència (s)')
plt.ylabel('Freqüència')
plt.title('Distribució de latències')
plt.savefig('latency_distribution.png')
Observabilitat (Observability)
El monitoratge ens diu què passa (mètriques, alertes), però l’observabilitat ens permet entendre per què passa.
Els tres pilars de l’observabilitat
- Mètriques (Metrics): Valors numèrics agregats (latència mitjana, taxa d’errors)
- Logs: Registres detallats d’esdeveniments individuals
- Traces: Seguiment del camí d’una petició a través del sistema
Per al nostre context simplificat, ens centrarem en logs ben estructurats que ens permetin fer anàlisi posterior.
Logs estructurats
En lloc de logs de text lliure, utilitzar format JSON facilita l’anàlisi:
import json
import logging
from datetime import datetime
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
'timestamp': datetime.fromtimestamp(record.created).isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
}
# Afegir camps extra si existeixen
if hasattr(record, 'extra_data'):
log_data.update(record.extra_data)
return json.dumps(log_data)
# Configurar el logger
logger = logging.getLogger('model_api')
logger.setLevel(logging.INFO)
handler = logging.FileHandler('app.log')
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
Exemple de log estructurat per a prediccions
def log_prediction_detailed(request_id: str, input_data: dict, prediction: float, latency: float) -> None:
"""Registra una predicció amb context complet."""
log_data = {
'event': 'prediction',
'request_id': request_id,
'input_features': {
'feature_1': input_data.feature_1,
'feature_2': input_data.feature_2,
# ... no incloure dades sensibles
},
'prediction': prediction,
'latency_ms': latency * 1000,
}
logger.info('Prediction completed', extra={'extra_data': log_data})
Això genera logs com:
{"timestamp": "2024-01-15T10:30:45", "level": "INFO", "message": "Prediction completed", "event": "prediction", "request_id": "abc123", "input_features": {"feature_1": 0.5, "feature_2": 1.2}, "prediction": 0.85, "latency_ms": 45.2}
Establint alertes
Les alertes ens notifiquen quan alguna cosa requereix atenció. La clau és trobar l’equilibri: prou sensibles per detectar problemes, però no tant que generin fatiga d’alertes.
Tipus d’alertes
-
Alertes de llindar (Threshold alerts): Quan una mètrica supera un valor fix
- Latència > 500ms
- Taxa d’errors > 1%
- PSI > 0.2
-
Alertes d’anomalia: Quan una mètrica es desvia significativament del seu comportament normal
- Nombre de peticions 50% inferior a la mitjana de la mateixa hora
-
Alertes de tendència: Quan una mètrica mostra una tendència preocupant
- Latència augmentant progressivament durant 3 dies
Implementació simple d’alertes
def check_alerts(metrics: dict, thresholds: dict = None) -> list[dict]:
"""Comprova si cal generar alertes."""
if thresholds is None:
thresholds = {
'latency_p95': 0.5,
'error_rate': 0.01,
'psi': 0.2
}
alerts = []
# Alerta de latència
if metrics.get('latency_p95', 0) > thresholds['latency_p95']:
alerts.append({
'severity': 'warning',
'message': f"High p95 latency: {metrics['latency_p95']:.2f}s"
})
# Alerta d'errors
if metrics.get('error_rate', 0) > thresholds['error_rate']:
alerts.append({
'severity': 'critical',
'message': f"High error rate: {metrics['error_rate']:.2%}"
})
# Alerta de drift
if metrics.get('psi', 0) > thresholds['psi']:
alerts.append({
'severity': 'warning',
'message': f"Data drift detected (PSI={metrics['psi']:.3f})"
})
return alerts
def send_alerts(alerts):
"""Envia alertes (per email, fitxer, etc.)."""
for alert in alerts:
# Exemple simple: escriure a fitxer
with open('alerts.log', 'a') as f:
f.write(f"{datetime.now()} [{alert['severity']}] {alert['message']}\n")
# Aquí podríem afegir enviament per email, Slack, etc.
Resum
En aquest capítol hem après:
- El drift (canvi en la distribució de dades) és inevitable en producció i pot degradar el rendiment del model
- Hi ha tres tipus principals: covariate shift (canvi en features), label shift (canvi en distribució d’etiquetes), i concept drift (canvi en la relació entre features i etiquetes)
- Podem detectar drift amb tests estadístics com Kolmogorov-Smirnov, chi-quadrat, o el Population Stability Index (PSI)
- Un sistema de monitoratge complet vigila mètriques del model, distribució de prediccions i features, i mètriques operacionals
- L’observabilitat (especialment logs estructurats) ens permet entendre per què passen els problemes
- Les alertes ens notifiquen quan cal actuar, però cal calibrar-les per evitar fatiga
Al proper capítol veurem què fer quan detectem que el model necessita ser actualitzat: l’aprenentatge continu i les estratègies per testar models en producció.