이 글에서 다루는 것
/reload엔드포인트에 3중 보안(토큰·IP 화이트리스트·TLS)을 적용하고, Airflow DAG으로 학습→등록→감시→핫스왑 전체 시퀀스를 자동화하는 과정을 다룹니다.
선수지식
- MLOps 운영 고도화 0단계: FastAPI A/B·Canary·Blue-Green 서빙 베이스 — 멀티 모델 라우팅 기반 설계
이 단계에서 해결하려는 문제
모델을 잘 학습시키는 것도 중요하지만, 잘못된 모델이 실수로 핫스왑되는 순간이 더 치명적이다. /reload를 확실하게 잠그고, 학습→등록→감시→핫스왑까지가 Airflow에서 자동으로 흘러가도록 만들어야 한다. 이 단계가 완성돼야 이후 단계에서 안전한 자동 운영(MLOps)을 구현할 수 있다.
🎯 핵심 요약
- 보안 3종:
x-token(SealedSecret) + Ingress IP whitelist + TLS - 자동화: Airflow Train → Register → Sensor(READY) → Reload 전체 시퀀스
- 일관성: MLflow Alias(@A/@B) 기반 등록·조회·핫스왑
- 관제: 모든 단계 성공/스킵/실패 Slack 알림
환경 분리: dev(.local) / prod(.prod) — 예: fastapi.local, fastapi.prod
1️⃣ 아키텍처 플로우

2️⃣ FastAPI /reload 보안 라우트
# charts/fastapi/app/routes/reload.py
import secrets
from fastapi import APIRouter, HTTPException, Header, Request
from core.config import settings
from services.model_loader import load_model_by_alias
from utils.slack_alerts import send_slack_alert
router = APIRouter()
@router.post("/variant/{alias}/reload")
def reload_model(request: Request, alias: str, x_token: str = Header(...)):
expected = settings.reload_secret_token
if not expected:
raise HTTPException(status_code=500, detail="서버 설정 오류: 인증 토큰 미설정")
if not secrets.compare_digest(x_token, expected):
raise HTTPException(status_code=403, detail="Access denied")
loaded = load_model_by_alias(alias)
if not loaded:
raise HTTPException(status_code=500, detail="모델 로딩 실패")
request.app.state.models[alias] = loaded
info = loaded["info"]
send_slack_alert(
f"🔁 [FastAPI] 모델 {alias} 핫스왑 완료: v{info['version']}, run_id={info['run_id']}"
)
return {"status": "success", "variant": alias, "version": info["version"], "run_id": info["run_id"]}
3️⃣ Airflow DAG (Train → Register → Sensor → Reload)
이 코드는 초기 단일 DAG 버전입니다. 이후
e2e_full.py로 통합·리팩토링되었습니다.
# dags/dag_ml_train_register_reload.py (핵심부)
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.python import PythonSensor
from airflow.utils.trigger_rule import TriggerRule
# ... (import 및 헬퍼 함수 생략)
def train_and_evaluate(ti, **_):
C = get_param("logreg_C", 1.0, float, lambda x: 0.001 <= x <= 10.0)
max_iter = get_param("logreg_max_iter", 200, int, lambda x: x > 50)
threshold = get_param("accuracy_threshold", 0.9, float, lambda x: 0.5 <= x <= 0.99)
acc, run_id = train_model(C=C, max_iter=max_iter)
for k, v in {"run_id": run_id, "model_name": model_name, "alias": alias, "acc": acc, "threshold": threshold}.items():
ti.xcom_push(key=k, value=v)
def register_model_task(ti, **_):
prev = get_version_by_alias(model_name, alias)
try:
version = register_model(run_id, model_name, alias)
ti.xcom_push(key="version", value=version)
except Exception as e:
if prev:
rollback_model(model_name, prev, alias)
raise
with DAG("ml_train_register_and_reload", schedule=None, catchup=False) as dag:
train = PythonOperator(task_id="train_and_evaluate", python_callable=train_and_evaluate)
branch = BranchPythonOperator(task_id="check_result", python_callable=check_result)
register = PythonOperator(task_id="register_model", python_callable=register_model_task)
sensor = PythonSensor(task_id="check_model_ready", python_callable=sensor_ready_func,
poke_interval=10, timeout=180, mode="reschedule")
reload = PythonOperator(task_id="trigger_reload", python_callable=trigger_reload_task,
trigger_rule=TriggerRule.ALL_SUCCESS)
failure = PythonOperator(task_id="notify_failure", python_callable=notify_failure)
train >> branch >> [register, failure]
register >> sensor >> reload
4️⃣ Airflow → FastAPI 호출 (Ingress 경유)
# ml_code/trigger_reload.py
# 최종 플랫폼에서는 mlops_lib/fastapi/reload_client.py로 리팩토링
import os, requests
from airflow.models import Variable
def trigger_reload(alias: str):
base = (
os.getenv("FASTAPI_RELOAD_URL")
or os.getenv("FASTAPI_BASE_URL")
or Variable.get("FASTAPI_RELOAD_URL", default_var=None)
or Variable.get("FASTAPI_BASE_URL")
)
token = os.getenv("RELOAD_SECRET_TOKEN") or Variable.get("RELOAD_SECRET_TOKEN")
url = f"{base}/variant/{alias}/reload"
verify = not url.startswith("https://") or os.getenv("ALLOW_SELF_SIGNED") != "true"
r = requests.post(url, headers={"x-token": token}, timeout=10, verify=verify)
r.raise_for_status()
data = r.json()
if data.get("status") != "success":
raise RuntimeError(f"Reload failed: {data}")
return {"alias": alias, "version": data.get("version"), "run_id": data.get("run_id")}
5️⃣ Helm/값 요약 (dev/prod)
▶ FastAPI (env)
MLFLOW_TRACKING_URI:- dev:
http://mlflow-dev-service.mlflow-dev.svc.cluster.local:5000 - prod:
http://mlflow-prod-service.mlflow-prod.svc.cluster.local:5000
- dev:
MODEL_NAME=best_modelALIAS_SELECTION_MODE/DEFAULT_ALIAS/CANARY_PERCENTenvFrom:aws-credentials-secret,fastapi-token-*-secret,slack-webhook-*-secret
▶ Airflow (env/variables)
FASTAPI_RELOAD_URL- dev:
https://fastapi.local - prod:
https://fastapi.prod
- dev:
AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL- dev:
http://airflow.local - prod:
http://airflow.prod
- dev:
MLFLOW_TRACKING_URI- dev:
http://mlflow-dev-service.mlflow-dev.svc.cluster.local:5000 - prod:
http://mlflow-prod-service.mlflow-prod.svc.cluster.local:5000
- dev:
Airflow Variables:
model_name,mlflow_alias,logreg_C,logreg_max_iter,accuracy_threshold
6️⃣ Slack 알림 표준
| 구분 | 발생 시점 | 메시지 필드(예시) |
|---|---|---|
| 학습 기준 미달 | notify_failure | acc, threshold |
| 등록 성공/실패 | register_model_task | model, version, @alias, run_id, (실패 시 롤백) |
| 센서 | check_model_ready | status (READY/FAILED_REGISTRATION) |
| 핫스왑 | trigger_reload & FastAPI | alias, version, run_id |
| 전역 실패 | on_failure_callback | dag_id, task_id, log_url |
7️⃣ 검증 명령
# Ingress(화이트리스트/TLS/host)
kubectl -n fastapi-dev get ing -o yaml | egrep 'ingressClassName|whitelist-source-range|host:'
# Airflow → FastAPI 호출 환경
NS=airflow-dev
POD=$(kubectl -n $NS get po -l component=scheduler -o name | head -1)
kubectl -n $NS exec $POD -- printenv | egrep 'FASTAPI_RELOAD_URL|RELOAD_SECRET_TOKEN|AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL'
# 핫스왑 호출
curl -sk -X POST https://fastapi.local/variant/B/reload -H "x-token: <RELOAD_SECRET_TOKEN>"
🧩 팁
- Secret 이름 일치성:
envFrom.secretRef.name↔ 실제 Secretmetadata.name - A/B Test 모드에서는 SHA256 해시 기반으로 90/10 분할
- 기동 실패 시 즉시 종료(sys.exit) → Health 노이즈 차단
설계 판단 (Why This Way?)
/reload 엔드포인트에 IP whitelist + TLS + token 3중 보안을 적용하여 모델 핫스왑의 위험성에 대응하고, Airflow에서 FastAPI를 동기 호출하여 즉시 성공/실패를 확인하고 파이프라인 분기를 명확히 제어합니다.
다음에 읽을 글
→ MLOps 운영 고도화 2단계: Slack Alert 통합 — 파이프라인 알림 자동화