이 글에서 다루는 것
Airflow DAG에서 조건부 모델 등록 후 FastAPI 핫스왑까지 E2E 자동화 흐름을 실험합니다.
선수지식
- MLOps 플랫폼 구축 5단계: FastAPI 서빙 및 핫스왑 구조 구축 — FastAPI 모델 로딩과
/reloadAPI 구조
이 단계에서 해결하려는 문제
개별 컴포넌트(Airflow, MLflow, FastAPI)를 각각 구성했지만, 학습부터 서빙까지의 자동화 흐름을 E2E로 검증해야 실제 운영 가능 여부를 판단할 수 있습니다. 이 단계에서는 Airflow DAG에서 모델 학습 → 성능 기준 분기 → MLflow 등록 → FastAPI 핫스왑까지 전체 파이프라인을 실험합니다.
🧠 구조 다이어그램

🧩 핵심 구성 요소
1. Airflow DAG - 조건부 모델 등록 (핵심 로직)
def run_and_check():
# Variable로 파라미터 동적 제어
try:
C = float(Variable.get("logreg_C", default_var=1.0))
max_iter = int(Variable.get("logreg_max_iter", default_var=200))
except Exception as e:
C, max_iter = 1.0, 200
# 정확도 기준으로 성공/실패 분기
acc = train_model(C=C, max_iter=max_iter)
if acc > 0.9:
return 'notify_success'
else:
return 'notify_failure'
with DAG(
dag_id="mlflow_experiment_conditional_register_runner_fastapi",
schedule=None,
catchup=False,
) as dag:
branch_task = BranchPythonOperator(
task_id='branch_by_accuracy',
python_callable=run_and_check
)
branch_task >> [success_notify_task, failure_notify_task]
2. 모델 학습 및 등록 (핵심 로직)
def train_model(C, max_iter):
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
with mlflow.start_run() as run:
clf = LogisticRegression(C=C, max_iter=max_iter)
clf.fit(X_train, y_train)
acc = accuracy_score(y_test, clf.predict(X_test))
mlflow.log_param("C", C)
mlflow.log_metric("accuracy", acc)
mlflow.sklearn.log_model(clf, "model")
if acc > 0.9:
# 모델 등록 → Staging 승격 → FastAPI 핫스왑
result = mlflow.register_model(
model_uri=f"runs:/{run.info.run_id}/model",
name="best_model"
)
time.sleep(3) # DB 업데이트 대기
client = MlflowClient()
client.transition_model_version_stage(
name="best_model",
version=result.version,
stage="Staging",
archive_existing_versions=False
)
# FastAPI 핫스왑 요청
requests.post("http://fastapi.local/reload")
return acc
FastAPI 코드는 5단계에서 구성한 것과 동일합니다. /reload 엔드포인트가 load_model_from_mlflow()를 호출하여 Staging 모델을 다시 로딩합니다.
🧪 E2E 검증
핫스왑 전후 모델 버전 비교
# 핫스왑 전: 기존 모델 확인
curl http://fastapi.local/model-info | jq
{
"model_name": "best_model",
"stage": "Staging",
"version": "1",
"run_id": "8bd09505eabf40648337e811110ab22c",
"model_uri": "models:/best_model/Staging"
}
# DAG 실행 → 모델 학습 → 등록 → 핫스왑 자동 수행
# 핫스왑 후: 새 모델 확인
curl http://fastapi.local/model-info | jq
{
"model_name": "best_model",
"stage": "Staging",
"version": "2",
"run_id": "72f388927f5749c185b828a1a16bb063",
"model_uri": "models:/best_model/Staging"
}
검증 항목:
logreg_C,logreg_max_iter파라미터로 학습 DAG 설정 가능- 모델 등록 시
Staging으로 자동 승격 - FastAPI
/reload호출 시 핫스왑 적용 확인 /model-info,/predict에서 새 모델 정보 및 예측 결과 확인
설계 판단 (Why This Way?)
BranchPythonOperator로 정확도 기반 성공/실패 경로를 명시적으로 분기하고, HTTP reload 방식으로 Pod 재시작 없이 모델 핫스왑을 구현했습니다. 임계값 하드코딩, 전역 변수 교체, 롤백 부재 등의 한계는 Level 3에서 GitOps 기반 운영 고도화와 Triton 서빙으로 개선합니다.
다음에 읽을 글
→ TS: Airflow 기초 자동화 트러블슈팅 — Airflow + MLflow 연동 시 자주 발생하는 에러 정리