Lo que aprenderás en esta guía
Este es un artículo técnico y profundo redactado por los ingenieros de ForgeNEX. Está diseñado para profesionales que buscan implementar soluciones sólidas y evitar los errores comunes que cuestan horas de producción.
La transición de modelos de Machine Learning desde entornos experimentales (como Jupyter Notebooks) hacia producción requiere un ecosistema robusto de Continuous Training (CT). En arquitecturas empresariales, el acoplamiento de Apache Airflow como orquestador y MLflow como System of Record para modelos, ofrece un stack estándar, escalable y agnóstico a la infraestructura subyacente.
En este artículo, desglosamos un patrón de arquitectura avanzado donde Airflow delega la computación pesada a clusters efímeros (usando Kubernetes) y MLflow consolida la observabilidad del ciclo de vida del modelo.
Arquitectura Desacoplada: Orquestación vs. Ejecución
El antipatrón más común en MLOps es utilizar los workers de Airflow (ej. Celery) para ejecutar entrenamiento de modelos. Esto genera saturación de memoria, Out-Of-Memory (OOM) kills y contención de recursos.
Nota Importante: Airflow debe actuar estrictamente como el "controlador de tráfico". El cómputo del modelo debe delegarse a infraestructuras elásticas mediante operadores como
KubernetesPodOperatoro tareas contenerizadas gestionadas remotamente.
Diseño del DAG de Entrenamiento
A continuación, implementamos un DAG de Airflow que orquesta la extracción de datos, el preprocesamiento y el entrenamiento en contenedores Docker aislados en un cluster Kubernetes.
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
default_args = {
'owner': 'ml-platform',
'depends_on_past': False,
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'mlops_continuous_training_v1',
default_args=default_args,
schedule_interval='@weekly',
start_date=days_ago(1),
catchup=False,
tags=['mlops', 'training'],
) as dag:
# 1. Tarea de Preprocesamiento en un Pod efímero
preprocess_data = KubernetesPodOperator(
task_id='preprocess_features',
name='preprocess-pod',
namespace='ml-workloads',
image='registry.forgenex.internal/ml/feature-prep:latest',
cmds=["python", "prep_pipeline.py"],
arguments=["--date", "{{ ds }}"],
env_vars={'S3_BUCKET': 'forgenex-data-lake'},
is_delete_operator_pod=True,
)
# 2. Tarea de Entrenamiento con integración nativa a MLflow
train_model = KubernetesPodOperator(
task_id='train_xgboost',
name='train-pod',
namespace='ml-workloads',
image='registry.forgenex.internal/ml/xgb-trainer:latest',
cmds=["python", "train.py"],
env_vars={
'MLFLOW_TRACKING_URI': 'http://mlflow-server.ml-ops.svc.cluster.local:5000',
'EXPERIMENT_NAME': 'customer_churn_xgboost'
},
get_logs=True,
is_delete_operator_pod=True,
)
preprocess_data >> train_modelGestión del Ciclo de Vida con MLflow
Mientras Airflow maneja las dependencias de ejecución, reintentos y alertas, MLflow gestiona el versionado, el seguimiento de métricas y el almacenamiento de artefactos binarios. Dentro del script train.py ejecutado en el paso anterior, instrumentamos el código para realizar un tracking automático e inferir la signature del modelo (su esquema de entrada/salida).
Autologging y Model Registry
El uso de mlflow.autolog() minimiza drásticamente el código boilerplate necesario. Al mismo tiempo, el Model Registry permite transicionar o promover modelos (por ejemplo, de Staging a Production) de forma automática y programática, cumpliendo con las políticas de CI/CD.
import mlflow
import xgboost as xgb
from mlflow.models.signature import infer_signature
from sklearn.metrics import roc_auc_score
def train_and_log(X_train, y_train, X_test, y_test):
# Habilitar autologging para la librería específica (XGBoost)
mlflow.xgboost.autolog(log_models=False) # Log manual para mayor control
with mlflow.start_run(run_name="weekly_ct_run"):
# Definición de hiperparámetros
params = {
"objective": "binary:logistic",
"max_depth": 5,
"eta": 0.1,
"eval_metric": "auc"
}
dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)
# Entrenamiento del modelo
booster = xgb.train(
params=params,
dtrain=dtrain,
num_boost_round=100,
evals=[(dtest, "Test")]
)
# Validación y Logging de Métricas
predictions = booster.predict(dtest)
auc = roc_auc_score(y_test, predictions)
mlflow.log_metric("final_auc", auc)
# Inferir la firma (tipos de datos precisos del dataset)
signature = infer_signature(X_train, predictions)
# Registrar el artefacto en MLflow Registry
model_info = mlflow.xgboost.log_model(
xgb_model=booster,
artifact_path="churn_model",
signature=signature,
registered_model_name="CustomerChurnModel"
)
# Regla de Negocio: Promoción automática de acuerdo a calidad
if auc > 0.85:
client = mlflow.tracking.MlflowClient()
client.transition_model_version_stage(
name="CustomerChurnModel",
version=model_info.registered_model_version,
stage="Production",
archive_existing_versions=True
)
print(f"Modelo promovido a Producción (AUC: {auc})")Nota Importante: Al utilizar
infer_signature, se logra que las plataformas de model serving en tiempo real (como KServe, Seldon Core o MLflow Scoring) expongan validación estricta de payloads. Esto provoca el rechazo automático de requests HTTP que no cumplan con el esquema esperado, ayudando a detectar y mitigar incidencias de data drift silencioso en producción.
Conclusión y Próximos Pasos
Integrar Airflow y MLflow bajo el paradigma de orquestación desacoplada permite separar las responsabilidades operativas (el job routing, monitorización de infraestructura) de las puramente algorítmicas o científicas (métricas, hiperparámetros, linaje de datos). Mediante el uso de herramientas cloud-native para la ejecución y MLflow para la gobernanza integral, los equipos pueden desplegar Continuous Training Pipelines sin fricción y listos para escalar a los estándares más altos del enterprise B2B.
¿Demasiado complejo para tu equipo?
En ForgeNEX gestionamos este tipo de soluciones tecnológicas todos los días. Evita riesgos y delega la implementación en nuestros expertos.
- Respuesta en menos de 2 horas
- Auditamos tu caso sin compromiso
- Expertos certificados