이 글에서 다루는 것

모델 등록 실패 시 이전 안정 버전으로 자동 롤백하고 FastAPI에 즉시 반영하는 복구 루프와, 운영자가 명시적으로 버전을 지정하는 수동 롤백 DAG을 구축하는 과정을 다룹니다.

선수지식


이 단계에서 해결하려는 문제

모델 배포는 성공했을 때보다, 실패했을 때의 회복 속도가 운영 품질을 결정한다. 등록이 실패하는 즉시 이전 안정 버전으로 돌아가 FastAPI에 자동 반영되는 완전한 롤백 루프를 구축해야 한다. 이 구조가 있어야 장애 상황에서도 서비스가 끊기지 않고 바로 복구될 수 있다.


🎯 핵심 요약

  • 등록 실패 → 즉시 롤백: 시도 직전 before_version(alias가 가리키던 버전)으로 복귀
  • READY 검증 → alias 재할당 → FastAPI /reload 까지 자동 수행(실서비스 반영 보장)
  • 수동 롤백 전용 DAG 제공(운영자가 대상 버전 명시)
  • Slack/로그 추적(성공/실패/복구 구분), 실패는 re-raise로 원인 명확화

1️⃣ 플로우 한 장 요약

mermaid-03.png


2️⃣ 자동 롤백(등록 태스크 내부) — /reload까지 포함

서비스 가용성 최우선. 등록 실패 분기에서 즉시 롤백 + FastAPI 반영 후, 예외는 재전파해 원인 기록.

# dags/dag_ml_train_register_reload.py (발췌: register_model_task)
from mlflow.tracking import MlflowClient
from utils.slack_alerts import send_slack_alert
from ml_code.register_model import register_model
from ml_code.rollback_model import rollback_model
from ml_code.trigger_reload import trigger_reload

def register_model_task(ti, **_):
    run_id = ti.xcom_pull(task_ids="train_and_evaluate", key="run_id")
    model_name = ti.xcom_pull(task_ids="train_and_evaluate", key="model_name")
    alias = ti.xcom_pull(task_ids="train_and_evaluate", key="alias")
    prev_version = get_version_by_alias(model_name, alias)

    try:
        version = register_model(run_id, model_name, alias)
        ti.xcom_push(key="version", value=version)
        send_slack_alert(f"✅ 모델 등록 완료: {model_name} v{version} → @{alias}")
    except Exception as e:
        msg = f"❌ 모델 등록 실패: {e}"
        try:
            if prev_version:
                rollback_model(model_name, prev_version, alias)
                trigger_reload(alias)
                msg += f" → 롤백 완료 및 /reload 반영: v{prev_version}"
            else:
                msg += " → 롤백 생략(이전 버전 없음)"
        except Exception as rr_e:
            msg += f" → 롤백/반영 추가 실패: {rr_e}"
        finally:
            send_slack_alert(msg)
        raise  # DAG 실패로 남겨 원인 추적

3️⃣ 롤백 함수(명시형) — READY 강제 & 동일 버전 소음 제거

대상 버전이 READY인지 확인 후 alias를 재할당합니다.

# ml_code/rollback_model.py
import mlflow
from ml_code.config import get_mlflow_client
from mlflow.exceptions import MlflowException
from airflow.utils.log.logging_mixin import LoggingMixin

logger = LoggingMixin().log

def rollback_model(model_name: str, version: str, alias: str):
    try:
        if not model_name or not version or not alias:
            raise ValueError("model_name / version / alias 값이 비어있음")

        client = get_mlflow_client()

        current_version = client.get_model_version_by_alias(model_name, alias).version
        if str(current_version) == str(version):
            raise RuntimeError(f"[Rollback] 이미 @{alias} → v{version} 상태")

        # 대상 버전 READY 상태 확인
        model_info = client.get_model_version(name=model_name, version=version)
        if model_info.status != "READY":
            raise RuntimeError(f"[Rollback] 대상 버전 READY 아님 → 현재: {model_info.status}")

        # 기존 alias 삭제 후 대상 버전에 재할당
        client.delete_registered_model_alias(model_name, alias)
        client.set_registered_model_alias(model_name, alias, version)

        logger.info(f"[Rollback] 성공: @{alias} → v{version}")

    except MlflowException as e:
        raise RuntimeError(f"[Rollback Error] MLflow 예외 발생: {e}") from e
    except Exception as e:
        logger.error(f"[Rollback Error] {e}")
        raise

4️⃣ FastAPI 반영 트리거 — 계약(응답 규약) 일치

내부 설정 헬퍼를 사용해 URL·토큰을 불러오며, 응답은 status == "success"를 기대합니다. (타임아웃 10초)

# ml_code/trigger_reload.py
import requests
from ml_code.config import get_fastapi_reload_url, get_reload_token
from airflow.utils.log.logging_mixin import LoggingMixin

logger = LoggingMixin().log

def trigger_reload(variant="A"):
    base_url = get_fastapi_reload_url()
    token = get_reload_token()

    try:
        url = f"{base_url}/variant/{variant}/reload"
        res = requests.post(url, headers={"x-token": token}, timeout=10)

        if res.status_code != 200:
            raise Exception(f"FastAPI reload 실패: {res.status_code} {res.text}")

        json_resp = res.json()
        if json_resp.get("status") != "success":
            raise Exception(f"FastAPI reload 실패 (응답 문제): {json_resp}")

        logger.info(f"[Reload 성공] variant={variant}{json_resp}")
        return json_resp

    except Exception as e:
        raise RuntimeError(f"모델 reload 실패: {e}")

FastAPI /variant/{alias}/reload는 반드시 {"status":"success", ...} 형태로 응답해야 합니다.


5️⃣ 수동 롤백 DAG — 운영자 지정형(+/reload 포함)

# dags/dag_model_rollback.py (핵심부)
# 이후 rollback_manual.py로 대체·리팩토링 (→ airflow-dags-dev 레포 참조)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

def dag_rollback():
    model_name = Variable.get("rollback_model_name")
    version = Variable.get("rollback_version")
    alias = Variable.get("rollback_alias")
    rollback_model(model_name=model_name, version=version, alias=alias)
    send_slack_alert(f"↩️ 수동 롤백 완료: {model_name} @{alias} → v{version}")

def dag_reload():
    alias = Variable.get("rollback_alias")
    resp = trigger_reload(alias)
    send_slack_alert(f"🔁 수동 롤백 반영(/reload): @{alias}{resp}")

with DAG("manual_rollback_model", schedule=None, catchup=False,
         tags=["mlops", "rollback"], on_failure_callback=alert_slack) as dag:
    rollback = PythonOperator(task_id="rollback_model_alias", python_callable=dag_rollback)
    reload = PythonOperator(task_id="reload_fastapi_model", python_callable=dag_reload)
    rollback >> reload

운영 변수 예시(현재 값)

  • dev: rollback_model_name=best_model, rollback_version=2, rollback_alias=B
  • prod: rollback_model_name=best_model, rollback_version=1, rollback_alias=B

Airflow Variable 값은 운영 상황에 맞춰 UI에서 수시로 변경해 사용 가능합니다.


6️⃣ 체크리스트

  • 등록 직전 before_version 확보 (함수 내부에서 즉시 변수화 권장)
  • rollback_model(model, version, alias) 호출 시 대상 버전 명시
  • 롤백 대상 READY 확인 (비READY 차단)
  • 실패 브랜치에서 trigger_reload() 즉시 호출 (실서비스 반영 보장)
  • Slack 메시지에 model / alias / version / run_id (가능 시) 포함

🧩 팁

  • 실패는 재전파하되, 서비스는 먼저 복구: “가용성 우선, 원인 추적은 로그/슬랙/실패표식”
  • 동일 버전 재지정 소음 억제: 현재 alias 버전과 동일한지 먼저 비교
  • 수동 롤백 DAG는 운영자 “즉시복원 스위치"로 생각하고, Variables를 PR 템플릿/런북에 기록

설계 판단 (Why This Way?)

등록 실패 시 가용성 우선으로 즉시 롤백·reload를 수행하고 원인 추적은 사후에 처리하며, 롤백 성공 후에도 예외를 re-raise하여 DAG 실패 기록과 on_failure_callback을 통한 근본 원인 추적을 보장합니다.


다음에 읽을 글

MLOps 운영 고도화 4단계: FastAPI 로그 안정화 — 로그 유실 방지와 NFS 안정성