이 글에서 다루는 것
모델 등록 실패 시 이전 안정 버전으로 자동 롤백하고 FastAPI에 즉시 반영하는 복구 루프와, 운영자가 명시적으로 버전을 지정하는 수동 롤백 DAG을 구축하는 과정을 다룹니다.
선수지식
- MLOps 운영 고도화 2단계: Slack Alert 통합 — 파이프라인 알림 자동화
이 단계에서 해결하려는 문제
모델 배포는 성공했을 때보다, 실패했을 때의 회복 속도가 운영 품질을 결정한다. 등록이 실패하는 즉시 이전 안정 버전으로 돌아가 FastAPI에 자동 반영되는 완전한 롤백 루프를 구축해야 한다. 이 구조가 있어야 장애 상황에서도 서비스가 끊기지 않고 바로 복구될 수 있다.
🎯 핵심 요약
- 등록 실패 → 즉시 롤백: 시도 직전
before_version(alias가 가리키던 버전)으로 복귀 - READY 검증 → alias 재할당 → FastAPI
/reload까지 자동 수행(실서비스 반영 보장) - 수동 롤백 전용 DAG 제공(운영자가 대상 버전 명시)
- Slack/로그 추적(성공/실패/복구 구분), 실패는 re-raise로 원인 명확화
1️⃣ 플로우 한 장 요약

2️⃣ 자동 롤백(등록 태스크 내부) — /reload까지 포함
서비스 가용성 최우선. 등록 실패 분기에서 즉시 롤백 + FastAPI 반영 후, 예외는 재전파해 원인 기록.
# dags/dag_ml_train_register_reload.py (발췌: register_model_task)
from mlflow.tracking import MlflowClient
from utils.slack_alerts import send_slack_alert
from ml_code.register_model import register_model
from ml_code.rollback_model import rollback_model
from ml_code.trigger_reload import trigger_reload
def register_model_task(ti, **_):
run_id = ti.xcom_pull(task_ids="train_and_evaluate", key="run_id")
model_name = ti.xcom_pull(task_ids="train_and_evaluate", key="model_name")
alias = ti.xcom_pull(task_ids="train_and_evaluate", key="alias")
prev_version = get_version_by_alias(model_name, alias)
try:
version = register_model(run_id, model_name, alias)
ti.xcom_push(key="version", value=version)
send_slack_alert(f"✅ 모델 등록 완료: {model_name} v{version} → @{alias}")
except Exception as e:
msg = f"❌ 모델 등록 실패: {e}"
try:
if prev_version:
rollback_model(model_name, prev_version, alias)
trigger_reload(alias)
msg += f" → 롤백 완료 및 /reload 반영: v{prev_version}"
else:
msg += " → 롤백 생략(이전 버전 없음)"
except Exception as rr_e:
msg += f" → 롤백/반영 추가 실패: {rr_e}"
finally:
send_slack_alert(msg)
raise # DAG 실패로 남겨 원인 추적
3️⃣ 롤백 함수(명시형) — READY 강제 & 동일 버전 소음 제거
대상 버전이 READY인지 확인 후 alias를 재할당합니다.
# ml_code/rollback_model.py
import mlflow
from ml_code.config import get_mlflow_client
from mlflow.exceptions import MlflowException
from airflow.utils.log.logging_mixin import LoggingMixin
logger = LoggingMixin().log
def rollback_model(model_name: str, version: str, alias: str):
try:
if not model_name or not version or not alias:
raise ValueError("model_name / version / alias 값이 비어있음")
client = get_mlflow_client()
current_version = client.get_model_version_by_alias(model_name, alias).version
if str(current_version) == str(version):
raise RuntimeError(f"[Rollback] 이미 @{alias} → v{version} 상태")
# 대상 버전 READY 상태 확인
model_info = client.get_model_version(name=model_name, version=version)
if model_info.status != "READY":
raise RuntimeError(f"[Rollback] 대상 버전 READY 아님 → 현재: {model_info.status}")
# 기존 alias 삭제 후 대상 버전에 재할당
client.delete_registered_model_alias(model_name, alias)
client.set_registered_model_alias(model_name, alias, version)
logger.info(f"[Rollback] 성공: @{alias} → v{version}")
except MlflowException as e:
raise RuntimeError(f"[Rollback Error] MLflow 예외 발생: {e}") from e
except Exception as e:
logger.error(f"[Rollback Error] {e}")
raise
4️⃣ FastAPI 반영 트리거 — 계약(응답 규약) 일치
내부 설정 헬퍼를 사용해 URL·토큰을 불러오며, 응답은
status == "success"를 기대합니다. (타임아웃 10초)
# ml_code/trigger_reload.py
import requests
from ml_code.config import get_fastapi_reload_url, get_reload_token
from airflow.utils.log.logging_mixin import LoggingMixin
logger = LoggingMixin().log
def trigger_reload(variant="A"):
base_url = get_fastapi_reload_url()
token = get_reload_token()
try:
url = f"{base_url}/variant/{variant}/reload"
res = requests.post(url, headers={"x-token": token}, timeout=10)
if res.status_code != 200:
raise Exception(f"FastAPI reload 실패: {res.status_code} {res.text}")
json_resp = res.json()
if json_resp.get("status") != "success":
raise Exception(f"FastAPI reload 실패 (응답 문제): {json_resp}")
logger.info(f"[Reload 성공] variant={variant} → {json_resp}")
return json_resp
except Exception as e:
raise RuntimeError(f"모델 reload 실패: {e}")
FastAPI
/variant/{alias}/reload는 반드시{"status":"success", ...}형태로 응답해야 합니다.
5️⃣ 수동 롤백 DAG — 운영자 지정형(+/reload 포함)
# dags/dag_model_rollback.py (핵심부)
# 이후 rollback_manual.py로 대체·리팩토링 (→ airflow-dags-dev 레포 참조)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
def dag_rollback():
model_name = Variable.get("rollback_model_name")
version = Variable.get("rollback_version")
alias = Variable.get("rollback_alias")
rollback_model(model_name=model_name, version=version, alias=alias)
send_slack_alert(f"↩️ 수동 롤백 완료: {model_name} @{alias} → v{version}")
def dag_reload():
alias = Variable.get("rollback_alias")
resp = trigger_reload(alias)
send_slack_alert(f"🔁 수동 롤백 반영(/reload): @{alias} → {resp}")
with DAG("manual_rollback_model", schedule=None, catchup=False,
tags=["mlops", "rollback"], on_failure_callback=alert_slack) as dag:
rollback = PythonOperator(task_id="rollback_model_alias", python_callable=dag_rollback)
reload = PythonOperator(task_id="reload_fastapi_model", python_callable=dag_reload)
rollback >> reload
운영 변수 예시(현재 값)
- dev:
rollback_model_name=best_model,rollback_version=2,rollback_alias=B - prod:
rollback_model_name=best_model,rollback_version=1,rollback_alias=B
Airflow Variable 값은 운영 상황에 맞춰 UI에서 수시로 변경해 사용 가능합니다.
6️⃣ 체크리스트
- 등록 직전
before_version확보 (함수 내부에서 즉시 변수화 권장) -
rollback_model(model, version, alias)호출 시 대상 버전 명시 - 롤백 대상 READY 확인 (비READY 차단)
- 실패 브랜치에서
trigger_reload()즉시 호출 (실서비스 반영 보장) - Slack 메시지에 model / alias / version / run_id (가능 시) 포함
🧩 팁
- 실패는 재전파하되, 서비스는 먼저 복구: “가용성 우선, 원인 추적은 로그/슬랙/실패표식”
- 동일 버전 재지정 소음 억제: 현재 alias 버전과 동일한지 먼저 비교
- 수동 롤백 DAG는 운영자 “즉시복원 스위치"로 생각하고, Variables를 PR 템플릿/런북에 기록
설계 판단 (Why This Way?)
등록 실패 시 가용성 우선으로 즉시 롤백·reload를 수행하고 원인 추적은 사후에 처리하며, 롤백 성공 후에도 예외를 re-raise하여 DAG 실패 기록과 on_failure_callback을 통한 근본 원인 추적을 보장합니다.
다음에 읽을 글
→ MLOps 운영 고도화 4단계: FastAPI 로그 안정화 — 로그 유실 방지와 NFS 안정성