이 글에서 다루는 것
E2E 파이프라인 전체 구조, DAG 레벨 에러 처리(SLA/on_failure_callback), SSOT 패턴(ids.py/policy.py), Promotion/Shadow 분기, Rollback 구조
선수지식
E2E DAG:
- 학습 성공이 곧 운영 반영을 의미하지 않도록 설계
들어가며
많은 머신러닝 파이프라인은 다음과 같은 형태를 가집니다.
Train → Register → Deploy
이 구조는 실험 환경에서는 충분히 동작합니다.
하지만 운영 환경에서는 몇 가지 중요한 문제가 생깁니다.
예를 들어 다음 상황을 생각해볼 수 있습니다.
- 데이터 분포가 바뀌었는데도 모델이 배포되는 경우
- 모델 성능이 특정 데이터셋에서는 좋지만 실제 트래픽에서는 나쁜 경우
- 서빙 환경에서 모델 로딩이 실패하는 경우
이런 상황에서
학습 성공이 곧 운영 반영
이라는 구조는 위험할 수 있습니다.
그래서 이 프로젝트에서는 파이프라인을 단순한 학습 흐름이 아니라
운영 반영을 제어하는 구조로 설계했습니다.
전체 E2E 파이프라인 구조
이 플랫폼의 전체 파이프라인은 다음과 같은 흐름을 가집니다.
Feature Pipeline
↓
Drift Gate
↓
Train
↓
Promotion / Shadow 분기
↓
Model Register
↓
Validate (READY Sensor)
↓
Deploy (Triton load)
↓
FastAPI Reload
↓
Post Deploy Observability
↓
Rollback (trigger_rule=ONE_FAILED)
실제 DAG task 흐름으로 표현하면 다음과 같습니다.
train → register → validate → deploy → reload → rollback
validate 단계는 Triton READY Sensor입니다.
모델이 Triton에서 실제로 서빙 가능한 상태인지 확인한 뒤
deploy (Triton load + FastAPI reload) 가 진행됩니다.
이 흐름의 핵심은 다음입니다.
모델 학습이 끝났다고 해서 바로 운영 반영이 되지 않는다.
중간에 여러 단계의 검증을 거친 뒤
운영 환경으로 변경이 반영됩니다.
DAG 구조
이 파이프라인은 Airflow DAG으로 구현되어 있습니다.
관련 코드:
dags/e2e_full.py
dags/pipelines/full_e2e.py
full_e2e.py는 실제 파이프라인 callables를 묶어
DAG에서 사용할 수 있도록 만든 facade 모듈입니다.
예를 들어 다음과 같은 task들이 존재합니다.
from mlops_lib.dp.tasks import (
task_extract_raw_data,
task_validate_data,
task_build_features,
task_store_features,
)
이 단계는 데이터 파이프라인을 담당합니다.
DAG 레벨 에러 처리: SLA + on_failure_callback
이 DAG에는 Prometheus 기반 Auto Rollback과는 별개로
DAG 레벨의 에러 처리가 설정되어 있습니다.
from datetime import timedelta
default_args = {
"sla": timedelta(hours=1),
"retries": ...,
"retry_delay": timedelta(minutes=...),
}
# DAG 레벨 콜백 (default_args가 아닌 DAG 생성자 파라미터)
dag = DAG(
...,
default_args=default_args,
on_failure_callback=alert_slack,
sla_miss_callback=alert_sla_miss,
)
on_failure_callback
task가 아닌 DAG 레벨에서 설정되며, 실패 시 즉시 Slack 알림을 전송합니다.
Task 실패
↓
on_failure_callback 호출
↓
Slack 알림 전송 (task 이름, 오류 내용, DAG run_id)
이 알림은 다음 상황에서 특히 중요합니다.
Feature Pipeline 실패 (데이터 문제)
Drift Gate 오류 (예외 상황)
Triton deploy 실패 (인프라 문제)
SLA + sla_miss_callback
sla=timedelta(hours=1) 설정은
DAG이 1시간 내에 완료되지 않으면 sla_miss_callback을 호출합니다.
DAG 시작
↓
1시간 경과
↓
SLA miss 감지
↓
sla_miss_callback 호출 → Slack 알림
이 설정은 파이프라인이 hang 상태에 빠졌을 때 감지하기 위한 것입니다.
Prometheus 기반 Auto Rollback과의 차이
| DAG 레벨 (on_failure_callback / SLA) | Prometheus 기반 Auto Rollback | |
|---|---|---|
| 감지 대상 | DAG task 실패 / 실행 지연 | 서빙 품질 지표 (error rate, latency) |
| 동작 시점 | task 실패 즉시 / SLA 초과 시 | 배포 후 observability 단계 |
| 목적 | 운영자에게 알림 | 서빙 상태 자동 복구 |
즉, 두 레이어가 서로 다른 계층에서 에러를 감지합니다.
DAG 레벨 → 파이프라인 실행 이상 감지
Prometheus → 서빙 품질 이상 감지
Feature Pipeline
파이프라인의 첫 번째 단계는 Feature Pipeline입니다.
흐름은 다음과 같습니다.
extract_raw_data
↓
validate_data
↓
build_features
↓
store_features
이 과정에서
- 원본 데이터를 추출하고
- 데이터 품질을 검증하고
- feature를 생성하고
- feature store에 저장합니다.
이 단계는 모델 학습 이전에
데이터 품질을 확보하는 역할을 합니다.
Drift Gate
다음 단계는 Drift Gate입니다.
관련 코드:
dags/mlops_lib/quality/drift_gate.py
Drift Gate의 역할은
새로운 데이터와 기존 데이터의 분포 차이를 확인하는 것입니다.
이 프로젝트에서는 KS statistic 기반 drift detection을 사용합니다.
예를 들어 다음과 같은 방식입니다.
d_stat=_ks_stat(new_feature,reference_feature)
그리고 다음 기준을 사용합니다.
ks_stat_threshold=0.20
즉,
KS statistic > threshold
이면 drift가 발생한 것으로 판단합니다.
Drift 발생 시 처리
드리프트가 발생하면
모델을 바로 운영 환경에 반영하지 않습니다.
대신 다음 방식으로 처리합니다.
Drift detected
↓
Promotion 차단
↓
Shadow 경로로 이동
코드에서도 다음과 같이 처리됩니다.
ti.xcom_push(key=XCOM_DRIFT_BLOCK_PROMOTION,value=True)
즉,
- 모델 학습은 계속 진행되지만
- 운영 반영은 차단됩니다.
이 구조 덕분에
데이터 분포가 크게 바뀐 상황에서 모델이 자동으로 배포되는 것을 방지할 수 있습니다.
Train 단계
드리프트 검증을 통과하면
모델 학습이 시작됩니다.
이 프로젝트에서는 예제 모델로
Logistic Regression을 사용합니다.
모델 파라미터는 Airflow Variable을 통해 관리됩니다.
예시:
VAR_LOGREG_C
VAR_LOGREG_MAX_ITER
이 값들은 policy.py에서 관리됩니다.
관련 코드:
dags/mlops_lib/core/policy.py
이 방식의 장점은 다음과 같습니다.
- DAG 코드에 하드코딩된 값 제거
- 운영 환경에서 파라미터 조정 가능
SSOT 패턴: ids.py + policy.py
이 프로젝트에서는 DAG 전체에서 사용하는 상수와 정책값을 두 파일로 분리해 관리합니다.
dags/mlops_lib/core/ids.py
dags/mlops_lib/core/policy.py
ids.py — 식별자 / 상수 중앙 관리
ids.py는 DAG 레벨에서 사용하는 식별자를 모두 모아둔 파일입니다.
예시:
DP_EXTRACT_TASK_ID = "dp.extract_raw_data"
DP_VALIDATE_TASK_ID = "dp.validate_data"
DRIFT_GATE_TASK_ID = "drift_gate"
TRAIN_TASK_ID = "train_and_evaluate"
REGISTER_TASK_ID = "register_model_task"
DEPLOY_TRITON_LOAD_TASK_ID = "deploy.triton_load"
FASTAPI_RELOAD_TASK_ID = "fastapi_reload"
ROLLBACK_MINIMAL_TASK_ID = "rollback_minimal"
XCOM_DRIFT_BLOCK_PROMOTION = "drift_block_promotion"
XCOM_VERSION = "version"
XCOM_RUN_ID = "run_id"
이 파일에는 다음이 포함됩니다.
task_id 문자열
XCom key 이름
TaskGroup prefix
정책 식별자 (Airflow Variable 키 이름)
policy.py — 임계값 / 정책값 중앙 관리
policy.py는 실제 수치와 정책 값을 관리합니다.
예시:
VAR_DRIFT_KS_STAT_THRESHOLD = "drift_ks_stat_threshold"
VAR_ACCURACY_THRESHOLD = "accuracy_threshold"
VAR_LOGREG_C = "logreg_C"
VAR_LOGREG_MAX_ITER = "logreg_max_iter"
이 값들은 Airflow Variable에서 런타임에 읽어오기 때문에
운영 환경에서 재배포 없이 조정할 수 있습니다.
역할 구분 요약
| 파일 | 관리 대상 | 변경 시점 |
|---|---|---|
ids.py | task_id, XCom key, 상수 | 코드 구조 변경 시 |
policy.py | 임계값, 정책 파라미터 | 운영 정책 변경 시 |
하드코딩 제거 효과
ids.py 없이 task_id를 문자열로 직접 쓰면 다음 문제가 발생합니다.
# 하드코딩 방식 — 위험
ti.xcom_push(key="drift_block_promotion", ...)
ti.xcom_pull(task_ids="drift_gate", key="drift_block_promotion")
task_id 이름이 바뀌거나 TaskGroup prefix가 추가될 때
xcom_pull이 None을 반환하는 버그가 발생합니다.
ids.py를 통해 이를 방지합니다.
# ids.py 방식 — 안전
from mlops_lib.core.ids import DRIFT_GATE_TASK_ID, XCOM_DRIFT_BLOCK_PROMOTION
ti.xcom_push(key=XCOM_DRIFT_BLOCK_PROMOTION, ...)
ti.xcom_pull(task_ids=DRIFT_GATE_TASK_ID, key=XCOM_DRIFT_BLOCK_PROMOTION)
실제 DAG(e2e_full.py)에서는 E2E alias 클래스로 일괄 import합니다.
from mlops_lib.core.ids import E2E as I
# 사용 예
mk_py(I.TRAIN, p.train_and_evaluate)
mk_py(I.ROLLBACK_MINIMAL, p.triton_rollback_task, ...)
TaskGroup prefix 변경 시에도 ids.py 한 곳만 수정하면 됩니다.
Promotion / Shadow 분기
모델 학습이 끝나면
성능 기준에 따라 분기가 발생합니다.
예를 들어 다음과 같은 구조입니다.
accuracy >= threshold
↓
Promotion
또는
accuracy < threshold
↓
Shadow
이 분기는 다음 함수에서 처리됩니다.
branch_by_accuracy
즉,
- 성능이 충분히 좋으면 운영 반영
- 그렇지 않으면 shadow 테스트로 이동합니다.
Model Register
다음 단계는 모델 등록입니다.
이 프로젝트에서는 MLflow Model Registry를 사용합니다.
등록 과정에서는 다음 정보가 저장됩니다.
- model version
- run_id
- training metadata
이 단계는 단순히 모델을 저장하는 것이 아니라
운영 변경 후보를 기록하는 단계입니다.
READY Sensor
모델을 Triton에 배포하기 전에
READY 상태를 확인하는 단계가 존재합니다.
관련 설정:
MODEL_READY_POKE_INTERVAL_SEC=10
MODEL_READY_TIMEOUT_SEC=180
이 sensor는 다음을 확인합니다.
MLflow Model Registry
즉,
- 등록된 모델 버전의 상태가 READY인지
- PENDING이나 FAILED가 아닌지
를 확인합니다.
DAG 흐름상 register → check_model_ready → deploy 순서이므로,
이 시점에서는 아직 Triton에 모델이 배포되지 않은 상태입니다.
MLflow에서 모델 버전이 정상적으로 등록 완료되었는지를 먼저 검증하는 것입니다.
이 단계 덕분에
등록에 실패한 모델이 Triton에 배포되는 것을 방지할 수 있습니다.
Triton Deploy
READY 확인이 끝나면
Triton model repository에 모델이 배포됩니다.
관련 task:
triton_materialize_task
triton_load_task
triton_ready_task
이 단계에서는 다음 과정이 수행됩니다.
model repository 생성
↓
model load
↓
ready 상태 확인
FastAPI Reload
Triton에서 모델이 준비되면
FastAPI gateway에서 reload가 발생합니다.
관련 API:
POST /variant/{alias}/reload
이 API는 다음 역할을 합니다.
- Triton에서 현재 서빙 중인 모델 확인
- MLflow metadata 조회
- alias 기준으로 모델 상태 갱신
즉, FastAPI는 서빙 전환 레이어 역할을 합니다.
Post Deploy Observability
모델이 운영 환경에 반영된 후
즉시 관측 단계가 시작됩니다.
이 단계에서는 다음을 확인합니다.
- error rate
- latency
- service health
관련 코드:
dags/mlops_lib/observability/auto_rollback.py
즉, 배포 이후에도
모델 상태를 계속 확인합니다.
Rollback 구조
이 프로젝트에서 Rollback은 두 가지 방식으로 동작합니다.
Auto Rollback — E2E DAG 내부 task
rollback_minimal task는 E2E DAG 안에 포함되어 있습니다.
trigger_rule = ONE_FAILED
즉, 앞 단계 중 하나라도 실패하면 자동으로 실행됩니다.
deploy 실패
or
fastapi_reload 실패
or
observability 이상 감지
↓
rollback_minimal task 실행
이 task는 다음 역할을 합니다.
FastAPI reload → 이전 버전으로 복구
Triton unload → 현재 모델 해제
관련 코드:
dags/mlops_lib/observability/auto_rollback.py
Prometheus 기반으로 다음 조건을 평가합니다.
5xx error rate
latency p95
service up 상태
Manual Rollback — 별도 DAG
운영자가 직접 실행하는 Rollback은 별도 DAG으로 분리되어 있습니다.
dags/rollback_manual.py
이 DAG은 다음 상황에서 사용됩니다.
운영자 직접 롤백 요청
비상 대응
실험 롤백
두 방식의 차이
| Auto Rollback | Manual Rollback | |
|---|---|---|
| 위치 | E2E DAG 내부 task | 별도 DAG |
| 트리거 | trigger_rule=ONE_FAILED | 운영자 수동 실행 |
| 용도 | 배포 실패 시 즉각 복구 | 운영 중 이상 감지 후 대응 |
왜 이런 구조가 필요한가
이 파이프라인 구조의 핵심은
운영 반영을 여러 단계로 나누는 것입니다.
즉,
Train
과
Production Deploy
사이에 다음 단계들이 존재합니다.
Drift Gate
Promotion Check
READY Sensor
Observability
Rollback
이 구조 덕분에
- 잘못된 모델이 운영 환경에 반영되는 것을 방지하고
- 운영 문제를 빠르게 감지하고
- 자동으로 복구할 수 있습니다.
핵심 메시지
이 플랫폼의 파이프라인은
단순한 ML 학습 파이프라인이 아닙니다.
오히려 다음 역할을 합니다.
모델 변경을 운영 환경에 안전하게 반영하기 위한 제어 시스템
즉, 이 DAG은
Training Pipeline
이 아니라
Production Control Pipeline
에 가깝습니다.
다음 글
지금까지는 E2E DAG의 전체 구조를 살펴보았습니다.
하지만 이 파이프라인에서 가장 중요한 부분은
Promotion / Shadow 전략과 Drift Gate 정책입니다.
다음 글에서는 다음 내용을 중심으로 설명합니다.
- Drift Gate 설계 이유
- Promotion / Shadow 분기 구조
- 모델 성능과 운영 반영을 분리한 이유
관련 코드:
dags/mlops_lib/quality/drift_gate.py
dags/mlops_lib/core/policy.py
dags/e2e_full.py
이를 통해
검증된 변경만 운영 경로에 올리는 구조
가 어떻게 구현되어 있는지 살펴보겠습니다.
설계 판단 (Why This Way?)
학습 성공이 곧 운영 반영이 되는 구조를 제거하기 위해 Drift Gate, READY Sensor, Auto Rollback을 파이프라인에 내장하고, ids.py/policy.py SSOT 패턴으로 식별자와 임계값의 변경 지점을 하나로 제한했습니다. DAG 레벨과 Prometheus 레벨의 2계층 에러 감지로 파이프라인 실행 이상과 서빙 품질 이상을 각각 포착합니다.
다음에 읽을 글
→ GitOps 기반 E2E ML Platform - 모델 반영 제어 — Drift Gate / Promotion / Shadow 상세 설계