이 글에서 다루는 것

/reload 엔드포인트에 3중 보안(토큰·IP 화이트리스트·TLS)을 적용하고, Airflow DAG으로 학습→등록→감시→핫스왑 전체 시퀀스를 자동화하는 과정을 다룹니다.

선수지식


이 단계에서 해결하려는 문제

모델을 잘 학습시키는 것도 중요하지만, 잘못된 모델이 실수로 핫스왑되는 순간이 더 치명적이다. /reload를 확실하게 잠그고, 학습→등록→감시→핫스왑까지가 Airflow에서 자동으로 흘러가도록 만들어야 한다. 이 단계가 완성돼야 이후 단계에서 안전한 자동 운영(MLOps)을 구현할 수 있다.


🎯 핵심 요약

  • 보안 3종: x-token(SealedSecret) + Ingress IP whitelist + TLS
  • 자동화: Airflow Train → Register → Sensor(READY) → Reload 전체 시퀀스
  • 일관성: MLflow Alias(@A/@B) 기반 등록·조회·핫스왑
  • 관제: 모든 단계 성공/스킵/실패 Slack 알림

환경 분리: dev(.local) / prod(.prod) — 예: fastapi.local, fastapi.prod


1️⃣ 아키텍처 플로우

mermaid-01.png


2️⃣ FastAPI /reload 보안 라우트

# charts/fastapi/app/routes/reload.py
import secrets
from fastapi import APIRouter, HTTPException, Header, Request
from core.config import settings
from services.model_loader import load_model_by_alias
from utils.slack_alerts import send_slack_alert

router = APIRouter()

@router.post("/variant/{alias}/reload")
def reload_model(request: Request, alias: str, x_token: str = Header(...)):
    expected = settings.reload_secret_token
    if not expected:
        raise HTTPException(status_code=500, detail="서버 설정 오류: 인증 토큰 미설정")

    if not secrets.compare_digest(x_token, expected):
        raise HTTPException(status_code=403, detail="Access denied")

    loaded = load_model_by_alias(alias)
    if not loaded:
        raise HTTPException(status_code=500, detail="모델 로딩 실패")

    request.app.state.models[alias] = loaded
    info = loaded["info"]
    send_slack_alert(
        f"🔁 [FastAPI] 모델 {alias} 핫스왑 완료: v{info['version']}, run_id={info['run_id']}"
    )
    return {"status": "success", "variant": alias, "version": info["version"], "run_id": info["run_id"]}

3️⃣ Airflow DAG (Train → Register → Sensor → Reload)

이 코드는 초기 단일 DAG 버전입니다. 이후 e2e_full.py로 통합·리팩토링되었습니다.

# dags/dag_ml_train_register_reload.py (핵심부)
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.sensors.python import PythonSensor
from airflow.utils.trigger_rule import TriggerRule

# ... (import 및 헬퍼 함수 생략)

def train_and_evaluate(ti, **_):
    C = get_param("logreg_C", 1.0, float, lambda x: 0.001 <= x <= 10.0)
    max_iter = get_param("logreg_max_iter", 200, int, lambda x: x > 50)
    threshold = get_param("accuracy_threshold", 0.9, float, lambda x: 0.5 <= x <= 0.99)
    acc, run_id = train_model(C=C, max_iter=max_iter)
    for k, v in {"run_id": run_id, "model_name": model_name, "alias": alias, "acc": acc, "threshold": threshold}.items():
        ti.xcom_push(key=k, value=v)

def register_model_task(ti, **_):
    prev = get_version_by_alias(model_name, alias)
    try:
        version = register_model(run_id, model_name, alias)
        ti.xcom_push(key="version", value=version)
    except Exception as e:
        if prev:
            rollback_model(model_name, prev, alias)
        raise

with DAG("ml_train_register_and_reload", schedule=None, catchup=False) as dag:
    train = PythonOperator(task_id="train_and_evaluate", python_callable=train_and_evaluate)
    branch = BranchPythonOperator(task_id="check_result", python_callable=check_result)
    register = PythonOperator(task_id="register_model", python_callable=register_model_task)
    sensor = PythonSensor(task_id="check_model_ready", python_callable=sensor_ready_func,
                          poke_interval=10, timeout=180, mode="reschedule")
    reload = PythonOperator(task_id="trigger_reload", python_callable=trigger_reload_task,
                            trigger_rule=TriggerRule.ALL_SUCCESS)
    failure = PythonOperator(task_id="notify_failure", python_callable=notify_failure)

    train >> branch >> [register, failure]
    register >> sensor >> reload

4️⃣ Airflow → FastAPI 호출 (Ingress 경유)

# ml_code/trigger_reload.py
# 최종 플랫폼에서는 mlops_lib/fastapi/reload_client.py로 리팩토링
import os, requests
from airflow.models import Variable

def trigger_reload(alias: str):
    base = (
        os.getenv("FASTAPI_RELOAD_URL")
        or os.getenv("FASTAPI_BASE_URL")
        or Variable.get("FASTAPI_RELOAD_URL", default_var=None)
        or Variable.get("FASTAPI_BASE_URL")
    )
    token = os.getenv("RELOAD_SECRET_TOKEN") or Variable.get("RELOAD_SECRET_TOKEN")
    url = f"{base}/variant/{alias}/reload"

    verify = not url.startswith("https://") or os.getenv("ALLOW_SELF_SIGNED") != "true"

    r = requests.post(url, headers={"x-token": token}, timeout=10, verify=verify)
    r.raise_for_status()
    data = r.json()
    if data.get("status") != "success":
        raise RuntimeError(f"Reload failed: {data}")
    return {"alias": alias, "version": data.get("version"), "run_id": data.get("run_id")}

5️⃣ Helm/값 요약 (dev/prod)

▶ FastAPI (env)

  • MLFLOW_TRACKING_URI:
    • dev: http://mlflow-dev-service.mlflow-dev.svc.cluster.local:5000
    • prod: http://mlflow-prod-service.mlflow-prod.svc.cluster.local:5000
  • MODEL_NAME=best_model
  • ALIAS_SELECTION_MODE / DEFAULT_ALIAS / CANARY_PERCENT
  • envFrom: aws-credentials-secret, fastapi-token-*-secret, slack-webhook-*-secret

▶ Airflow (env/variables)

  • FASTAPI_RELOAD_URL

    • dev: https://fastapi.local
    • prod: https://fastapi.prod
  • AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL

    • dev: http://airflow.local
    • prod: http://airflow.prod
  • MLFLOW_TRACKING_URI

    • dev: http://mlflow-dev-service.mlflow-dev.svc.cluster.local:5000
    • prod: http://mlflow-prod-service.mlflow-prod.svc.cluster.local:5000
  • Airflow Variables:

    model_name, mlflow_alias, logreg_C, logreg_max_iter, accuracy_threshold


6️⃣ Slack 알림 표준

구분발생 시점메시지 필드(예시)
학습 기준 미달notify_failureacc, threshold
등록 성공/실패register_model_taskmodel, version, @alias, run_id, (실패 시 롤백)
센서check_model_readystatus (READY/FAILED_REGISTRATION)
핫스왑trigger_reload & FastAPIalias, version, run_id
전역 실패on_failure_callbackdag_id, task_id, log_url

7️⃣ 검증 명령

# Ingress(화이트리스트/TLS/host)
kubectl -n fastapi-dev get ing -o yaml | egrep 'ingressClassName|whitelist-source-range|host:'

# Airflow → FastAPI 호출 환경
NS=airflow-dev
POD=$(kubectl -n $NS get po -l component=scheduler -o name | head -1)
kubectl -n $NS exec $POD -- printenv | egrep 'FASTAPI_RELOAD_URL|RELOAD_SECRET_TOKEN|AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL'

# 핫스왑 호출
curl -sk -X POST https://fastapi.local/variant/B/reload -H "x-token: <RELOAD_SECRET_TOKEN>"

🧩 팁

  • Secret 이름 일치성: envFrom.secretRef.name ↔ 실제 Secret metadata.name
  • A/B Test 모드에서는 SHA256 해시 기반으로 90/10 분할
  • 기동 실패 시 즉시 종료(sys.exit) → Health 노이즈 차단

설계 판단 (Why This Way?)

/reload 엔드포인트에 IP whitelist + TLS + token 3중 보안을 적용하여 모델 핫스왑의 위험성에 대응하고, Airflow에서 FastAPI를 동기 호출하여 즉시 성공/실패를 확인하고 파이프라인 분기를 명확히 제어합니다.


다음에 읽을 글

MLOps 운영 고도화 2단계: Slack Alert 통합 — 파이프라인 알림 자동화