Aprenentatge Continu i Test en Producció
- Introducció
- Per què actualitzar models?
- Iteració de dades vs. iteració de model
- Amb quina freqüència actualitzar?
- Paradigmes d’actualització: Batch, Streaming, i Continual Learning
- Casos d’ús i estratègies de re-entrenament
- Entrenament amb estat vs. sense estat
- Automatització del pipeline d’actualització
- Test en producció
- Consideracions pràctiques
- Resum
Introducció
Al capítol anterior hem vist com detectar quan les dades canvien i el model comença a degradar-se. La pregunta natural és: què fem quan això passa? La resposta és actualitzar el model, però això obre tot un conjunt de nous reptes.
L’aprenentatge continu (continual learning) és el procés d’actualitzar models en producció per adaptar-se a les dades canviants. No es tracta simplement de re-entrenar de tant en tant, sinó de crear un sistema que pugui evolucionar de manera fiable i segura.
En aquest capítol explorarem quan i com actualitzar models, i les estratègies per provar models nous en producció sense arriscar el servei.
Per què actualitzar models?
Recordem els tipus de drift que hem vist:
- Covariate shift: Les features canvien, cal que el model s’adapti a nous patrons d’entrada
- Label shift: La proporció de classes canvia, cal recalibrar
- Concept drift: La relació entre features i sortida canvia, el model ha après coses que ja no són certes
A més del drift, hi ha altres raons per actualitzar:
- Més dades disponibles: Hem acumulat més exemples que poden millorar el model
- Noves features: Tenim accés a dades noves que poden ser predictives
- Correccions de bugs: Hem descobert errors en el preprocessament o l’entrenament
- Millores algorítmiques: Volem provar un algoritme o arquitectura millor
Iteració de dades vs. iteració de model
Quan el rendiment del model degrada, tenim dues estratègies diferents que sovint es confonen:
Iteració de dades (Data Iteration)
Re-entrenar el model amb dades noves, mantenint l’arquitectura.
- Mateixa arquitectura de model i features
- Afegir mostres noves (nous patrons, concept drift)
- Exemple: Model de frau re-entrenat setmanalment amb nous casos de frau (mateixa arquitectura)
Iteració de model (Model Iteration)
Canviar l’arquitectura o afegir noves features.
- Afegir noves features (noves fonts de dades)
- Canviar arquitectura (algoritme diferent, més complexitat)
- Ajustar hiperparàmetres
- Exemple: Model de crèdit que afegeix “historial laboral” O canvia de regressió logística a XGBoost
Quan usar cada estratègia?
| Símptoma | Causa Probable | Estratègia |
|---|---|---|
| Model funciona bé inicialment, degrada amb el temps | Concept drift | Iteració de dades |
| Model mai arriba a la performance objectiu | Model massa simple | Iteració de model |
| Bo en entrenament, dolent en producció | Distribution shift | Iteració de dades |
| Performance s’estanca malgrat més dades | Model saturat o features insuficients | Iteració de model |
| Noves fonts d’informació disponibles | Nous senyals predictius | Iteració de model |
Implicacions en el pipeline
Pipeline d’iteració de dades:
- Recollir dades noves amb ground truth
- Validar qualitat de dades
- Re-entrenar mateixa arquitectura amb dades noves/actualitzades
- Mantenir features i hiperparàmetres
- Desplegar si la validació passa
Pipeline d’iteració de model:
- Fixar datasets d’entrenament/validació/test (versionats!)
- Afegir noves features O canviar arquitectura
- Entrenar i comparar candidats sobre el mateix test set
- Seleccionar millor configuració
- Re-entrenar amb totes les dades i desplegar
Enfocament híbrid (més comú en producció)
- Iteració de dades regular (setmanal/mensual) → manté el model fresc
- Iteració de model ocasional (trimestral/anual) → millora capacitat fonamental
Relació amb valor de negoci:
- Iteració de dades → s’adapta al món canviant (patrons de frau evolucionen)
- Iteració de model → millora capacitat fonamental (nous senyals, millor arquitectura)
Amb quina freqüència actualitzar?
No hi ha una resposta universal. Depèn de:
Velocitat del canvi en les dades
Si les dades canvien ràpidament (per exemple, tendències en xarxes socials), necessitem actualitzacions freqüents. Si són estables (per exemple, detecció de defectes en manufactura), podem actualitzar menys sovint.
Cost de l’actualització
Cada actualització té costos:
- Computacional: Temps de GPU/CPU per entrenar
- Humà: Temps d’enginyers per validar i desplegar
- Risc: Possibilitat d’introduir regressions
Valor de la frescor (freshness)
No sempre tenir el model més recent és millor. Cal preguntar-se: quant guanyem amb dades més recents?
Un estudi de Facebook va mostrar que passar d’entrenament setmanal a diari reduïa la pèrdua del model en un 1%. Per a alguns casos, aquest 1% pot valer milions; per a altres, no justifica l’esforç.
Estratègies comunes
| Estratègia | Freqüència | Casos d’ús |
|---|---|---|
| Ad-hoc | Quan calgui | Models estables, canvis rars |
| Programada | Setmanal/mensual | Canvis graduals previsibles |
| Basada en triggers | Quan es detecta drift | Canvis imprevisibles |
| Contínua | Diària o més | Canvis molt ràpids |
Paradigmes d’actualització: Batch, Streaming, i Continual Learning
El terme “aprenentatge continu” (continual learning) pot ser ambigu. Aquí definirem clarament tres paradigmes diferents.
Què és Continual Learning?
Continual Learning no es refereix només a algoritmes d’aprenentatge incremental, sinó a crear infraestructura que permeti actualitzar models ràpidament quan calgui — des de zero o amb fine-tuning — i desplegar-los de manera àgil.
És una qüestió d’infraestructura i procés, no només d’algoritme.
Tres paradigmes d’actualització
1. Batch Retraining (Aprenentatge per lots)
- Model entrenat des de zero a intervals discrets
- Re-entrenament programat (setmanal, mensual)
- Pipeline de desplegament lent (dies a setmanes)
- Exemple: Re-entrenament mensual amb aprovació manual
2. Streaming Retraining (Re-entrenament amb finestres)
- Model re-entrenat des de zero sobre finestres de dades recents
- Finestra lliscant o expansiva d’exemples recents
- Pipeline més ràpid (hores a dies)
- Exemple: Re-entrenament diari amb els últims 30 dies de dades
3. Continual Learning (Infraestructura per actualitzacions ràpides)
- Infraestructura que suporta actualitzacions ràpides quan calgui
- Pot usar re-entrenament stateless (des de zero) O stateful (fine-tuning)
- Pipeline de desplegament ràpid (minuts a hores)
- Monitoratge, validació, i desplegament automatitzats
- Exemple: Re-entrenament automàtic quan es detecta drift, amb A/B testing i rollback automàtic
Taula comparativa
| Aspecte | Batch | Streaming | Continual Learning |
|---|---|---|---|
| Infraestructura | Manual/programada | Semi-automatitzada | Totalment automatitzada |
| Velocitat desplegament | Dies a setmanes | Hores a dies | Minuts a hores |
| Trigger | Programat o manual | Programat (freqüent) | Automàtic (drift, performance) |
| Validació | Revisió manual | Automatitzada amb llindars | Automatitzada + A/B testing |
| Monitoratge | Dashboards periòdics | Mètriques contínues | Alertes en temps real + auto-acció |
| Rollback | Manual | Semi-automàtic | Automàtic (instantani) |
| Millor per | Dades estables | Velocitat moderada | Entorns molt canviants |
Quan usar cada paradigma?
Batch Retraining:
- Canvis en dades lents i predictibles
- Revisió manual requerida (mèdic, financer)
- Equip petit, infraestructura simple
- Freqüència de setmanes/mesos acceptable
Streaming Retraining:
- Dades arriben contínuament, patrons canvien moderadament
- Equilibri entre frescor i validació
- Es poden definir finestres temporals significatives
Continual Learning:
- Patrons canvien ràpidament i impredictiblement (frau, spam)
- Necessitat de respondre a drift en hores
- Recursos d’enginyeria per pipelines automatitzats
- Flexibilitat: re-entrenament des de zero O fine-tuning
Desafiaments de Continual Learning
Challenge: Validació automatitzada
- Solució: Test suites complets, holdout sets, A/B testing
Challenge: Detecció de drift i triggering
- Solució: Monitoritzar distribucions, performance, confiança; activar per llindars
Challenge: Pipeline de desplegament ràpid
- Solució: Contenidors, CI/CD, testing automatitzat
Challenge: Rollback i versionat
- Solució: Model registry, blue-green deployment, rollback automàtic
Evitar sobre-enginyeria: La majoria de sistemes usen batch o streaming. Només implementar continual learning quan:
- Negoci requereix actualitzacions ràpides (hores, no dies)
- Patrons canvien ràpidament
- Tens recursos d’enginyeria per mantenir pipelines automatitzats
Casos d’ús i estratègies de re-entrenament
Un cop hem vist els paradigmes d’actualització, vegem com s’apliquen en contextos reals. La taula següent mostra diferents casos d’ús amb les seves característiques i l’estratègia de re-entrenament més adequada.
| Cas d’ús | Ground Truth | Delay | Paradigma | Freqüència |
|---|---|---|---|---|
| Recomanador e-commerce | Automàtic (clicks/compres) | Hores/dies | Streaming | Diari |
| Diagnòstic mèdic | Manual (experts) | Setmanes | Batch | Mensual |
| Detecció de frau bancari | Automàtic (verificació) | Dies | Continual Learning | Quan drift |
| Filtre de spam | Automàtic (feedback usuaris) | Immediat | Continual Learning | Continu |
| Predicció ocupació transport | Automàtic (sensors) | Minuts | Streaming | Diari |
| Classificació documents admin. | Manual (revisió) | Setmanes | Batch | Mensual |
| Detecció defectes manufactura | Automàtic (sensors) | Minuts/hores | Continual Learning | Temps real |
| Moderació xarxes socials | Mixt (reports + signals) | Hores/dies | Streaming | Diari |
| Predicció demanda energia | Automàtic (comptadors) | Hores | Streaming | Diari |
| Sistema recomanació biblioteca | Automàtic (préstecs) | Dies | Streaming | Setmanal |
Patrons observables
Analitzant aquests casos, veiem diversos patrons que ens ajuden a decidir l’estratègia:
Ground truth automàtic + delay curt → Continual Learning o Streaming
- Exemples: Spam filtering, detecció de frau, predicció de transport
- Permet actualitzacions freqüents i automatitzades
- Avantatge: model sempre actualitzat amb els patrons més recents
Ground truth manual → Batch
- Exemples: Diagnòstic mèdic, classificació de documents
- Requereix revisió humana, actualitzacions menys freqüents
- Avantatge: alta qualitat de les etiquetes, validació experta
Ground truth mixt → Streaming
- Exemples: Moderació de contingut, sistemes de recomanació
- Equilibri entre automatització i qualitat
- Avantatge: combina volum (automàtic) amb precisió (manual)
Crític per seguretat/salut → Batch amb validació humana
- Exemples: Diagnòstic mèdic, detecció de defectes crítics
- Encara que hi hagi drift, la revisió manual és imprescindible
- Avantatge: control de qualitat abans de desplegament
Consell pràctic: Comença sempre amb Batch i evoluciona cap a Streaming o Continual Learning només si:
- Tens prou dades noves cada dia/setmana
- Pots automatitzar la validació de manera fiable
- El valor de negoci justifica la complexitat addicional
Entrenament amb estat vs. sense estat
Quan re-entrenem, tenim dues opcions principals:
Entrenament sense estat (Stateless training)
Entrenem el model des de zero amb totes les dades disponibles (històriques + noves).
Avantatges:
- Simple conceptualment
- El model veu totes les dades
- No arrossega errors anteriors
Desavantatges:
- Costós computacionalment si hi ha moltes dades
- Pot “oblidar” patrons antics si les dades recents dominen
Entrenament amb estat (Stateful training)
Continuem entrenant el model existent amb només les dades noves.
Avantatges:
- Molt més ràpid
- Menys recursos computacionals
- Ideal per a actualitzacions freqüents
Desavantatges:
- Risc d’oblit catastròfic (catastrophic forgetting): el model oblida el que sabia
- Pot acumular errors
Exemple real: Grubhub va passar d’entrenament setmanal sense estat a entrenament diari amb estat, reduint el cost computacional 45 vegades i augmentant les conversions un 20%.
Quin triar?
Una estratègia comuna és combinar-los:
- Entrenament amb estat per a actualitzacions freqüents (diari/setmanal)
- Entrenament sense estat periòdicament (mensual/trimestral) per “netejar” el model
Automatització del pipeline d’actualització
Per fer l’actualització sostenible, necessitem automatitzar-la. Aquí presentem un esquema basat en scripts Python simples.
Components del pipeline
Exemple de script d’actualització
#!/usr/bin/env python3
"""
Script d'actualització automàtica del model.
Pensat per executar-se periòdicament (cron job).
"""
import os
import json
import logging
from datetime import datetime
import joblib
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
# Configurar logging
logging.basicConfig(
filename='/logs/training.log',
level=logging.INFO,
format='%(asctime)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuració
import os
CONFIG = {
'data_path': os.getenv('DATA_PATH', '/data/training_data.csv'),
'model_path': os.getenv('MODEL_PATH', '/models/current_model.pkl'),
'new_model_path': os.getenv('NEW_MODEL_PATH', '/models/candidate_model.pkl'),
'metrics_threshold': {
'accuracy': 0.85,
'f1': 0.80
}
}
def load_new_data() -> pd.DataFrame:
"""Carrega les dades noves per a entrenament."""
logger.info("Carregant dades...")
df = pd.read_csv(CONFIG['data_path'])
logger.info(f"Carregades {len(df)} mostres")
return df
def validate_data(df: pd.DataFrame) -> bool:
"""Valida que les dades són correctes."""
logger.info("Validant dades...")
# Comprovar valors nuls
null_pct = df.isnull().sum().sum() / df.size
if null_pct > 0.05:
raise ValueError(f"Massa valors nuls: {null_pct:.2%}")
# Comprovar mínim de mostres
if len(df) < 1000:
raise ValueError(f"Poques mostres: {len(df)}")
logger.info("Dades validades correctament")
return True
def train_model(X_train: np.ndarray, y_train: np.ndarray) -> RandomForestClassifier:
"""Entrena un nou model."""
logger.info("Entrenant model...")
model = RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42
)
model.fit(X_train, y_train)
logger.info("Model entrenat")
return model
def evaluate_model(model: RandomForestClassifier, X_test: np.ndarray, y_test: np.ndarray) -> dict[str, float]:
"""Avalua el model i retorna mètriques."""
logger.info("Avaluant model...")
predictions = model.predict(X_test)
metrics = {
'accuracy': accuracy_score(y_test, predictions),
'f1': f1_score(y_test, predictions, average='weighted'),
'timestamp': datetime.now().isoformat()
}
logger.info(f"Mètriques: accuracy={metrics['accuracy']:.4f}, f1={metrics['f1']:.4f}")
return metrics
def check_thresholds(metrics):
"""Comprova si el model supera els llindars mínims."""
for metric, threshold in CONFIG['metrics_threshold'].items():
if metrics[metric] < threshold:
logger.warning(f"ALERTA: {metric}={metrics[metric]:.4f} < {threshold}")
return False
return True
def compare_with_current(new_metrics):
"""Compara amb el model actual si existeix."""
metrics_file = CONFIG['model_path'].replace('.pkl', '_metrics.json')
if os.path.exists(metrics_file):
with open(metrics_file, 'r') as f:
current_metrics = json.load(f)
# Comprovar que el nou model no és pitjor
if new_metrics['accuracy'] < current_metrics['accuracy'] - 0.02:
logger.warning("ALERTA: El nou model és significativament pitjor")
return False
return True
def save_model(model, metrics):
"""Guarda el model i les seves mètriques."""
logger.info(f"Guardant model a {CONFIG['new_model_path']}")
joblib.dump(model, CONFIG['new_model_path'])
metrics_file = CONFIG['new_model_path'].replace('.pkl', '_metrics.json')
with open(metrics_file, 'w') as f:
json.dump(metrics, f, indent=2)
logger.info("Model guardat")
def main():
"""Executa el pipeline complet."""
logger.info("=" * 50)
logger.info("Iniciant pipeline d'actualització")
try:
# 1. Carregar dades
df = load_new_data()
# 2. Validar
validate_data(df)
# 3. Preparar dades
X = df.drop('target', axis=1)
y = df['target']
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 4. Entrenar
model = train_model(X_train, y_train)
# 5. Avaluar
metrics = evaluate_model(model, X_test, y_test)
# 6. Verificar qualitat
if not check_thresholds(metrics):
logger.warning("Model rebutjat: no supera llindars")
return False
if not compare_with_current(metrics):
logger.warning("Model rebutjat: pitjor que l'actual")
return False
# 7. Guardar
save_model(model, metrics)
logger.info("Pipeline completat amb èxit")
return True
except Exception as e:
logger.error(f"ERROR: {str(e)}")
return False
if __name__ == '__main__':
success = main()
exit(0 if success else 1)
Programació amb cron
Per executar l’script automàticament, podem usar cron:
# Executar cada dia a les 3:00 AM
0 3 * * * /usr/bin/python3 /scripts/update_model.py >> /logs/cron.log 2>&1
Comparació de models abans de desplegar
Un dels passos més importants del pipeline d’actualització és comparar el model candidat amb el model actual abans de desplegar. Sense aquesta comparació, podem desplegar models pitjors accidentalment.
Per què comparar?
Problema sense comparació:
- Entrenem un model nou → sembla “bo” (F1 = 0.78)
- El despleguem → les mètriques empitjoren
- Resulta que el model anterior era millor (F1 = 0.82)!
Solució: Sempre comparar candidat vs actual.
Què comparar?
Mètriques tècniques:
- F1, precision, recall, accuracy
- ROC-AUC, PR-AUC
- Per regressió: MAE, RMSE, R²
Mètriques operacionals:
- Latència de predicció (p50, p95, p99)
- Mida del model (MB)
- Temps d’inferència
Mètriques de negoci (si disponibles):
- Cost estimat de prediccions errònies
- ROI esperat
- Impacte en conversió/vendes
Implementació de la comparació
import json
import joblib
from sklearn.metrics import f1_score, precision_score, recall_score
import time
def compare_models(current_model_path: str, candidate_model_path: str, X_val, y_val) -> dict:
"""
Compara model actual vs candidat en múltiples dimensions.
Returns:
Dict amb comparació i recomanació de desplegament.
"""
# Carregar models
current_model = joblib.load(current_model_path)
candidate_model = joblib.load(candidate_model_path)
# Calcular mètriques tècniques
y_pred_current = current_model.predict(X_val)
y_pred_candidate = candidate_model.predict(X_val)
current_metrics = {
'f1': f1_score(y_val, y_pred_current, average='weighted'),
'precision': precision_score(y_val, y_pred_current, average='weighted'),
'recall': recall_score(y_val, y_pred_current, average='weighted')
}
candidate_metrics = {
'f1': f1_score(y_val, y_pred_candidate, average='weighted'),
'precision': precision_score(y_val, y_pred_candidate, average='weighted'),
'recall': recall_score(y_val, y_pred_candidate, average='weighted')
}
# Calcular latència
start = time.time()
_ = current_model.predict(X_val[:100])
current_latency = (time.time() - start) / 100 * 1000 # ms per predicció
start = time.time()
_ = candidate_model.predict(X_val[:100])
candidate_latency = (time.time() - start) / 100 * 1000
# Mida del model
import os
current_size_mb = os.path.getsize(current_model_path) / (1024 * 1024)
candidate_size_mb = os.path.getsize(candidate_model_path) / (1024 * 1024)
# Compilar resultats
comparison = {
'current': {
'metrics': current_metrics,
'latency_ms': current_latency,
'size_mb': current_size_mb
},
'candidate': {
'metrics': candidate_metrics,
'latency_ms': candidate_latency,
'size_mb': candidate_size_mb
},
'differences': {
'f1': candidate_metrics['f1'] - current_metrics['f1'],
'precision': candidate_metrics['precision'] - current_metrics['precision'],
'recall': candidate_metrics['recall'] - current_metrics['recall'],
'latency_ms': candidate_latency - current_latency,
'size_mb': candidate_size_mb - current_size_mb
}
}
return comparison
Criteris de decisió
Definir regles clares per decidir si desplegar el candidat:
def should_deploy_candidate(comparison: dict, thresholds: dict = None) -> tuple[bool, str]:
"""
Decideix si desplegar el model candidat.
Args:
comparison: Resultat de compare_models()
thresholds: Criteris de decisió personalitzats
Returns:
(should_deploy, reason)
"""
if thresholds is None:
thresholds = {
'min_f1_improvement': 0.00, # No empitjorar
'max_f1_degradation': 0.02, # Tolerar petita degradació
'max_latency_increase_pct': 1.5, # Màxim 50% més lent
'max_size_increase_mb': 100 # Màxim 100MB més gran
}
diff = comparison['differences']
current = comparison['current']
candidate = comparison['candidate']
# Regla 1: F1 no pot degradar més del llindar
if diff['f1'] < -thresholds['max_f1_degradation']:
return False, f"F1 degradat massa: {diff['f1']:.4f}"
# Regla 2: Latència no pot augmentar molt
latency_ratio = candidate['latency_ms'] / current['latency_ms']
if latency_ratio > thresholds['max_latency_increase_pct']:
return False, f"Latència augmentada {latency_ratio:.1f}x"
# Regla 3: Mida no pot créixer massa
if diff['size_mb'] > thresholds['max_size_increase_mb']:
return False, f"Model massa gran: +{diff['size_mb']:.1f} MB"
# Regla 4: Si millora F1, desplegar
if diff['f1'] > thresholds['min_f1_improvement']:
return True, f"F1 millorat: +{diff['f1']:.4f}"
# Regla 5: Si F1 igual però millora recall (depèn del context)
if abs(diff['f1']) < 0.01 and diff['recall'] > 0.02:
return True, f"Recall millorat: +{diff['recall']:.4f}"
# Per defecte: no desplegar si no hi ha millora clara
return False, "Cap millora significativa"
Integració al pipeline d’actualització
Actualitzar el script per incloure comparació:
def main():
logger.info("=== Pipeline d'actualització amb comparació ===")
# 1. Carregar dades
df = load_new_data()
validate_data(df)
X = df.drop('target', axis=1)
y = df['target']
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2)
# 2. Entrenar model candidat
logger.info("Entrenant model candidat...")
candidate_model = train_model(X_train, y_train)
# Guardar temporalment
candidate_path = '/models/candidate_model.pkl'
joblib.dump(candidate_model, candidate_path)
# 3. Comparar amb model actual
current_path = '/models/current_model.pkl'
if os.path.exists(current_path):
logger.info("Comparant amb model actual...")
comparison = compare_models(current_path, candidate_path, X_val, y_val)
# Mostrar comparació
logger.info("Comparació:")
logger.info(f" Current F1: {comparison['current']['metrics']['f1']:.4f}")
logger.info(f" Candidate F1: {comparison['candidate']['metrics']['f1']:.4f}")
logger.info(f" Diferència: {comparison['differences']['f1']:+.4f}")
# Decidir si desplegar
should_deploy, reason = should_deploy_candidate(comparison)
if should_deploy:
logger.info(f"✓ DESPLEGANT model nou: {reason}")
# Fer backup del model actual
backup_path = f'/models/backup_model_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pkl'
os.rename(current_path, backup_path)
# Desplegar nou model
os.rename(candidate_path, current_path)
# Guardar metadata de comparació
with open('/models/last_comparison.json', 'w') as f:
json.dump(comparison, f, indent=2)
logger.info("Model desplegat correctament")
return True
else:
logger.warning(f"✗ NO desplegant: {reason}")
os.remove(candidate_path)
return False
else:
# Primer model, desplegar directament
logger.info("Primer model, desplegant sense comparació")
os.rename(candidate_path, current_path)
return True
if __name__ == '__main__':
success = main()
exit(0 if success else 1)
Visualització de la comparació
Generar un informe visual per a revisió humana:
def generate_comparison_report(comparison: dict, output_file: str = 'comparison_report.txt'):
"""Genera informe llegible de la comparació."""
with open(output_file, 'w') as f:
f.write("=" * 60 + "\n")
f.write("MODEL COMPARISON REPORT\n")
f.write("=" * 60 + "\n\n")
f.write("TECHNICAL METRICS\n")
f.write("-" * 60 + "\n")
f.write(f"{'Metric':<20} {'Current':<15} {'Candidate':<15} {'Diff':<10}\n")
f.write("-" * 60 + "\n")
for metric in ['f1', 'precision', 'recall']:
current = comparison['current']['metrics'][metric]
candidate = comparison['candidate']['metrics'][metric]
diff = comparison['differences'][metric]
symbol = "↑" if diff > 0 else "↓" if diff < 0 else "="
f.write(f"{metric.upper():<20} {current:<15.4f} {candidate:<15.4f} {symbol} {diff:+.4f}\n")
f.write("\nOPERATIONAL METRICS\n")
f.write("-" * 60 + "\n")
current_lat = comparison['current']['latency_ms']
candidate_lat = comparison['candidate']['latency_ms']
lat_diff = comparison['differences']['latency_ms']
f.write(f"{'Latency (ms)':<20} {current_lat:<15.2f} {candidate_lat:<15.2f} {lat_diff:+.2f}\n")
current_size = comparison['current']['size_mb']
candidate_size = comparison['candidate']['size_mb']
size_diff = comparison['differences']['size_mb']
f.write(f"{'Model size (MB)':<20} {current_size:<15.2f} {candidate_size:<15.2f} {size_diff:+.2f}\n")
f.write("\n" + "=" * 60 + "\n")
# Recomanació
should_deploy, reason = should_deploy_candidate(comparison)
f.write(f"RECOMMENDATION: {'DEPLOY' if should_deploy else 'REJECT'}\n")
f.write(f"REASON: {reason}\n")
f.write("=" * 60 + "\n")
print(f"Report guardat a {output_file}")
Resum de la comparació
Essencial per continual learning:
- ✅ Sempre comparar candidat vs actual
- ✅ Usar múltiples dimensions (mètriques + latència + mida)
- ✅ Definir criteris clars de desplegament
- ✅ Fer backup abans de reemplaçar
Flux recomanat:
- Entrenar model candidat
- Comparar amb actual (mètriques, latència, mida)
- Decidir automàticament basant-se en regles
- Si aprovat → backup + desplegament
- Si rebutjat → descartar candidat
Amb comparació automàtica, evitem desplegar models pitjors i mantenim qualitat constant!
Test en producció
Un cop tenim un model candidat que sembla bo en les proves offline, com sabem si funcionarà bé en producció real? Aquí és on entren les estratègies de test en producció (testing in production).
El problema
Les proves offline (amb dades històriques) tenen limitacions:
- No capturen el comportament real dels usuaris
- No detecten problemes d’integració amb altres sistemes
- No mesuren l’impacte en mètriques de negoci reals
Per això, necessitem estratègies per provar models en producció de manera controlada.
Shadow Deployment (Desplegament ombra)
El nou model rep les mateixes peticions que el model actual, però les seves prediccions no s’utilitzen. Només serveixen per comparar.
Avantatges:
- Zero risc per als usuaris
- Podem comparar prediccions directament
- Detectem problemes d’integració
Desavantatges:
- Duplica el cost computacional
- No mesura l’impacte real en el comportament dels usuaris
Implementació simplificada:
from fastapi import FastAPI
import joblib
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
# Carregar ambdós models
current_model = joblib.load('current_model.pkl')
shadow_model = joblib.load('shadow_model.pkl')
@app.post("/predict")
def predict(data: InputData) -> dict:
# Predicció del model actual (la que s'usa)
current_prediction = current_model.predict([data.features])[0]
# Predicció del model shadow (només logging)
shadow_prediction = shadow_model.predict([data.features])[0]
# Registrar ambdues per comparació
logger.info(f"Shadow comparison: current={current_prediction}, shadow={shadow_prediction}")
# Retornar només la predicció del model actual
return {"prediction": float(current_prediction)}
A/B Testing
Dividim el tràfic entre el model actual (A) i el nou model (B), i mesurem quina versió funciona millor segons mètriques de negoci.
Avantatges:
- Mesura l’impacte real en mètriques de negoci
- Estadísticament rigorós si es fa bé
Desavantatges:
- Part dels usuaris reben el model potencialment pitjor
- Requereix més temps per tenir resultats significatius
- Necessita infraestructura per dividir tràfic
Implementació simplificada:
import hashlib
import logging
logger = logging.getLogger(__name__)
def get_model_version(user_id: str, experiment_percentage: float = 0.5) -> str:
"""
Determina quina versió del model usar per un usuari.
Usa hashing per assegurar consistència (mateix usuari sempre
rep la mateixa versió durant l'experiment).
"""
hash_value = int(hashlib.sha256(user_id.encode()).hexdigest(), 16)
normalized = (hash_value % 100) / 100
if normalized < experiment_percentage:
return 'B' # Model nou
return 'A' # Model actual
@app.post("/predict")
def predict(data: InputData, user_id: str) -> dict:
version = get_model_version(user_id)
# Seleccionar model segons versió
model = new_model if version == 'B' else current_model
prediction = model.predict([data.features])[0]
# Registrar amb la versió per anàlisi posterior (user_id anonimitzat per privacitat)
hashed_user = hashlib.sha256(user_id.encode()).hexdigest()[:16]
logger.info(f"Prediction: user={hashed_user}, version={version}, result={prediction}")
return {"prediction": float(prediction), "model_version": version}
Canary Deployment (Desplegament canari)
Similar a l’A/B testing, però amb l’objectiu de detectar problemes, no de comparar rendiment. Enviem un petit percentatge del tràfic al nou model i augmentem gradualment si tot va bé.
Avantatges:
- Limita l’impacte de problemes
- Permet rollback ràpid
- Detecta problemes que no apareixen en tests
Desavantatges:
- Requereix monitoratge proper durant el desplegament
- Més lent que un desplegament directe
Bandits (Multi-armed bandits)
Els bandits són una alternativa més sofisticada a l’A/B testing. En lloc de dividir el tràfic de manera fixa, el sistema aprèn dinàmicament quina versió funciona millor i li envia més tràfic.
Inici: Model A: 50% | Model B: 50%
Dia 3: Model A: 60% | Model B: 40% (A sembla millor)
Dia 7: Model A: 80% | Model B: 20% (A confirmat millor)
Avantatges:
- Minimitza el “regret” (tràfic enviat al model pitjor)
- S’adapta automàticament
Desavantatges:
- Més complex d’implementar
- Menys control sobre la divisió del tràfic
- Pot ser prematur si les diferències són petites
Per al nostre context, l’A/B testing o el canary deployment són més pràctics i fàcils d’entendre i implementar.
Comparativa de les estratègies
| Estratègia | Risc | Complexitat | Mesura impacte real |
|---|---|---|---|
| Shadow | Cap | Baixa | No |
| A/B Test | Moderat | Mitjana | Sí |
| Canary | Baix | Mitjana | Parcialment |
| Bandits | Variable | Alta | Sí |
Recomanació pràctica
Un flux recomanat per a equips petits:
- Shadow deployment primer: Verificar que el model nou funciona sense errors
- Canary amb 5-10%: Detectar problemes d’integració
- Si tot va bé, desplegament complet o A/B test si volem mesurar impacte
Consideracions pràctiques
Versionat de models
Quan actualitzem models regularment, el versionat esdevé encara més crítica. Necessitem saber exactament què està en producció, comparar models, i poder fer rollback ràpidament.
Per què és crític en continual learning?
En un sistema d’actualització contínua:
- Els models canvien freqüentment (setmanal, diari, o fins i tot més)
- Cal comparar nou model vs actual abans de desplegar
- Si un model nou falla, necessitem rollback immediat
- Hem de poder auditar quin model va fer quina predicció
Sense versionat adequat, el continual learning és inviable.
Què versionar: models + metadata completa
Cada versió del model ha d’incloure:
Essencial:
- Versió (v1.2.0)
- Model serialitzat (.pkl)
- Mètriques de validació (F1, recall, etc.)
- Features usades
Recomanat per continual learning:
- Dataset info: Quantes dades, de quan, hash
- Hyperparameters: Configuració del model
- Training time: Quan i quant temps va trigar
- Data version: Quin split de dades es va usar
- Environment: Python, sklearn, altres versions
Estructura de versionat recomanada
models/
├── v1.0.0/
│ ├── model.pkl
│ ├── metadata.json
│ ├── scaler.pkl
│ └── encoder.pkl
├── v1.1.0/
│ ├── model.pkl
│ ├── metadata.json
│ ├── scaler.pkl
│ └── encoder.pkl
├── v1.2.0/
│ ├── model.pkl
│ ├── metadata.json
│ ├── scaler.pkl
│ └── encoder.pkl
└── current -> v1.2.0/ # Symlink
Metadata extesa per continual learning
import json
import hashlib
from datetime import datetime
def save_model_with_full_metadata(
model,
version: str,
metrics: dict,
training_data_path: str,
hyperparameters: dict,
output_dir: str
):
"""
Guarda model amb metadata completa per continual learning.
"""
import sys
import sklearn
import pandas as pd
# Calcular hash del dataset (per reproducibilitat)
df = pd.read_parquet(training_data_path)
data_hash = hashlib.md5(df.to_json().encode()).hexdigest()
metadata = {
# Versió i timestamps
'version': version,
'trained_at': datetime.now().isoformat(),
'training_duration_seconds': 0, # Calcular si cal
# Dataset
'dataset': {
'path': training_data_path,
'hash': data_hash,
'rows': len(df),
'features': list(df.columns),
},
# Mètriques
'metrics': metrics,
# Model
'model_type': type(model).__name__,
'hyperparameters': hyperparameters,
# Environment (per reproduir)
'environment': {
'python': sys.version,
'sklearn': sklearn.__version__,
'pandas': pd.__version__,
},
# Deployment
'deployed': False,
'deployed_at': None,
}
# Crear directori per aquesta versió
version_dir = f'{output_dir}/{version}'
os.makedirs(version_dir, exist_ok=True)
# Guardar model
joblib.dump(model, f'{version_dir}/model.pkl')
# Guardar metadata
with open(f'{version_dir}/metadata.json', 'w') as f:
json.dump(metadata, f, indent=2)
print(f"✓ Model {version} guardat amb metadata completa")
return version_dir
Comparar versions abans de desplegar
Abans de fer el symlink current -> v1.3.0, comparar amb la versió actual:
def compare_model_versions(registry_dir: str, current_version: str, candidate_version: str) -> dict:
"""
Compara dues versions de model.
Returns:
Dict amb comparació de mètriques i recomanació.
"""
# Carregar metadata
with open(f'{registry_dir}/{current_version}/metadata.json') as f:
current_meta = json.load(f)
with open(f'{registry_dir}/{candidate_version}/metadata.json') as f:
candidate_meta = json.load(f)
# Comparar mètriques
comparison = {
'current_version': current_version,
'candidate_version': candidate_version,
'metrics_comparison': {}
}
for metric in ['f1', 'recall', 'precision']:
current_val = current_meta['metrics'].get(metric, 0)
candidate_val = candidate_meta['metrics'].get(metric, 0)
diff = candidate_val - current_val
comparison['metrics_comparison'][metric] = {
'current': current_val,
'candidate': candidate_val,
'difference': diff,
'improvement': diff > 0
}
# Decisió
f1_improvement = comparison['metrics_comparison']['f1']['difference']
if f1_improvement >= 0.02:
comparison['recommendation'] = 'DEPLOY'
comparison['reason'] = 'Millora significativa'
elif f1_improvement >= 0:
comparison['recommendation'] = 'DEPLOY'
comparison['reason'] = 'Millora petita però positiva'
else:
comparison['recommendation'] = 'REJECT'
comparison['reason'] = 'Model pitjor que l\'actual'
return comparison
# Ús en pipeline d'actualització
comparison = compare_model_versions('models', 'v1.2.0', 'v1.3.0')
if comparison['recommendation'] == 'DEPLOY':
print(f"✓ Desplegant {comparison['candidate_version']}")
print(f" Raó: {comparison['reason']}")
# Actualitzar symlink
os.symlink(comparison['candidate_version'], 'models/current', target_is_directory=True)
else:
print(f"✗ NO desplegant {comparison['candidate_version']}")
print(f" Raó: {comparison['reason']}")
Model registry amb històric
Mantenir un registre de tots els desplegaments:
# models/deployment_history.json
{
"deployments": [
{
"version": "v1.0.0",
"deployed_at": "2024-01-15T10:00:00",
"replaced_by": "v1.1.0",
"reason": "Scheduled retraining"
},
{
"version": "v1.1.0",
"deployed_at": "2024-01-22T10:00:00",
"replaced_by": "v1.2.0",
"reason": "Drift detected"
},
{
"version": "v1.2.0",
"deployed_at": "2024-01-29T10:00:00",
"replaced_by": null,
"reason": "Improved performance"
}
],
"current_version": "v1.2.0"
}
def record_deployment(version: str, reason: str, history_file: str = 'models/deployment_history.json'):
"""Registra un desplegament al històric."""
# Carregar històric
if os.path.exists(history_file):
with open(history_file, 'r') as f:
history = json.load(f)
else:
history = {'deployments': [], 'current_version': None}
# Marcar anterior com reemplaçat
if history['current_version']:
for deployment in history['deployments']:
if deployment['version'] == history['current_version'] and not deployment['replaced_by']:
deployment['replaced_by'] = version
# Afegir nou desplegament
history['deployments'].append({
'version': version,
'deployed_at': datetime.now().isoformat(),
'replaced_by': None,
'reason': reason
})
history['current_version'] = version
# Guardar
with open(history_file, 'w') as f:
json.dump(history, f, indent=2)
Resum del versionat per continual learning
Imprescindible:
- ✅ Cada model té versió única (v1.2.0)
- ✅ Metadata amb mètriques, features, dataset hash
- ✅ Directori per versió (v1.2.0/model.pkl, metadata.json)
- ✅ Comparació automàtica abans de desplegar
Altament recomanat:
- ✅ Històric de desplegaments
- ✅ Rollback automatitzat
- ✅ Metadata extesa (hyperparameters, environment)
Amb aquest sistema, podem actualitzar models amb confiança, sabent que podem revertir si cal!
Rollback
Sempre hem de poder tornar a la versió anterior ràpidament:
#!/bin/bash
# rollback.sh - Torna a la versió anterior del model
set -e # Sortir si hi ha errors
CURRENT=$(readlink -f /models/current_model.pkl || echo "")
if [ -z "$CURRENT" ]; then
echo "Error: No s'ha trobat el model actual"
exit 1
fi
PREVIOUS=$(find /models -name 'model_v*.pkl' -type f | grep -v "$CURRENT" | sort -r | head -1)
if [ -z "$PREVIOUS" ]; then
echo "Error: No hi ha model anterior disponible"
exit 1
fi
echo "Rollback de $CURRENT a $PREVIOUS"
ln -sf "$PREVIOUS" /models/current_model.pkl
# Reiniciar el contenidor per carregar el model antic
if docker restart model_api; then
echo "Rollback completat amb èxit"
else
echo "Error: Fallida al reiniciar el contenidor"
exit 1
fi
Documentació de canvis
Mantenir un registre de tots els canvis:
# Changelog de models
## v1.1.0 (2024-02-01)
- Afegides 3 noves features
- Re-entrenat amb dades de gener
- Accuracy: 0.89 (+0.02)
## v1.0.0 (2024-01-15)
- Versió inicial
- Accuracy: 0.87
Resum
En aquest capítol hem après:
- L’aprenentatge continu és necessari perquè els models s’adaptin als canvis en les dades
- La freqüència d’actualització depèn de la velocitat del canvi, el cost, i el valor de la frescor
- L’entrenament amb estat és més eficient però pot acumular errors; combinar-lo amb entrenament sense estat periòdicament és una bona estratègia
- Un pipeline automatitzat amb scripts Python pot gestionar la recollida de dades, entrenament, validació, i desplegament
- Les estratègies de test en producció (shadow, A/B testing, canary) permeten validar models amb tràfic real de manera controlada
- El versionat i la capacitat de rollback són essencials per a operacions segures
Amb això completem la visió del cicle de vida d’un model en producció: des del desplegament inicial amb Docker i FastAPI, passant pel monitoratge i detecció de drift, fins a l’actualització contínua i el test en producció.