Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Aprenentatge Continu i Test en Producció

Introducció

Al capítol anterior hem vist com detectar quan les dades canvien i el model comença a degradar-se. La pregunta natural és: què fem quan això passa? La resposta és actualitzar el model, però això obre tot un conjunt de nous reptes.

L’aprenentatge continu (continual learning) és el procés d’actualitzar models en producció per adaptar-se a les dades canviants. No es tracta simplement de re-entrenar de tant en tant, sinó de crear un sistema que pugui evolucionar de manera fiable i segura.

En aquest capítol explorarem quan i com actualitzar models, i les estratègies per provar models nous en producció sense arriscar el servei.

Per què actualitzar models?

Recordem els tipus de drift que hem vist:

  • Covariate shift: Les features canvien, cal que el model s’adapti a nous patrons d’entrada
  • Label shift: La proporció de classes canvia, cal recalibrar
  • Concept drift: La relació entre features i sortida canvia, el model ha après coses que ja no són certes

A més del drift, hi ha altres raons per actualitzar:

  • Més dades disponibles: Hem acumulat més exemples que poden millorar el model
  • Noves features: Tenim accés a dades noves que poden ser predictives
  • Correccions de bugs: Hem descobert errors en el preprocessament o l’entrenament
  • Millores algorítmiques: Volem provar un algoritme o arquitectura millor

Iteració de dades vs. iteració de model

Quan el rendiment del model degrada, tenim dues estratègies diferents que sovint es confonen:

Iteració de dades (Data Iteration)

Re-entrenar el model amb dades noves, mantenint l’arquitectura.

  • Mateixa arquitectura de model i features
  • Afegir mostres noves (nous patrons, concept drift)
  • Exemple: Model de frau re-entrenat setmanalment amb nous casos de frau (mateixa arquitectura)

Iteració de model (Model Iteration)

Canviar l’arquitectura o afegir noves features.

  • Afegir noves features (noves fonts de dades)
  • Canviar arquitectura (algoritme diferent, més complexitat)
  • Ajustar hiperparàmetres
  • Exemple: Model de crèdit que afegeix “historial laboral” O canvia de regressió logística a XGBoost

Quan usar cada estratègia?

SímptomaCausa ProbableEstratègia
Model funciona bé inicialment, degrada amb el tempsConcept driftIteració de dades
Model mai arriba a la performance objectiuModel massa simpleIteració de model
Bo en entrenament, dolent en produccióDistribution shiftIteració de dades
Performance s’estanca malgrat més dadesModel saturat o features insuficientsIteració de model
Noves fonts d’informació disponiblesNous senyals predictiusIteració de model

Implicacions en el pipeline

Pipeline d’iteració de dades:

  1. Recollir dades noves amb ground truth
  2. Validar qualitat de dades
  3. Re-entrenar mateixa arquitectura amb dades noves/actualitzades
  4. Mantenir features i hiperparàmetres
  5. Desplegar si la validació passa

Pipeline d’iteració de model:

  1. Fixar datasets d’entrenament/validació/test (versionats!)
  2. Afegir noves features O canviar arquitectura
  3. Entrenar i comparar candidats sobre el mateix test set
  4. Seleccionar millor configuració
  5. Re-entrenar amb totes les dades i desplegar

Enfocament híbrid (més comú en producció)

  • Iteració de dades regular (setmanal/mensual) → manté el model fresc
  • Iteració de model ocasional (trimestral/anual) → millora capacitat fonamental

Relació amb valor de negoci:

  • Iteració de dades → s’adapta al món canviant (patrons de frau evolucionen)
  • Iteració de model → millora capacitat fonamental (nous senyals, millor arquitectura)

Amb quina freqüència actualitzar?

No hi ha una resposta universal. Depèn de:

Velocitat del canvi en les dades

Si les dades canvien ràpidament (per exemple, tendències en xarxes socials), necessitem actualitzacions freqüents. Si són estables (per exemple, detecció de defectes en manufactura), podem actualitzar menys sovint.

Cost de l’actualització

Cada actualització té costos:

  • Computacional: Temps de GPU/CPU per entrenar
  • Humà: Temps d’enginyers per validar i desplegar
  • Risc: Possibilitat d’introduir regressions

Valor de la frescor (freshness)

No sempre tenir el model més recent és millor. Cal preguntar-se: quant guanyem amb dades més recents?

Un estudi de Facebook va mostrar que passar d’entrenament setmanal a diari reduïa la pèrdua del model en un 1%. Per a alguns casos, aquest 1% pot valer milions; per a altres, no justifica l’esforç.

Estratègies comunes

EstratègiaFreqüènciaCasos d’ús
Ad-hocQuan calguiModels estables, canvis rars
ProgramadaSetmanal/mensualCanvis graduals previsibles
Basada en triggersQuan es detecta driftCanvis imprevisibles
ContínuaDiària o mésCanvis molt ràpids

Paradigmes d’actualització: Batch, Streaming, i Continual Learning

El terme “aprenentatge continu” (continual learning) pot ser ambigu. Aquí definirem clarament tres paradigmes diferents.

Què és Continual Learning?

Continual Learning no es refereix només a algoritmes d’aprenentatge incremental, sinó a crear infraestructura que permeti actualitzar models ràpidament quan calgui — des de zero o amb fine-tuning — i desplegar-los de manera àgil.

És una qüestió d’infraestructura i procés, no només d’algoritme.

Tres paradigmes d’actualització

1. Batch Retraining (Aprenentatge per lots)

  • Model entrenat des de zero a intervals discrets
  • Re-entrenament programat (setmanal, mensual)
  • Pipeline de desplegament lent (dies a setmanes)
  • Exemple: Re-entrenament mensual amb aprovació manual

2. Streaming Retraining (Re-entrenament amb finestres)

  • Model re-entrenat des de zero sobre finestres de dades recents
  • Finestra lliscant o expansiva d’exemples recents
  • Pipeline més ràpid (hores a dies)
  • Exemple: Re-entrenament diari amb els últims 30 dies de dades

3. Continual Learning (Infraestructura per actualitzacions ràpides)

  • Infraestructura que suporta actualitzacions ràpides quan calgui
  • Pot usar re-entrenament stateless (des de zero) O stateful (fine-tuning)
  • Pipeline de desplegament ràpid (minuts a hores)
  • Monitoratge, validació, i desplegament automatitzats
  • Exemple: Re-entrenament automàtic quan es detecta drift, amb A/B testing i rollback automàtic

Taula comparativa

AspecteBatchStreamingContinual Learning
InfraestructuraManual/programadaSemi-automatitzadaTotalment automatitzada
Velocitat desplegamentDies a setmanesHores a diesMinuts a hores
TriggerProgramat o manualProgramat (freqüent)Automàtic (drift, performance)
ValidacióRevisió manualAutomatitzada amb llindarsAutomatitzada + A/B testing
MonitoratgeDashboards periòdicsMètriques contínuesAlertes en temps real + auto-acció
RollbackManualSemi-automàticAutomàtic (instantani)
Millor perDades establesVelocitat moderadaEntorns molt canviants

Quan usar cada paradigma?

Batch Retraining:

  • Canvis en dades lents i predictibles
  • Revisió manual requerida (mèdic, financer)
  • Equip petit, infraestructura simple
  • Freqüència de setmanes/mesos acceptable

Streaming Retraining:

  • Dades arriben contínuament, patrons canvien moderadament
  • Equilibri entre frescor i validació
  • Es poden definir finestres temporals significatives

Continual Learning:

  • Patrons canvien ràpidament i impredictiblement (frau, spam)
  • Necessitat de respondre a drift en hores
  • Recursos d’enginyeria per pipelines automatitzats
  • Flexibilitat: re-entrenament des de zero O fine-tuning

Desafiaments de Continual Learning

Challenge: Validació automatitzada

  • Solució: Test suites complets, holdout sets, A/B testing

Challenge: Detecció de drift i triggering

  • Solució: Monitoritzar distribucions, performance, confiança; activar per llindars

Challenge: Pipeline de desplegament ràpid

  • Solució: Contenidors, CI/CD, testing automatitzat

Challenge: Rollback i versionat

  • Solució: Model registry, blue-green deployment, rollback automàtic

Evitar sobre-enginyeria: La majoria de sistemes usen batch o streaming. Només implementar continual learning quan:

  1. Negoci requereix actualitzacions ràpides (hores, no dies)
  2. Patrons canvien ràpidament
  3. Tens recursos d’enginyeria per mantenir pipelines automatitzats

Casos d’ús i estratègies de re-entrenament

Un cop hem vist els paradigmes d’actualització, vegem com s’apliquen en contextos reals. La taula següent mostra diferents casos d’ús amb les seves característiques i l’estratègia de re-entrenament més adequada.

Cas d’úsGround TruthDelayParadigmaFreqüència
Recomanador e-commerceAutomàtic (clicks/compres)Hores/diesStreamingDiari
Diagnòstic mèdicManual (experts)SetmanesBatchMensual
Detecció de frau bancariAutomàtic (verificació)DiesContinual LearningQuan drift
Filtre de spamAutomàtic (feedback usuaris)ImmediatContinual LearningContinu
Predicció ocupació transportAutomàtic (sensors)MinutsStreamingDiari
Classificació documents admin.Manual (revisió)SetmanesBatchMensual
Detecció defectes manufacturaAutomàtic (sensors)Minuts/horesContinual LearningTemps real
Moderació xarxes socialsMixt (reports + signals)Hores/diesStreamingDiari
Predicció demanda energiaAutomàtic (comptadors)HoresStreamingDiari
Sistema recomanació bibliotecaAutomàtic (préstecs)DiesStreamingSetmanal

Patrons observables

Analitzant aquests casos, veiem diversos patrons que ens ajuden a decidir l’estratègia:

Ground truth automàtic + delay curt → Continual Learning o Streaming

  • Exemples: Spam filtering, detecció de frau, predicció de transport
  • Permet actualitzacions freqüents i automatitzades
  • Avantatge: model sempre actualitzat amb els patrons més recents

Ground truth manual → Batch

  • Exemples: Diagnòstic mèdic, classificació de documents
  • Requereix revisió humana, actualitzacions menys freqüents
  • Avantatge: alta qualitat de les etiquetes, validació experta

Ground truth mixt → Streaming

  • Exemples: Moderació de contingut, sistemes de recomanació
  • Equilibri entre automatització i qualitat
  • Avantatge: combina volum (automàtic) amb precisió (manual)

Crític per seguretat/salut → Batch amb validació humana

  • Exemples: Diagnòstic mèdic, detecció de defectes crítics
  • Encara que hi hagi drift, la revisió manual és imprescindible
  • Avantatge: control de qualitat abans de desplegament

Consell pràctic: Comença sempre amb Batch i evoluciona cap a Streaming o Continual Learning només si:

  1. Tens prou dades noves cada dia/setmana
  2. Pots automatitzar la validació de manera fiable
  3. El valor de negoci justifica la complexitat addicional

Entrenament amb estat vs. sense estat

Quan re-entrenem, tenim dues opcions principals:

Entrenament sense estat (Stateless training)

Entrenem el model des de zero amb totes les dades disponibles (històriques + noves).

Dades històriques
+ Dades noves

Entrenament
des de zero

Model nou

Avantatges:

  • Simple conceptualment
  • El model veu totes les dades
  • No arrossega errors anteriors

Desavantatges:

  • Costós computacionalment si hi ha moltes dades
  • Pot “oblidar” patrons antics si les dades recents dominen

Entrenament amb estat (Stateful training)

Continuem entrenant el model existent amb només les dades noves.

Model actual

Entrenament
incremental

Dades noves

Model actualitzat

Avantatges:

  • Molt més ràpid
  • Menys recursos computacionals
  • Ideal per a actualitzacions freqüents

Desavantatges:

  • Risc d’oblit catastròfic (catastrophic forgetting): el model oblida el que sabia
  • Pot acumular errors

Exemple real: Grubhub va passar d’entrenament setmanal sense estat a entrenament diari amb estat, reduint el cost computacional 45 vegades i augmentant les conversions un 20%.

Quin triar?

Una estratègia comuna és combinar-los:

  • Entrenament amb estat per a actualitzacions freqüents (diari/setmanal)
  • Entrenament sense estat periòdicament (mensual/trimestral) per “netejar” el model

Automatització del pipeline d’actualització

Per fer l’actualització sostenible, necessitem automatitzar-la. Aquí presentem un esquema basat en scripts Python simples.

Components del pipeline

No

Recollir dades noves

Validar dades

Entrenar model

Avaluar model

Prou bo?

Registrar model

Alerta + Revisió manual

Desplegar

Exemple de script d’actualització

#!/usr/bin/env python3 """ Script d'actualització automàtica del model. Pensat per executar-se periòdicament (cron job). """ import os import json import logging from datetime import datetime import joblib import pandas as pd from sklearn.ensemble import RandomForestClassifier from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, f1_score # Configurar logging logging.basicConfig( filename='/logs/training.log', level=logging.INFO, format='%(asctime)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuració import os CONFIG = { 'data_path': os.getenv('DATA_PATH', '/data/training_data.csv'), 'model_path': os.getenv('MODEL_PATH', '/models/current_model.pkl'), 'new_model_path': os.getenv('NEW_MODEL_PATH', '/models/candidate_model.pkl'), 'metrics_threshold': { 'accuracy': 0.85, 'f1': 0.80 } } def load_new_data() -> pd.DataFrame: """Carrega les dades noves per a entrenament.""" logger.info("Carregant dades...") df = pd.read_csv(CONFIG['data_path']) logger.info(f"Carregades {len(df)} mostres") return df def validate_data(df: pd.DataFrame) -> bool: """Valida que les dades són correctes.""" logger.info("Validant dades...") # Comprovar valors nuls null_pct = df.isnull().sum().sum() / df.size if null_pct > 0.05: raise ValueError(f"Massa valors nuls: {null_pct:.2%}") # Comprovar mínim de mostres if len(df) < 1000: raise ValueError(f"Poques mostres: {len(df)}") logger.info("Dades validades correctament") return True def train_model(X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier: """Entrena un nou model.""" logger.info("Entrenant model...") model = RandomForestClassifier( n_estimators=100, max_depth=10, random_state=42 ) model.fit(X_train, y_train) logger.info("Model entrenat") return model def evaluate_model(model: RandomForestClassifier, X_test: np.ndarray, y_test: np.ndarray) -> dict[str, float]: """Avalua el model i retorna mètriques.""" logger.info("Avaluant model...") predictions = model.predict(X_test) metrics = { 'accuracy': accuracy_score(y_test, predictions), 'f1': f1_score(y_test, predictions, average='weighted'), 'timestamp': datetime.now().isoformat() } logger.info(f"Mètriques: accuracy={metrics['accuracy']:.4f}, f1={metrics['f1']:.4f}") return metrics def check_thresholds(metrics): """Comprova si el model supera els llindars mínims.""" for metric, threshold in CONFIG['metrics_threshold'].items(): if metrics[metric] < threshold: logger.warning(f"ALERTA: {metric}={metrics[metric]:.4f} < {threshold}") return False return True def compare_with_current(new_metrics): """Compara amb el model actual si existeix.""" metrics_file = CONFIG['model_path'].replace('.pkl', '_metrics.json') if os.path.exists(metrics_file): with open(metrics_file, 'r') as f: current_metrics = json.load(f) # Comprovar que el nou model no és pitjor if new_metrics['accuracy'] < current_metrics['accuracy'] - 0.02: logger.warning("ALERTA: El nou model és significativament pitjor") return False return True def save_model(model, metrics): """Guarda el model i les seves mètriques.""" logger.info(f"Guardant model a {CONFIG['new_model_path']}") joblib.dump(model, CONFIG['new_model_path']) metrics_file = CONFIG['new_model_path'].replace('.pkl', '_metrics.json') with open(metrics_file, 'w') as f: json.dump(metrics, f, indent=2) logger.info("Model guardat") def main(): """Executa el pipeline complet.""" logger.info("=" * 50) logger.info("Iniciant pipeline d'actualització") try: # 1. Carregar dades df = load_new_data() # 2. Validar validate_data(df) # 3. Preparar dades X = df.drop('target', axis=1) y = df['target'] X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) # 4. Entrenar model = train_model(X_train, y_train) # 5. Avaluar metrics = evaluate_model(model, X_test, y_test) # 6. Verificar qualitat if not check_thresholds(metrics): logger.warning("Model rebutjat: no supera llindars") return False if not compare_with_current(metrics): logger.warning("Model rebutjat: pitjor que l'actual") return False # 7. Guardar save_model(model, metrics) logger.info("Pipeline completat amb èxit") return True except Exception as e: logger.error(f"ERROR: {str(e)}") return False if __name__ == '__main__': success = main() exit(0 if success else 1)

Programació amb cron

Per executar l’script automàticament, podem usar cron:

# Executar cada dia a les 3:00 AM 0 3 * * * /usr/bin/python3 /scripts/update_model.py >> /logs/cron.log 2>&1

Comparació de models abans de desplegar

Un dels passos més importants del pipeline d’actualització és comparar el model candidat amb el model actual abans de desplegar. Sense aquesta comparació, podem desplegar models pitjors accidentalment.

Per què comparar?

Problema sense comparació:

  • Entrenem un model nou → sembla “bo” (F1 = 0.78)
  • El despleguem → les mètriques empitjoren
  • Resulta que el model anterior era millor (F1 = 0.82)!

Solució: Sempre comparar candidat vs actual.

Què comparar?

Mètriques tècniques:

  • F1, precision, recall, accuracy
  • ROC-AUC, PR-AUC
  • Per regressió: MAE, RMSE, R²

Mètriques operacionals:

  • Latència de predicció (p50, p95, p99)
  • Mida del model (MB)
  • Temps d’inferència

Mètriques de negoci (si disponibles):

  • Cost estimat de prediccions errònies
  • ROI esperat
  • Impacte en conversió/vendes

Implementació de la comparació

import json import joblib from sklearn.metrics import f1_score, precision_score, recall_score import time def compare_models(current_model_path: str, candidate_model_path: str, X_val, y_val) -> dict: """ Compara model actual vs candidat en múltiples dimensions. Returns: Dict amb comparació i recomanació de desplegament. """ # Carregar models current_model = joblib.load(current_model_path) candidate_model = joblib.load(candidate_model_path) # Calcular mètriques tècniques y_pred_current = current_model.predict(X_val) y_pred_candidate = candidate_model.predict(X_val) current_metrics = { 'f1': f1_score(y_val, y_pred_current, average='weighted'), 'precision': precision_score(y_val, y_pred_current, average='weighted'), 'recall': recall_score(y_val, y_pred_current, average='weighted') } candidate_metrics = { 'f1': f1_score(y_val, y_pred_candidate, average='weighted'), 'precision': precision_score(y_val, y_pred_candidate, average='weighted'), 'recall': recall_score(y_val, y_pred_candidate, average='weighted') } # Calcular latència start = time.time() _ = current_model.predict(X_val[:100]) current_latency = (time.time() - start) / 100 * 1000 # ms per predicció start = time.time() _ = candidate_model.predict(X_val[:100]) candidate_latency = (time.time() - start) / 100 * 1000 # Mida del model import os current_size_mb = os.path.getsize(current_model_path) / (1024 * 1024) candidate_size_mb = os.path.getsize(candidate_model_path) / (1024 * 1024) # Compilar resultats comparison = { 'current': { 'metrics': current_metrics, 'latency_ms': current_latency, 'size_mb': current_size_mb }, 'candidate': { 'metrics': candidate_metrics, 'latency_ms': candidate_latency, 'size_mb': candidate_size_mb }, 'differences': { 'f1': candidate_metrics['f1'] - current_metrics['f1'], 'precision': candidate_metrics['precision'] - current_metrics['precision'], 'recall': candidate_metrics['recall'] - current_metrics['recall'], 'latency_ms': candidate_latency - current_latency, 'size_mb': candidate_size_mb - current_size_mb } } return comparison

Criteris de decisió

Definir regles clares per decidir si desplegar el candidat:

def should_deploy_candidate(comparison: dict, thresholds: dict = None) -> tuple[bool, str]: """ Decideix si desplegar el model candidat. Args: comparison: Resultat de compare_models() thresholds: Criteris de decisió personalitzats Returns: (should_deploy, reason) """ if thresholds is None: thresholds = { 'min_f1_improvement': 0.00, # No empitjorar 'max_f1_degradation': 0.02, # Tolerar petita degradació 'max_latency_increase_pct': 1.5, # Màxim 50% més lent 'max_size_increase_mb': 100 # Màxim 100MB més gran } diff = comparison['differences'] current = comparison['current'] candidate = comparison['candidate'] # Regla 1: F1 no pot degradar més del llindar if diff['f1'] < -thresholds['max_f1_degradation']: return False, f"F1 degradat massa: {diff['f1']:.4f}" # Regla 2: Latència no pot augmentar molt latency_ratio = candidate['latency_ms'] / current['latency_ms'] if latency_ratio > thresholds['max_latency_increase_pct']: return False, f"Latència augmentada {latency_ratio:.1f}x" # Regla 3: Mida no pot créixer massa if diff['size_mb'] > thresholds['max_size_increase_mb']: return False, f"Model massa gran: +{diff['size_mb']:.1f} MB" # Regla 4: Si millora F1, desplegar if diff['f1'] > thresholds['min_f1_improvement']: return True, f"F1 millorat: +{diff['f1']:.4f}" # Regla 5: Si F1 igual però millora recall (depèn del context) if abs(diff['f1']) < 0.01 and diff['recall'] > 0.02: return True, f"Recall millorat: +{diff['recall']:.4f}" # Per defecte: no desplegar si no hi ha millora clara return False, "Cap millora significativa"

Integració al pipeline d’actualització

Actualitzar el script per incloure comparació:

def main(): logger.info("=== Pipeline d'actualització amb comparació ===") # 1. Carregar dades df = load_new_data() validate_data(df) X = df.drop('target', axis=1) y = df['target'] X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2) # 2. Entrenar model candidat logger.info("Entrenant model candidat...") candidate_model = train_model(X_train, y_train) # Guardar temporalment candidate_path = '/models/candidate_model.pkl' joblib.dump(candidate_model, candidate_path) # 3. Comparar amb model actual current_path = '/models/current_model.pkl' if os.path.exists(current_path): logger.info("Comparant amb model actual...") comparison = compare_models(current_path, candidate_path, X_val, y_val) # Mostrar comparació logger.info("Comparació:") logger.info(f" Current F1: {comparison['current']['metrics']['f1']:.4f}") logger.info(f" Candidate F1: {comparison['candidate']['metrics']['f1']:.4f}") logger.info(f" Diferència: {comparison['differences']['f1']:+.4f}") # Decidir si desplegar should_deploy, reason = should_deploy_candidate(comparison) if should_deploy: logger.info(f"✓ DESPLEGANT model nou: {reason}") # Fer backup del model actual backup_path = f'/models/backup_model_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pkl' os.rename(current_path, backup_path) # Desplegar nou model os.rename(candidate_path, current_path) # Guardar metadata de comparació with open('/models/last_comparison.json', 'w') as f: json.dump(comparison, f, indent=2) logger.info("Model desplegat correctament") return True else: logger.warning(f"✗ NO desplegant: {reason}") os.remove(candidate_path) return False else: # Primer model, desplegar directament logger.info("Primer model, desplegant sense comparació") os.rename(candidate_path, current_path) return True if __name__ == '__main__': success = main() exit(0 if success else 1)

Visualització de la comparació

Generar un informe visual per a revisió humana:

def generate_comparison_report(comparison: dict, output_file: str = 'comparison_report.txt'): """Genera informe llegible de la comparació.""" with open(output_file, 'w') as f: f.write("=" * 60 + "\n") f.write("MODEL COMPARISON REPORT\n") f.write("=" * 60 + "\n\n") f.write("TECHNICAL METRICS\n") f.write("-" * 60 + "\n") f.write(f"{'Metric':<20} {'Current':<15} {'Candidate':<15} {'Diff':<10}\n") f.write("-" * 60 + "\n") for metric in ['f1', 'precision', 'recall']: current = comparison['current']['metrics'][metric] candidate = comparison['candidate']['metrics'][metric] diff = comparison['differences'][metric] symbol = "↑" if diff > 0 else "↓" if diff < 0 else "=" f.write(f"{metric.upper():<20} {current:<15.4f} {candidate:<15.4f} {symbol} {diff:+.4f}\n") f.write("\nOPERATIONAL METRICS\n") f.write("-" * 60 + "\n") current_lat = comparison['current']['latency_ms'] candidate_lat = comparison['candidate']['latency_ms'] lat_diff = comparison['differences']['latency_ms'] f.write(f"{'Latency (ms)':<20} {current_lat:<15.2f} {candidate_lat:<15.2f} {lat_diff:+.2f}\n") current_size = comparison['current']['size_mb'] candidate_size = comparison['candidate']['size_mb'] size_diff = comparison['differences']['size_mb'] f.write(f"{'Model size (MB)':<20} {current_size:<15.2f} {candidate_size:<15.2f} {size_diff:+.2f}\n") f.write("\n" + "=" * 60 + "\n") # Recomanació should_deploy, reason = should_deploy_candidate(comparison) f.write(f"RECOMMENDATION: {'DEPLOY' if should_deploy else 'REJECT'}\n") f.write(f"REASON: {reason}\n") f.write("=" * 60 + "\n") print(f"Report guardat a {output_file}")

Resum de la comparació

Essencial per continual learning:

  • ✅ Sempre comparar candidat vs actual
  • ✅ Usar múltiples dimensions (mètriques + latència + mida)
  • ✅ Definir criteris clars de desplegament
  • ✅ Fer backup abans de reemplaçar

Flux recomanat:

  1. Entrenar model candidat
  2. Comparar amb actual (mètriques, latència, mida)
  3. Decidir automàticament basant-se en regles
  4. Si aprovat → backup + desplegament
  5. Si rebutjat → descartar candidat

Amb comparació automàtica, evitem desplegar models pitjors i mantenim qualitat constant!

Test en producció

Un cop tenim un model candidat que sembla bo en les proves offline, com sabem si funcionarà bé en producció real? Aquí és on entren les estratègies de test en producció (testing in production).

El problema

Les proves offline (amb dades històriques) tenen limitacions:

  • No capturen el comportament real dels usuaris
  • No detecten problemes d’integració amb altres sistemes
  • No mesuren l’impacte en mètriques de negoci reals

Per això, necessitem estratègies per provar models en producció de manera controlada.

Shadow Deployment (Desplegament ombra)

El nou model rep les mateixes peticions que el model actual, però les seves prediccions no s’utilitzen. Només serveixen per comparar.

Petició

Model actual

Model nou
shadow

Resposta a l'usuari

Només logging

Comparar prediccions

Avantatges:

  • Zero risc per als usuaris
  • Podem comparar prediccions directament
  • Detectem problemes d’integració

Desavantatges:

  • Duplica el cost computacional
  • No mesura l’impacte real en el comportament dels usuaris

Implementació simplificada:

from fastapi import FastAPI import joblib import logging app = FastAPI() logger = logging.getLogger(__name__) # Carregar ambdós models current_model = joblib.load('current_model.pkl') shadow_model = joblib.load('shadow_model.pkl') @app.post("/predict") def predict(data: InputData) -> dict: # Predicció del model actual (la que s'usa) current_prediction = current_model.predict([data.features])[0] # Predicció del model shadow (només logging) shadow_prediction = shadow_model.predict([data.features])[0] # Registrar ambdues per comparació logger.info(f"Shadow comparison: current={current_prediction}, shadow={shadow_prediction}") # Retornar només la predicció del model actual return {"prediction": float(current_prediction)}

A/B Testing

Dividim el tràfic entre el model actual (A) i el nou model (B), i mesurem quina versió funciona millor segons mètriques de negoci.

50%

50%

Peticions

Splitter
50/50

Model A
actual

Model B
nou

Mètriques A

Mètriques B

Comparar

Avantatges:

  • Mesura l’impacte real en mètriques de negoci
  • Estadísticament rigorós si es fa bé

Desavantatges:

  • Part dels usuaris reben el model potencialment pitjor
  • Requereix més temps per tenir resultats significatius
  • Necessita infraestructura per dividir tràfic

Implementació simplificada:

import hashlib import logging logger = logging.getLogger(__name__) def get_model_version(user_id: str, experiment_percentage: float = 0.5) -> str: """ Determina quina versió del model usar per un usuari. Usa hashing per assegurar consistència (mateix usuari sempre rep la mateixa versió durant l'experiment). """ hash_value = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) normalized = (hash_value % 100) / 100 if normalized < experiment_percentage: return 'B' # Model nou return 'A' # Model actual @app.post("/predict") def predict(data: InputData, user_id: str) -> dict: version = get_model_version(user_id) # Seleccionar model segons versió model = new_model if version == 'B' else current_model prediction = model.predict([data.features])[0] # Registrar amb la versió per anàlisi posterior (user_id anonimitzat per privacitat) hashed_user = hashlib.sha256(user_id.encode()).hexdigest()[:16] logger.info(f"Prediction: user={hashed_user}, version={version}, result={prediction}") return {"prediction": float(prediction), "model_version": version}

Canary Deployment (Desplegament canari)

Similar a l’A/B testing, però amb l’objectiu de detectar problemes, no de comparar rendiment. Enviem un petit percentatge del tràfic al nou model i augmentem gradualment si tot va bé.

No

No

No

Dia 1: 5% tràfic

Errors?

Dia 2: 20% tràfic

Rollback

Errors?

Dia 3: 50% tràfic

Errors?

Dia 4: 100% tràfic

Avantatges:

  • Limita l’impacte de problemes
  • Permet rollback ràpid
  • Detecta problemes que no apareixen en tests

Desavantatges:

  • Requereix monitoratge proper durant el desplegament
  • Més lent que un desplegament directe

Bandits (Multi-armed bandits)

Els bandits són una alternativa més sofisticada a l’A/B testing. En lloc de dividir el tràfic de manera fixa, el sistema aprèn dinàmicament quina versió funciona millor i li envia més tràfic.

Inici: Model A: 50% | Model B: 50% Dia 3: Model A: 60% | Model B: 40% (A sembla millor) Dia 7: Model A: 80% | Model B: 20% (A confirmat millor)

Avantatges:

  • Minimitza el “regret” (tràfic enviat al model pitjor)
  • S’adapta automàticament

Desavantatges:

  • Més complex d’implementar
  • Menys control sobre la divisió del tràfic
  • Pot ser prematur si les diferències són petites

Per al nostre context, l’A/B testing o el canary deployment són més pràctics i fàcils d’entendre i implementar.

Comparativa de les estratègies

EstratègiaRiscComplexitatMesura impacte real
ShadowCapBaixaNo
A/B TestModeratMitjana
CanaryBaixMitjanaParcialment
BanditsVariableAlta

Recomanació pràctica

Un flux recomanat per a equips petits:

  1. Shadow deployment primer: Verificar que el model nou funciona sense errors
  2. Canary amb 5-10%: Detectar problemes d’integració
  3. Si tot va bé, desplegament complet o A/B test si volem mesurar impacte

Consideracions pràctiques

Versionat de models

Quan actualitzem models regularment, el versionat esdevé encara més crítica. Necessitem saber exactament què està en producció, comparar models, i poder fer rollback ràpidament.

Per què és crític en continual learning?

En un sistema d’actualització contínua:

  • Els models canvien freqüentment (setmanal, diari, o fins i tot més)
  • Cal comparar nou model vs actual abans de desplegar
  • Si un model nou falla, necessitem rollback immediat
  • Hem de poder auditar quin model va fer quina predicció

Sense versionat adequat, el continual learning és inviable.

Què versionar: models + metadata completa

Cada versió del model ha d’incloure:

Essencial:

  • Versió (v1.2.0)
  • Model serialitzat (.pkl)
  • Mètriques de validació (F1, recall, etc.)
  • Features usades

Recomanat per continual learning:

  • Dataset info: Quantes dades, de quan, hash
  • Hyperparameters: Configuració del model
  • Training time: Quan i quant temps va trigar
  • Data version: Quin split de dades es va usar
  • Environment: Python, sklearn, altres versions

Estructura de versionat recomanada

models/ ├── v1.0.0/ │ ├── model.pkl │ ├── metadata.json │ ├── scaler.pkl │ └── encoder.pkl ├── v1.1.0/ │ ├── model.pkl │ ├── metadata.json │ ├── scaler.pkl │ └── encoder.pkl ├── v1.2.0/ │ ├── model.pkl │ ├── metadata.json │ ├── scaler.pkl │ └── encoder.pkl └── current -> v1.2.0/ # Symlink

Metadata extesa per continual learning

import json import hashlib from datetime import datetime def save_model_with_full_metadata( model, version: str, metrics: dict, training_data_path: str, hyperparameters: dict, output_dir: str ): """ Guarda model amb metadata completa per continual learning. """ import sys import sklearn import pandas as pd # Calcular hash del dataset (per reproducibilitat) df = pd.read_parquet(training_data_path) data_hash = hashlib.md5(df.to_json().encode()).hexdigest() metadata = { # Versió i timestamps 'version': version, 'trained_at': datetime.now().isoformat(), 'training_duration_seconds': 0, # Calcular si cal # Dataset 'dataset': { 'path': training_data_path, 'hash': data_hash, 'rows': len(df), 'features': list(df.columns), }, # Mètriques 'metrics': metrics, # Model 'model_type': type(model).__name__, 'hyperparameters': hyperparameters, # Environment (per reproduir) 'environment': { 'python': sys.version, 'sklearn': sklearn.__version__, 'pandas': pd.__version__, }, # Deployment 'deployed': False, 'deployed_at': None, } # Crear directori per aquesta versió version_dir = f'{output_dir}/{version}' os.makedirs(version_dir, exist_ok=True) # Guardar model joblib.dump(model, f'{version_dir}/model.pkl') # Guardar metadata with open(f'{version_dir}/metadata.json', 'w') as f: json.dump(metadata, f, indent=2) print(f"✓ Model {version} guardat amb metadata completa") return version_dir

Comparar versions abans de desplegar

Abans de fer el symlink current -> v1.3.0, comparar amb la versió actual:

def compare_model_versions(registry_dir: str, current_version: str, candidate_version: str) -> dict: """ Compara dues versions de model. Returns: Dict amb comparació de mètriques i recomanació. """ # Carregar metadata with open(f'{registry_dir}/{current_version}/metadata.json') as f: current_meta = json.load(f) with open(f'{registry_dir}/{candidate_version}/metadata.json') as f: candidate_meta = json.load(f) # Comparar mètriques comparison = { 'current_version': current_version, 'candidate_version': candidate_version, 'metrics_comparison': {} } for metric in ['f1', 'recall', 'precision']: current_val = current_meta['metrics'].get(metric, 0) candidate_val = candidate_meta['metrics'].get(metric, 0) diff = candidate_val - current_val comparison['metrics_comparison'][metric] = { 'current': current_val, 'candidate': candidate_val, 'difference': diff, 'improvement': diff > 0 } # Decisió f1_improvement = comparison['metrics_comparison']['f1']['difference'] if f1_improvement >= 0.02: comparison['recommendation'] = 'DEPLOY' comparison['reason'] = 'Millora significativa' elif f1_improvement >= 0: comparison['recommendation'] = 'DEPLOY' comparison['reason'] = 'Millora petita però positiva' else: comparison['recommendation'] = 'REJECT' comparison['reason'] = 'Model pitjor que l\'actual' return comparison # Ús en pipeline d'actualització comparison = compare_model_versions('models', 'v1.2.0', 'v1.3.0') if comparison['recommendation'] == 'DEPLOY': print(f"✓ Desplegant {comparison['candidate_version']}") print(f" Raó: {comparison['reason']}") # Actualitzar symlink os.symlink(comparison['candidate_version'], 'models/current', target_is_directory=True) else: print(f"✗ NO desplegant {comparison['candidate_version']}") print(f" Raó: {comparison['reason']}")

Model registry amb històric

Mantenir un registre de tots els desplegaments:

# models/deployment_history.json { "deployments": [ { "version": "v1.0.0", "deployed_at": "2024-01-15T10:00:00", "replaced_by": "v1.1.0", "reason": "Scheduled retraining" }, { "version": "v1.1.0", "deployed_at": "2024-01-22T10:00:00", "replaced_by": "v1.2.0", "reason": "Drift detected" }, { "version": "v1.2.0", "deployed_at": "2024-01-29T10:00:00", "replaced_by": null, "reason": "Improved performance" } ], "current_version": "v1.2.0" }
def record_deployment(version: str, reason: str, history_file: str = 'models/deployment_history.json'): """Registra un desplegament al històric.""" # Carregar històric if os.path.exists(history_file): with open(history_file, 'r') as f: history = json.load(f) else: history = {'deployments': [], 'current_version': None} # Marcar anterior com reemplaçat if history['current_version']: for deployment in history['deployments']: if deployment['version'] == history['current_version'] and not deployment['replaced_by']: deployment['replaced_by'] = version # Afegir nou desplegament history['deployments'].append({ 'version': version, 'deployed_at': datetime.now().isoformat(), 'replaced_by': None, 'reason': reason }) history['current_version'] = version # Guardar with open(history_file, 'w') as f: json.dump(history, f, indent=2)

Resum del versionat per continual learning

Imprescindible:

  • ✅ Cada model té versió única (v1.2.0)
  • ✅ Metadata amb mètriques, features, dataset hash
  • ✅ Directori per versió (v1.2.0/model.pkl, metadata.json)
  • ✅ Comparació automàtica abans de desplegar

Altament recomanat:

  • ✅ Històric de desplegaments
  • ✅ Rollback automatitzat
  • ✅ Metadata extesa (hyperparameters, environment)

Amb aquest sistema, podem actualitzar models amb confiança, sabent que podem revertir si cal!

Rollback

Sempre hem de poder tornar a la versió anterior ràpidament:

#!/bin/bash # rollback.sh - Torna a la versió anterior del model set -e # Sortir si hi ha errors CURRENT=$(readlink -f /models/current_model.pkl || echo "") if [ -z "$CURRENT" ]; then echo "Error: No s'ha trobat el model actual" exit 1 fi PREVIOUS=$(find /models -name 'model_v*.pkl' -type f | grep -v "$CURRENT" | sort -r | head -1) if [ -z "$PREVIOUS" ]; then echo "Error: No hi ha model anterior disponible" exit 1 fi echo "Rollback de $CURRENT a $PREVIOUS" ln -sf "$PREVIOUS" /models/current_model.pkl # Reiniciar el contenidor per carregar el model antic if docker restart model_api; then echo "Rollback completat amb èxit" else echo "Error: Fallida al reiniciar el contenidor" exit 1 fi

Documentació de canvis

Mantenir un registre de tots els canvis:

# Changelog de models ## v1.1.0 (2024-02-01) - Afegides 3 noves features - Re-entrenat amb dades de gener - Accuracy: 0.89 (+0.02) ## v1.0.0 (2024-01-15) - Versió inicial - Accuracy: 0.87

Resum

En aquest capítol hem après:

  • L’aprenentatge continu és necessari perquè els models s’adaptin als canvis en les dades
  • La freqüència d’actualització depèn de la velocitat del canvi, el cost, i el valor de la frescor
  • L’entrenament amb estat és més eficient però pot acumular errors; combinar-lo amb entrenament sense estat periòdicament és una bona estratègia
  • Un pipeline automatitzat amb scripts Python pot gestionar la recollida de dades, entrenament, validació, i desplegament
  • Les estratègies de test en producció (shadow, A/B testing, canary) permeten validar models amb tràfic real de manera controlada
  • El versionat i la capacitat de rollback són essencials per a operacions segures

Amb això completem la visió del cicle de vida d’un model en producció: des del desplegament inicial amb Docker i FastAPI, passant pel monitoratge i detecció de drift, fins a l’actualització contínua i el test en producció.