ForgeNEX Logo

Arquitectura MLOps: Pipeline de Entrenamiento Continuo con Apache Airflow y MLflow

Diseño e implementación de un pipeline MLOps de grado enterprise. Orquestación distribuida con Apache Airflow y versionado de modelos con MLflow.

Equipo de Ingeniería ForgeNEX

Consultor Senior IT

Actualizado: 17 Jun, 2026
5 min de lectura
Arquitectura MLOps: Pipeline de Entrenamiento Continuo con Apache Airflow y MLflow

Lo que aprenderás en esta guía

Este es un artículo técnico y profundo redactado por los ingenieros de ForgeNEX. Está diseñado para profesionales que buscan implementar soluciones sólidas y evitar los errores comunes que cuestan horas de producción.

La transición de modelos de Machine Learning desde entornos experimentales (como Jupyter Notebooks) hacia producción requiere un ecosistema robusto de Continuous Training (CT). En arquitecturas empresariales, el acoplamiento de Apache Airflow como orquestador y MLflow como System of Record para modelos, ofrece un stack estándar, escalable y agnóstico a la infraestructura subyacente.

En este artículo, desglosamos un patrón de arquitectura avanzado donde Airflow delega la computación pesada a clusters efímeros (usando Kubernetes) y MLflow consolida la observabilidad del ciclo de vida del modelo.

Arquitectura Desacoplada: Orquestación vs. Ejecución

El antipatrón más común en MLOps es utilizar los workers de Airflow (ej. Celery) para ejecutar entrenamiento de modelos. Esto genera saturación de memoria, Out-Of-Memory (OOM) kills y contención de recursos.

Nota Importante: Airflow debe actuar estrictamente como el "controlador de tráfico". El cómputo del modelo debe delegarse a infraestructuras elásticas mediante operadores como KubernetesPodOperator o tareas contenerizadas gestionadas remotamente.

Diseño del DAG de Entrenamiento

A continuación, implementamos un DAG de Airflow que orquesta la extracción de datos, el preprocesamiento y el entrenamiento en contenedores Docker aislados en un cluster Kubernetes.

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    'owner': 'ml-platform',
    'depends_on_past': False,
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'mlops_continuous_training_v1',
    default_args=default_args,
    schedule_interval='@weekly',
    start_date=days_ago(1),
    catchup=False,
    tags=['mlops', 'training'],
) as dag:

    # 1. Tarea de Preprocesamiento en un Pod efímero
    preprocess_data = KubernetesPodOperator(
        task_id='preprocess_features',
        name='preprocess-pod',
        namespace='ml-workloads',
        image='registry.forgenex.internal/ml/feature-prep:latest',
        cmds=["python", "prep_pipeline.py"],
        arguments=["--date", "{{ ds }}"],
        env_vars={'S3_BUCKET': 'forgenex-data-lake'},
        is_delete_operator_pod=True,
    )

    # 2. Tarea de Entrenamiento con integración nativa a MLflow
    train_model = KubernetesPodOperator(
        task_id='train_xgboost',
        name='train-pod',
        namespace='ml-workloads',
        image='registry.forgenex.internal/ml/xgb-trainer:latest',
        cmds=["python", "train.py"],
        env_vars={
            'MLFLOW_TRACKING_URI': 'http://mlflow-server.ml-ops.svc.cluster.local:5000',
            'EXPERIMENT_NAME': 'customer_churn_xgboost'
        },
        get_logs=True,
        is_delete_operator_pod=True,
    )

    preprocess_data >> train_model

Gestión del Ciclo de Vida con MLflow

Mientras Airflow maneja las dependencias de ejecución, reintentos y alertas, MLflow gestiona el versionado, el seguimiento de métricas y el almacenamiento de artefactos binarios. Dentro del script train.py ejecutado en el paso anterior, instrumentamos el código para realizar un tracking automático e inferir la signature del modelo (su esquema de entrada/salida).

Autologging y Model Registry

El uso de mlflow.autolog() minimiza drásticamente el código boilerplate necesario. Al mismo tiempo, el Model Registry permite transicionar o promover modelos (por ejemplo, de Staging a Production) de forma automática y programática, cumpliendo con las políticas de CI/CD.

import mlflow
import xgboost as xgb
from mlflow.models.signature import infer_signature
from sklearn.metrics import roc_auc_score

def train_and_log(X_train, y_train, X_test, y_test):
    # Habilitar autologging para la librería específica (XGBoost)
    mlflow.xgboost.autolog(log_models=False) # Log manual para mayor control

    with mlflow.start_run(run_name="weekly_ct_run"):
        # Definición de hiperparámetros
        params = {
            "objective": "binary:logistic",
            "max_depth": 5,
            "eta": 0.1,
            "eval_metric": "auc"
        }

        dtrain = xgb.DMatrix(X_train, label=y_train)
        dtest = xgb.DMatrix(X_test, label=y_test)

        # Entrenamiento del modelo
        booster = xgb.train(
            params=params,
            dtrain=dtrain,
            num_boost_round=100,
            evals=[(dtest, "Test")]
        )

        # Validación y Logging de Métricas
        predictions = booster.predict(dtest)
        auc = roc_auc_score(y_test, predictions)
        mlflow.log_metric("final_auc", auc)

        # Inferir la firma (tipos de datos precisos del dataset)
        signature = infer_signature(X_train, predictions)

        # Registrar el artefacto en MLflow Registry
        model_info = mlflow.xgboost.log_model(
            xgb_model=booster,
            artifact_path="churn_model",
            signature=signature,
            registered_model_name="CustomerChurnModel"
        )

        # Regla de Negocio: Promoción automática de acuerdo a calidad
        if auc > 0.85:
            client = mlflow.tracking.MlflowClient()
            client.transition_model_version_stage(
                name="CustomerChurnModel",
                version=model_info.registered_model_version,
                stage="Production",
                archive_existing_versions=True
            )
            print(f"Modelo promovido a Producción (AUC: {auc})")

Nota Importante: Al utilizar infer_signature, se logra que las plataformas de model serving en tiempo real (como KServe, Seldon Core o MLflow Scoring) expongan validación estricta de payloads. Esto provoca el rechazo automático de requests HTTP que no cumplan con el esquema esperado, ayudando a detectar y mitigar incidencias de data drift silencioso en producción.

Conclusión y Próximos Pasos

Integrar Airflow y MLflow bajo el paradigma de orquestación desacoplada permite separar las responsabilidades operativas (el job routing, monitorización de infraestructura) de las puramente algorítmicas o científicas (métricas, hiperparámetros, linaje de datos). Mediante el uso de herramientas cloud-native para la ejecución y MLflow para la gobernanza integral, los equipos pueden desplegar Continuous Training Pipelines sin fricción y listos para escalar a los estándares más altos del enterprise B2B.

¿Demasiado complejo para tu equipo?

En ForgeNEX gestionamos este tipo de soluciones tecnológicas todos los días. Evita riesgos y delega la implementación en nuestros expertos.

  • Respuesta en menos de 2 horas
  • Auditamos tu caso sin compromiso
  • Expertos certificados