Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Canvis en la Distribució de Dades i Monitoratge

Introducció

Quan un model de machine learning arriba a producció, el nostre treball no s’acaba: tot just comença una nova fase. El món real és dinàmic, i les dades que el model veu en producció poden ser molt diferents de les que va veure durant l’entrenament. Aquest fenomen s’anomena canvi en la distribució de dades (data distribution shift o simplement drift).

En aquest capítol explorarem els diferents tipus de drift, com detectar-los, i com establir un sistema de monitoratge que ens permeti saber si el nostre model segueix funcionant correctament.

Per què les dades canvien?

Imaginem que hem entrenat un model per predir quins productes comprarà un client. El model va funcionar molt bé durant les proves, però després d’uns mesos en producció, les prediccions ja no són tan bones. Què ha passat?

Les causes poden ser múltiples:

  • Canvis en el comportament dels usuaris: Nous hàbits de consum, modes, temporades
  • Canvis en el mercat: Nous competidors, canvis de preus, situació econòmica
  • Canvis en la recollida de dades: Actualitzacions en l’aplicació, nous camps, errors en sensors
  • Canvis en la població: Nous segments de clients, expansió a nous mercats

El problema és que el model va aprendre patrons d’unes dades concretes, i quan aquestes dades canvien, el model pot quedar obsolet.

Tipus de drift

No tots els canvis en les dades són iguals. Distingim tres tipus principals:

Covariate Shift (Canvi en les covariables)

El covariate shift es produeix quan la distribució de les variables d’entrada (features) canvia, però la relació entre les entrades i la sortida es manté.

Exemple: Un model per detectar correu brossa entrenat amb correus d’oficina. Quan l’empresa comença a rebre més correus de màrqueting (nova distribució d’entrades), el model pot fallar perquè mai havia vist aquest tipus de correus, tot i que la definició de “brossa” no ha canviat.

Producció

Features X'
distribució diferent

Model

Predicció Y
potencialment dolenta

Entrenament

Features X

Model

Predicció Y

Com detectar-ho: Comparar la distribució de les features entre entrenament i producció.

Label Shift (Canvi en les etiquetes)

El label shift es produeix quan la distribució de les sortides (etiquetes) canvia.

Exemple: Un model de diagnòstic mèdic entrenat amb dades on el 5% dels pacients tenien la malaltia. Si en producció aquest percentatge puja al 20% (per exemple, durant un brot), el model pot tenir problemes perquè els seus llindars de decisió estaven calibrats per a una prevalença diferent.

Entrenament: 95% sans, 5% malalts Producció: 80% sans, 20% malalts ← El model pot subestimar casos positius

Com detectar-ho: Monitoritzar la distribució de les prediccions (si no tenim etiquetes reals) o de les etiquetes reals (si les obtenim amb retard).

Concept Drift (Canvi de concepte)

El concept drift és el més problemàtic: la relació entre les entrades i la sortida canvia.

Exemple: Un model per predir si un client pagarà un préstec, entrenat abans d’una crisi econòmica. Després de la crisi, clients que abans eren bons pagadors ara no poden pagar. Les mateixes features (ingressos, edat, historial) ara porten a resultats diferents.

Abans: Ingressos alts + Historial bo → Alta probabilitat de pagar Després: Ingressos alts + Historial bo → Probabilitat incerta (crisi!)

Com detectar-ho: Només es pot detectar amb certesa si tenim les etiquetes reals de producció. Altrament, podem inferir-ho si detectem degradació en les mètriques del model.

Detecció de drift amb mètodes estadístics

Per detectar drift, comparem distribucions: les dades d’entrenament (o d’un període de referència) contra les dades actuals de producció. Hi ha diversos mètodes estadístics per fer-ho.

Tests de dues mostres (Two-sample tests)

Aquests tests responen a la pregunta: “Aquestes dues mostres provenen de la mateixa distribució?”

Test de Kolmogorov-Smirnov (KS)

El test KS mesura la màxima diferència entre les funcions de distribució acumulada de dues mostres. És útil per a variables numèriques contínues.

from scipy import stats def detect_drift_ks(reference_data, current_data, features, alpha: float = 0.05) -> list[str]: """Detect drift using Kolmogorov-Smirnov test.""" alerts = [] for feature_name in features: statistic, p_value = stats.ks_2samp( reference_data[feature_name], current_data[feature_name] ) if p_value < alpha: alerts.append(f"Drift detected in {feature_name} (p={p_value:.4f})") return alerts

Interpretació: Un p-valor baix (típicament < 0.05) indica que les dues mostres probablement no vénen de la mateixa distribució.

Test Chi-quadrat (χ²)

Per a variables categòriques, el test chi-quadrat compara les freqüències observades amb les esperades.

from scipy import stats # Comptar freqüències reference_counts = reference_data['categoria'].value_counts() current_counts = current_data['categoria'].value_counts() # Assegurar que tenim les mateixes categories all_categories = set(reference_counts.index) | set(current_counts.index) ref_freq = [reference_counts.get(c, 0) for c in all_categories] cur_freq = [current_counts.get(c, 0) for c in all_categories] # Test chi-quadrat statistic, p_value = stats.chisquare(cur_freq, ref_freq)

Population Stability Index (PSI)

El PSI és una mètrica molt utilitzada en la indústria financera per mesurar quant ha canviat una distribució.

import numpy as np def calculate_psi(reference, current, bins: int = 10, smoothing: float = 1e-4) -> float: """ Calculate Population Stability Index. Returns PSI value: < 0.1 insignificant, 0.1-0.2 investigate, > 0.2 action required. """ # Crear bins basats en la distribució de referència breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1)) breakpoints[0] = -np.inf breakpoints[-1] = np.inf # Calcular proporcions per cada bin ref_counts = np.histogram(reference, bins=breakpoints)[0] cur_counts = np.histogram(current, bins=breakpoints)[0] ref_pct = ref_counts / len(reference) cur_pct = cur_counts / len(current) # Evitar divisió per zero ref_pct = np.where(ref_pct == 0, smoothing, ref_pct) cur_pct = np.where(cur_pct == 0, smoothing, cur_pct) # Calcular PSI psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)) return psi

Interpretació del PSI:

  • PSI < 0.1: Canvi insignificant
  • 0.1 ≤ PSI < 0.2: Canvi moderat, investigar
  • PSI ≥ 0.2: Canvi significatiu, acció requerida

Finestres de temps

Per aplicar aquests tests en producció, necessitem definir finestres de temps (time windows):

  • Finestra de referència: Dades d’entrenament o d’un període estable
  • Finestra actual: Dades recents de producció (últimes hores, dies, o setmanes)

No

Dades històriques
Referència

Comparar

Dades recents
Última setmana

Drift
detectat?

Alerta

Tot correcte

Monitoratge de qualitat de dades

La detecció de drift és essencial, però no és l’única cosa que hem de monitoritzar. Les dades poden tenir problemes de qualitat que afecten el model sense que hi hagi drift estadístic.

Tipus de problemes de qualitat

1. Integritat de les dades

  • Valors nuls: percentatge de missing values per feature
  • Duplicats: registres repetits
  • Violacions d’esquema: tipus de dades incorrectes, columnes inesperades

2. Qualitat de les features

  • Outliers: valors fora del rang esperat
  • Violacions de rang: valors fora dels límits coneguts
  • Canvis de cardinalitat: noves categories en features categòriques

3. Qualitat de les prediccions

  • Distribució de confiança: les prediccions són menys segures?
  • Calibratge: les probabilitats predites coincideixen amb les freqüències reals?

Exemple de validació

import numpy as np import pandas as pd def check_data_quality(df: pd.DataFrame, reference_stats: dict) -> list[str]: """Comprova problemes de qualitat de dades.""" issues = [] # Valors nuls null_rate = df.isnull().mean() if (null_rate > 0.05).any(): issues.append(f"Taxa de nuls alta: {null_rate[null_rate > 0.05].to_dict()}") # Outliers (mètode z-score) for col in df.select_dtypes(include=[np.number]).columns: z_scores = np.abs((df[col] - df[col].mean()) / df[col].std()) outlier_rate = (z_scores > 3).mean() if outlier_rate > 0.05: issues.append(f"Outliers en {col}: {outlier_rate:.2%}") # Noves categories for col in df.select_dtypes(include=['object', 'category']).columns: known_cats = set(reference_stats['categories'][col]) new_cats = set(df[col].unique()) - known_cats if new_cats: issues.append(f"Noves categories en {col}: {new_cats}") return issues

Integració amb el pipeline:

És important validar les dades abans d’usar-les per entrenar:

import logging logger = logging.getLogger(__name__) def validate_and_prepare_data(raw_data, reference_stats): """Pipeline de validació i preparació.""" # 1. Validar qualitat quality_issues = check_data_quality(raw_data, reference_stats) if quality_issues: logger.warning(f"Problemes de qualitat detectats: {quality_issues}") # Decidir si continuar o abortar if critical_issues(quality_issues): raise ValueError("Dades amb problemes crítics") # 2. Preprocessar processed_data = preprocess(raw_data) return processed_data

Diferència clau: El drift mesura canvis en distribucions; la qualitat mesura problemes en les dades mateixes.

Monitoratge en producció

Detectar drift és només una part del monitoratge. Un sistema de monitoratge complet ha de vigilar múltiples aspectes.

Emmagatzematge de prediccions per monitoratge

Abans de poder monitoritzar el nostre model, necessitem guardar les prediccions que fem en producció. Sense aquest registre, no podem detectar drift, calcular mètriques, ni analitzar el comportament del model.

Què cal guardar?

Per a cada predicció, hauríem de registrar:

Essencial:

  • Timestamp: Quan s’ha fet la predicció
  • Input features: Les dades d’entrada (sense informació sensible!)
  • Prediction: La sortida del model
  • Request ID: Identificador únic de la petició (opcional però útil)

Recomanat:

  • Model version: Quina versió del model ha fet la predicció
  • Confidence/probability: Si el model retorna probabilitats
  • Latency: Temps que ha trigat la predicció

Opcional:

  • User ID: Si és rellevant i permet privacitat
  • Context: Informació addicional del context de la petició

On guardar les prediccions?

Tenim diverses opcions, depenent de l’escala i complexitat:

1. Fitxers de log (més simple)

Per a volums baixos-mitjans, logs estructurats en JSON són suficients:

import logging import json from datetime import datetime # Configurar logger per prediccions pred_logger = logging.getLogger('predictions') pred_logger.setLevel(logging.INFO) # Handler per escriure a fitxer handler = logging.FileHandler('logs/predictions.jsonl') # .jsonl = JSON Lines pred_logger.addHandler(handler) def log_prediction(inputs: dict, prediction: float, model_version: str, latency: float): """Registra una predicció en format JSON.""" log_entry = { 'timestamp': datetime.now().isoformat(), 'inputs': inputs, 'prediction': float(prediction), 'model_version': model_version, 'latency_ms': latency * 1000 } pred_logger.info(json.dumps(log_entry)) # Ús a l'API @app.post("/predict") def predict(data: InputData): start = time.time() prediction = model.predict([data.features])[0] latency = time.time() - start # Registrar predicció log_prediction( inputs=data.dict(), prediction=prediction, model_version='1.2.0', latency=latency ) return {"prediction": float(prediction)}

Això genera un fitxer predictions.jsonl amb una línia per predicció:

{"timestamp": "2024-01-15T10:30:00", "inputs": {"age": 30, "income": 50000}, "prediction": 0.85, "model_version": "1.2.0", "latency_ms": 45.2} {"timestamp": "2024-01-15T10:31:15", "inputs": {"age": 45, "income": 75000}, "prediction": 0.92, "model_version": "1.2.0", "latency_ms": 38.7}

Avantatges: Simple, no requereix base de dades, fàcil d’analitzar Desavantatges: Menys eficient per a grans volums, queries més lentes

2. Base de dades SQLite (mitjà volum)

Per a més volum, una base de dades local és millor:

import sqlite3 from datetime import datetime def init_predictions_db(db_path: str = 'predictions.db'): """Inicialitza la base de dades de prediccions.""" with sqlite3.connect(db_path) as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS predictions ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, input_age REAL, input_income REAL, prediction REAL NOT NULL, probability REAL, model_version TEXT, latency_ms REAL ) ''') # Índex per timestamp (per queries ràpides) cursor.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON predictions(timestamp)') conn.commit() def save_prediction(inputs: dict, prediction: float, model_version: str, latency: float): """Guarda predicció a SQLite.""" with sqlite3.connect('predictions.db') as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO predictions (timestamp, input_age, input_income, prediction, model_version, latency_ms) VALUES (?, ?, ?, ?, ?, ?) ''', ( datetime.now().isoformat(), inputs.get('age'), inputs.get('income'), prediction, model_version, latency * 1000 )) conn.commit()

Avantatges: Queries SQL ràpides, bon rendiment, backups fàcils Desavantatges: Limitat a un sol servidor (no distribuït)

3. PostgreSQL (alt volum, producció)

Per a sistemes amb alt tràfic, una base de dades robusta:

import psycopg2 from datetime import datetime def save_prediction_postgres(inputs: dict, prediction: float, model_version: str): """Guarda predicció a PostgreSQL.""" with psycopg2.connect("postgresql://user:pass@localhost/mldb") as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO predictions (timestamp, inputs, prediction, model_version) VALUES (%s, %s, %s, %s) ''', ( datetime.now(), json.dumps(inputs), # Guardar inputs com JSONB prediction, model_version )) conn.commit()

Avantatges: Escalable, suporta concurrència, queries complexes Desavantatges: Més complex de configurar i mantenir

Consideracions de privacitat i seguretat

⚠️ Molt important: No guardar dades sensibles sense anonimitzar!

def anonymize_inputs(inputs: dict) -> dict: """Elimina o anonimitza dades sensibles.""" safe_inputs = inputs.copy() # Eliminar camps sensibles sensitive_fields = ['email', 'phone', 'ssn', 'credit_card'] for field in sensitive_fields: if field in safe_inputs: safe_inputs[field] = '***REDACTED***' # Hash d'identificadors si cal mantenir-los if 'user_id' in safe_inputs: safe_inputs['user_id'] = hashlib.sha256( str(safe_inputs['user_id']).encode() ).hexdigest()[:16] return safe_inputs # Usar abans de guardar safe_inputs = anonymize_inputs(raw_inputs) log_prediction(safe_inputs, prediction, model_version, latency)

Integració completa amb FastAPI

Exemple complet d’API amb logging:

from fastapi import FastAPI import joblib import json import time import logging app = FastAPI() # Configurar logging pred_logger = logging.getLogger('predictions') pred_logger.setLevel(logging.INFO) handler = logging.FileHandler('logs/predictions.jsonl') pred_logger.addHandler(handler) # Carregar model model = joblib.load('models/current_model.pkl') with open('models/current_metadata.json') as f: metadata = json.load(f) @app.post("/predict") def predict(data: InputData): """Endpoint de predicció amb logging.""" start_time = time.time() # Predicció features = [data.age, data.income, data.score] prediction = model.predict([features])[0] probability = model.predict_proba([features])[0] if hasattr(model, 'predict_proba') else None latency = time.time() - start_time # Logging log_entry = { 'timestamp': datetime.now().isoformat(), 'inputs': { 'age': data.age, 'income': data.income, 'score': data.score }, 'prediction': float(prediction), 'probability': float(probability[1]) if probability is not None else None, 'model_version': metadata['version'], 'latency_ms': latency * 1000 } pred_logger.info(json.dumps(log_entry)) return { 'prediction': float(prediction), 'probability': float(probability[1]) if probability is not None else None, 'model_version': metadata['version'] }

Llegir i analitzar logs per monitoratge

Un cop tenim logs, podem analitzar-los per detectar drift:

import pandas as pd import json def load_predictions_from_logs(log_file: str, hours: int = 24) -> pd.DataFrame: """ Carrega prediccions dels logs de les últimes N hores. Returns: DataFrame amb columnes: timestamp, inputs, prediction, model_version, etc. """ from datetime import datetime, timedelta cutoff = datetime.now() - timedelta(hours=hours) predictions = [] with open(log_file, 'r') as f: for line in f: try: entry = json.loads(line.strip()) timestamp = datetime.fromisoformat(entry['timestamp']) if timestamp >= cutoff: # Flattenar inputs en columnes separades row = { 'timestamp': timestamp, 'prediction': entry['prediction'], 'model_version': entry.get('model_version'), 'latency_ms': entry.get('latency_ms'), **entry['inputs'] # Unpack inputs com a columnes } predictions.append(row) except (json.JSONDecodeError, KeyError) as e: continue # Skip línies malformades return pd.DataFrame(predictions) # Ús recent_predictions = load_predictions_from_logs('logs/predictions.jsonl', hours=24) print(f"Prediccions últimes 24h: {len(recent_predictions)}") print(f"Latència mitjana: {recent_predictions['latency_ms'].mean():.1f}ms") print(f"Distribució prediccions:\n{recent_predictions['prediction'].value_counts()}")

Resum: estratègia de logging

Per començar (projectes petits):

  • ✅ Logs JSON en fitxers (predictions.jsonl)
  • ✅ Guardar: timestamp, inputs, prediction, model_version
  • ✅ Analitzar amb scripts Python/pandas periòdicament

Per escalar (projectes mitjans):

  • ✅ SQLite per emmagatzematge més eficient
  • ✅ Índexs per timestamps per queries ràpides
  • ✅ Scripts automatitzats (cron) per analitzar diàriament

Per producció (projectes grans):

  • ✅ PostgreSQL o base de dades distribuïda
  • ✅ Anonymització automàtica de dades sensibles
  • ✅ Backups automàtics i retenció de dades
  • ✅ Monitoratge en temps real amb alertes

Amb prediccions guardades, podem començar a detectar drift i monitoritzar el model!

Què monitoritzar?

1. Mètriques del model (Model metrics)

Si tenim accés a les etiquetes reals (encara que sigui amb retard), podem calcular les mètriques tradicionals:

  • Accuracy, precision, recall, F1 per classificació
  • MSE, MAE, R² per regressió
import logging from datetime import datetime # Configurar named logger logger = logging.getLogger(__name__) handler = logging.FileHandler('model_metrics.log') handler.setFormatter(logging.Formatter('%(asctime)s - %(message)s')) logger.addHandler(handler) logger.setLevel(logging.INFO) def log_prediction(prediction, actual=None): """Registra cada predicció i, si està disponible, el valor real.""" log_data = { 'prediction': prediction, 'actual': actual, 'correct': (prediction == actual) if actual is not None else None } logger.info(f"Prediction: {log_data}")

2. Distribució de les prediccions

Fins i tot sense etiquetes, podem monitoritzar la distribució de les prediccions:

  • Per classificació: Proporció de cada classe predita
  • Per regressió: Mitjana, mediana, rang de les prediccions
import numpy as np import logging logger = logging.getLogger(__name__) def log_prediction_distribution(predictions: np.ndarray, window_name: str) -> None: """Registra estadístiques de les prediccions.""" stats = { 'window': window_name, 'count': len(predictions), 'mean': float(np.mean(predictions)), 'std': float(np.std(predictions)), 'min': float(np.min(predictions)), 'max': float(np.max(predictions)) } logger.info(f"Prediction stats: {stats}")

Si la distribució de prediccions canvia dràsticament, pot ser senyal de drift.

3. Distribució de les features

Monitoritzar les features d’entrada ens permet detectar covariate shift:

def check_feature_drift(reference_stats, current_data, threshold: float = 3.0) -> list[str]: """Compara estadístiques de features actuals amb la referència.""" alerts = [] for feature in reference_stats.keys(): ref_mean = reference_stats[feature]['mean'] ref_std = reference_stats[feature]['std'] current_mean = current_data[feature].mean() # Alerta si la mitjana actual està a més de N desviacions deviation = abs(current_mean - ref_mean) / ref_std if ref_std > 0 else 0 if deviation > threshold: alerts.append(f"Feature '{feature}' deviates {deviation:.2f}σ from reference") return alerts

4. Mètriques operacionals

A més del model, hem de monitoritzar el sistema:

  • Latència: Temps de resposta de l’API
  • Throughput: Nombre de peticions per segon
  • Errors: Taxa d’errors (5xx, timeouts)
  • Recursos: Ús de CPU, memòria, disc
import time import logging logger = logging.getLogger(__name__) def predict_with_monitoring(model, data): """Fa una predicció i registra mètriques operacionals.""" start_time = time.time() try: prediction = model.predict(data) latency = time.time() - start_time logger.info(f"Prediction successful, latency: {latency:.3f}s") return prediction except Exception as e: logger.error(f"Prediction failed: {str(e)}") raise

Arquitectura de monitoratge simple

Per al nostre context (scripts Python, sense dependències complexes), podem implementar un sistema de monitoratge basat en:

  1. Logging estructurat: Registrar tot en fitxers de log amb format consistent
  2. Scripts d’anàlisi: Scripts que processen els logs periòdicament
  3. Visualització simple: Gràfics amb matplotlib o similar

Alertes

Anàlisi

Producció

API FastAPI

Fitxers de log

Script d'anàlisi
cron job diari

Estadístiques

Detecció de drift

Gràfics

Llindars
superats?

Email/Slack

Exemple d’script d’anàlisi

import json from datetime import datetime, timedelta import numpy as np import matplotlib.pyplot as plt def analyze_logs(log_file: str, hours: int = 24): """Analitza logs JSON estructurats de les últimes N hores.""" cutoff = datetime.now() - timedelta(hours=hours) latencies = [] errors = 0 with open(log_file, 'r') as f: for line in f: try: # Parsejar log JSON log_entry = json.loads(line) timestamp = datetime.fromisoformat(log_entry['timestamp']) if timestamp < cutoff: continue # Processar segons el tipus de log if log_entry.get('level') == 'INFO' and 'latency' in log_entry.get('message', ''): # Extreure latència del missatge o camps extra latency = float(log_entry['message'].split('latency: ')[1].split('s')[0]) latencies.append(latency) elif log_entry.get('level') == 'ERROR': errors += 1 except (json.JSONDecodeError, KeyError, ValueError): continue # Generar informe if latencies: print(f"Període: últimes {hours} hores") print(f"Total prediccions: {len(latencies)}") print(f"Errors: {errors}") print(f"Latència mitjana: {np.mean(latencies):.3f}s") print(f"Latència p95: {np.percentile(latencies, 95):.3f}s") # Gràfic de latències plt.figure(figsize=(10, 4)) plt.hist(latencies, bins=50) plt.xlabel('Latència (s)') plt.ylabel('Freqüència') plt.title('Distribució de latències') plt.savefig('latency_distribution.png')

Observabilitat (Observability)

El monitoratge ens diu què passa (mètriques, alertes), però l’observabilitat ens permet entendre per què passa.

Els tres pilars de l’observabilitat

  1. Mètriques (Metrics): Valors numèrics agregats (latència mitjana, taxa d’errors)
  2. Logs: Registres detallats d’esdeveniments individuals
  3. Traces: Seguiment del camí d’una petició a través del sistema

Per al nostre context simplificat, ens centrarem en logs ben estructurats que ens permetin fer anàlisi posterior.

Logs estructurats

En lloc de logs de text lliure, utilitzar format JSON facilita l’anàlisi:

import json import logging from datetime import datetime class JSONFormatter(logging.Formatter): def format(self, record): log_data = { 'timestamp': datetime.fromtimestamp(record.created).isoformat(), 'level': record.levelname, 'logger': record.name, 'message': record.getMessage(), } # Afegir camps extra si existeixen if hasattr(record, 'extra_data'): log_data.update(record.extra_data) return json.dumps(log_data) # Configurar el logger logger = logging.getLogger('model_api') logger.setLevel(logging.INFO) handler = logging.FileHandler('app.log') handler.setFormatter(JSONFormatter()) logger.addHandler(handler)

Exemple de log estructurat per a prediccions

def log_prediction_detailed(request_id: str, input_data: dict, prediction: float, latency: float) -> None: """Registra una predicció amb context complet.""" log_data = { 'event': 'prediction', 'request_id': request_id, 'input_features': { 'feature_1': input_data.feature_1, 'feature_2': input_data.feature_2, # ... no incloure dades sensibles }, 'prediction': prediction, 'latency_ms': latency * 1000, } logger.info('Prediction completed', extra={'extra_data': log_data})

Això genera logs com:

{"timestamp": "2024-01-15T10:30:45", "level": "INFO", "message": "Prediction completed", "event": "prediction", "request_id": "abc123", "input_features": {"feature_1": 0.5, "feature_2": 1.2}, "prediction": 0.85, "latency_ms": 45.2}

Establint alertes

Les alertes ens notifiquen quan alguna cosa requereix atenció. La clau és trobar l’equilibri: prou sensibles per detectar problemes, però no tant que generin fatiga d’alertes.

Tipus d’alertes

  1. Alertes de llindar (Threshold alerts): Quan una mètrica supera un valor fix

    • Latència > 500ms
    • Taxa d’errors > 1%
    • PSI > 0.2
  2. Alertes d’anomalia: Quan una mètrica es desvia significativament del seu comportament normal

    • Nombre de peticions 50% inferior a la mitjana de la mateixa hora
  3. Alertes de tendència: Quan una mètrica mostra una tendència preocupant

    • Latència augmentant progressivament durant 3 dies

Implementació simple d’alertes

def check_alerts(metrics: dict, thresholds: dict = None) -> list[dict]: """Comprova si cal generar alertes.""" if thresholds is None: thresholds = { 'latency_p95': 0.5, 'error_rate': 0.01, 'psi': 0.2 } alerts = [] # Alerta de latència if metrics.get('latency_p95', 0) > thresholds['latency_p95']: alerts.append({ 'severity': 'warning', 'message': f"High p95 latency: {metrics['latency_p95']:.2f}s" }) # Alerta d'errors if metrics.get('error_rate', 0) > thresholds['error_rate']: alerts.append({ 'severity': 'critical', 'message': f"High error rate: {metrics['error_rate']:.2%}" }) # Alerta de drift if metrics.get('psi', 0) > thresholds['psi']: alerts.append({ 'severity': 'warning', 'message': f"Data drift detected (PSI={metrics['psi']:.3f})" }) return alerts def send_alerts(alerts): """Envia alertes (per email, fitxer, etc.).""" for alert in alerts: # Exemple simple: escriure a fitxer with open('alerts.log', 'a') as f: f.write(f"{datetime.now()} [{alert['severity']}] {alert['message']}\n") # Aquí podríem afegir enviament per email, Slack, etc.

Resum

En aquest capítol hem après:

  • El drift (canvi en la distribució de dades) és inevitable en producció i pot degradar el rendiment del model
  • Hi ha tres tipus principals: covariate shift (canvi en features), label shift (canvi en distribució d’etiquetes), i concept drift (canvi en la relació entre features i etiquetes)
  • Podem detectar drift amb tests estadístics com Kolmogorov-Smirnov, chi-quadrat, o el Population Stability Index (PSI)
  • Un sistema de monitoratge complet vigila mètriques del model, distribució de prediccions i features, i mètriques operacionals
  • L’observabilitat (especialment logs estructurats) ens permet entendre per què passen els problemes
  • Les alertes ens notifiquen quan cal actuar, però cal calibrar-les per evitar fatiga

Al proper capítol veurem què fer quan detectem que el model necessita ser actualitzat: l’aprenentatge continu i les estratègies per testar models en producció.