이 글에서 다루는 것

E2E 파이프라인 전체 구조, DAG 레벨 에러 처리(SLA/on_failure_callback), SSOT 패턴(ids.py/policy.py), Promotion/Shadow 분기, Rollback 구조

선수지식


E2E DAG:

  • 학습 성공이 곧 운영 반영을 의미하지 않도록 설계

들어가며

많은 머신러닝 파이프라인은 다음과 같은 형태를 가집니다.

Train → Register → Deploy

이 구조는 실험 환경에서는 충분히 동작합니다.

하지만 운영 환경에서는 몇 가지 중요한 문제가 생깁니다.

예를 들어 다음 상황을 생각해볼 수 있습니다.

  • 데이터 분포가 바뀌었는데도 모델이 배포되는 경우
  • 모델 성능이 특정 데이터셋에서는 좋지만 실제 트래픽에서는 나쁜 경우
  • 서빙 환경에서 모델 로딩이 실패하는 경우

이런 상황에서

학습 성공이 곧 운영 반영

이라는 구조는 위험할 수 있습니다.

그래서 이 프로젝트에서는 파이프라인을 단순한 학습 흐름이 아니라

운영 반영을 제어하는 구조로 설계했습니다.


전체 E2E 파이프라인 구조

이 플랫폼의 전체 파이프라인은 다음과 같은 흐름을 가집니다.

Feature Pipeline
      ↓
Drift Gate
      ↓
Train
      ↓
Promotion / Shadow 분기
      ↓
Model Register
      ↓
Validate (READY Sensor)
      ↓
Deploy (Triton load)
      ↓
FastAPI Reload
      ↓
Post Deploy Observability
      ↓
Rollback (trigger_rule=ONE_FAILED)

실제 DAG task 흐름으로 표현하면 다음과 같습니다.

train → register → validate → deploy → reload → rollback

validate 단계는 Triton READY Sensor입니다.

모델이 Triton에서 실제로 서빙 가능한 상태인지 확인한 뒤

deploy (Triton load + FastAPI reload) 가 진행됩니다.

이 흐름의 핵심은 다음입니다.

모델 학습이 끝났다고 해서 바로 운영 반영이 되지 않는다.

중간에 여러 단계의 검증을 거친 뒤

운영 환경으로 변경이 반영됩니다.


DAG 구조

이 파이프라인은 Airflow DAG으로 구현되어 있습니다.

관련 코드:

dags/e2e_full.py
dags/pipelines/full_e2e.py

full_e2e.py는 실제 파이프라인 callables를 묶어

DAG에서 사용할 수 있도록 만든 facade 모듈입니다.

예를 들어 다음과 같은 task들이 존재합니다.

from mlops_lib.dp.tasks import (
    task_extract_raw_data,
    task_validate_data,
    task_build_features,
    task_store_features,
)

이 단계는 데이터 파이프라인을 담당합니다.


DAG 레벨 에러 처리: SLA + on_failure_callback

이 DAG에는 Prometheus 기반 Auto Rollback과는 별개로

DAG 레벨의 에러 처리가 설정되어 있습니다.

from datetime import timedelta

default_args = {
    "sla": timedelta(hours=1),
    "retries": ...,
    "retry_delay": timedelta(minutes=...),
}

# DAG 레벨 콜백 (default_args가 아닌 DAG 생성자 파라미터)
dag = DAG(
    ...,
    default_args=default_args,
    on_failure_callback=alert_slack,
    sla_miss_callback=alert_sla_miss,
)

on_failure_callback

task가 아닌 DAG 레벨에서 설정되며, 실패 시 즉시 Slack 알림을 전송합니다.

Task 실패
      ↓
on_failure_callback 호출
      ↓
Slack 알림 전송 (task 이름, 오류 내용, DAG run_id)

이 알림은 다음 상황에서 특히 중요합니다.

Feature Pipeline 실패 (데이터 문제)
Drift Gate 오류 (예외 상황)
Triton deploy 실패 (인프라 문제)

SLA + sla_miss_callback

sla=timedelta(hours=1) 설정은

DAG이 1시간 내에 완료되지 않으면 sla_miss_callback을 호출합니다.

DAG 시작
      ↓
1시간 경과
      ↓
SLA miss 감지
      ↓
sla_miss_callback 호출 → Slack 알림

이 설정은 파이프라인이 hang 상태에 빠졌을 때 감지하기 위한 것입니다.

Prometheus 기반 Auto Rollback과의 차이

DAG 레벨 (on_failure_callback / SLA)Prometheus 기반 Auto Rollback
감지 대상DAG task 실패 / 실행 지연서빙 품질 지표 (error rate, latency)
동작 시점task 실패 즉시 / SLA 초과 시배포 후 observability 단계
목적운영자에게 알림서빙 상태 자동 복구

즉, 두 레이어가 서로 다른 계층에서 에러를 감지합니다.

DAG 레벨     → 파이프라인 실행 이상 감지
Prometheus  → 서빙 품질 이상 감지

Feature Pipeline

파이프라인의 첫 번째 단계는 Feature Pipeline입니다.

흐름은 다음과 같습니다.

extract_raw_data
    ↓
validate_data
    ↓
build_features
    ↓
store_features

이 과정에서

  • 원본 데이터를 추출하고
  • 데이터 품질을 검증하고
  • feature를 생성하고
  • feature store에 저장합니다.

이 단계는 모델 학습 이전에

데이터 품질을 확보하는 역할을 합니다.


Drift Gate

다음 단계는 Drift Gate입니다.

관련 코드:

dags/mlops_lib/quality/drift_gate.py

Drift Gate의 역할은

새로운 데이터와 기존 데이터의 분포 차이를 확인하는 것입니다.

이 프로젝트에서는 KS statistic 기반 drift detection을 사용합니다.

예를 들어 다음과 같은 방식입니다.

d_stat=_ks_stat(new_feature,reference_feature)

그리고 다음 기준을 사용합니다.

ks_stat_threshold=0.20

즉,

KS statistic > threshold

이면 drift가 발생한 것으로 판단합니다.


Drift 발생 시 처리

드리프트가 발생하면

모델을 바로 운영 환경에 반영하지 않습니다.

대신 다음 방식으로 처리합니다.

Drift detected
      ↓
Promotion 차단
      ↓
Shadow 경로로 이동

코드에서도 다음과 같이 처리됩니다.

ti.xcom_push(key=XCOM_DRIFT_BLOCK_PROMOTION,value=True)

즉,

  • 모델 학습은 계속 진행되지만
  • 운영 반영은 차단됩니다.

이 구조 덕분에

데이터 분포가 크게 바뀐 상황에서 모델이 자동으로 배포되는 것을 방지할 수 있습니다.


Train 단계

드리프트 검증을 통과하면

모델 학습이 시작됩니다.

이 프로젝트에서는 예제 모델로

Logistic Regression을 사용합니다.

모델 파라미터는 Airflow Variable을 통해 관리됩니다.

예시:

VAR_LOGREG_C
VAR_LOGREG_MAX_ITER

이 값들은 policy.py에서 관리됩니다.

관련 코드:

dags/mlops_lib/core/policy.py

이 방식의 장점은 다음과 같습니다.

  • DAG 코드에 하드코딩된 값 제거
  • 운영 환경에서 파라미터 조정 가능

SSOT 패턴: ids.py + policy.py

이 프로젝트에서는 DAG 전체에서 사용하는 상수와 정책값을 두 파일로 분리해 관리합니다.

dags/mlops_lib/core/ids.py
dags/mlops_lib/core/policy.py

ids.py — 식별자 / 상수 중앙 관리

ids.py는 DAG 레벨에서 사용하는 식별자를 모두 모아둔 파일입니다.

예시:

DP_EXTRACT_TASK_ID          = "dp.extract_raw_data"
DP_VALIDATE_TASK_ID         = "dp.validate_data"
DRIFT_GATE_TASK_ID          = "drift_gate"
TRAIN_TASK_ID               = "train_and_evaluate"
REGISTER_TASK_ID            = "register_model_task"
DEPLOY_TRITON_LOAD_TASK_ID  = "deploy.triton_load"
FASTAPI_RELOAD_TASK_ID      = "fastapi_reload"
ROLLBACK_MINIMAL_TASK_ID    = "rollback_minimal"

XCOM_DRIFT_BLOCK_PROMOTION  = "drift_block_promotion"
XCOM_VERSION                = "version"
XCOM_RUN_ID                 = "run_id"

이 파일에는 다음이 포함됩니다.

task_id 문자열
XCom key 이름
TaskGroup prefix
정책 식별자 (Airflow Variable 키 이름)

policy.py — 임계값 / 정책값 중앙 관리

policy.py는 실제 수치와 정책 값을 관리합니다.

예시:

VAR_DRIFT_KS_STAT_THRESHOLD = "drift_ks_stat_threshold"
VAR_ACCURACY_THRESHOLD      = "accuracy_threshold"
VAR_LOGREG_C                = "logreg_C"
VAR_LOGREG_MAX_ITER         = "logreg_max_iter"

이 값들은 Airflow Variable에서 런타임에 읽어오기 때문에

운영 환경에서 재배포 없이 조정할 수 있습니다.

역할 구분 요약

파일관리 대상변경 시점
ids.pytask_id, XCom key, 상수코드 구조 변경 시
policy.py임계값, 정책 파라미터운영 정책 변경 시

하드코딩 제거 효과

ids.py 없이 task_id를 문자열로 직접 쓰면 다음 문제가 발생합니다.

# 하드코딩 방식 — 위험
ti.xcom_push(key="drift_block_promotion", ...)
ti.xcom_pull(task_ids="drift_gate", key="drift_block_promotion")

task_id 이름이 바뀌거나 TaskGroup prefix가 추가될 때

xcom_pull이 None을 반환하는 버그가 발생합니다.

ids.py를 통해 이를 방지합니다.

# ids.py 방식 — 안전
from mlops_lib.core.ids import DRIFT_GATE_TASK_ID, XCOM_DRIFT_BLOCK_PROMOTION

ti.xcom_push(key=XCOM_DRIFT_BLOCK_PROMOTION, ...)
ti.xcom_pull(task_ids=DRIFT_GATE_TASK_ID, key=XCOM_DRIFT_BLOCK_PROMOTION)

실제 DAG(e2e_full.py)에서는 E2E alias 클래스로 일괄 import합니다.

from mlops_lib.core.ids import E2E as I

# 사용 예
mk_py(I.TRAIN, p.train_and_evaluate)
mk_py(I.ROLLBACK_MINIMAL, p.triton_rollback_task, ...)

TaskGroup prefix 변경 시에도 ids.py 한 곳만 수정하면 됩니다.


Promotion / Shadow 분기

모델 학습이 끝나면

성능 기준에 따라 분기가 발생합니다.

예를 들어 다음과 같은 구조입니다.

accuracy >= threshold
      ↓
Promotion

또는

accuracy < threshold
      ↓
Shadow

이 분기는 다음 함수에서 처리됩니다.

branch_by_accuracy

즉,

  • 성능이 충분히 좋으면 운영 반영
  • 그렇지 않으면 shadow 테스트로 이동합니다.

Model Register

다음 단계는 모델 등록입니다.

이 프로젝트에서는 MLflow Model Registry를 사용합니다.

등록 과정에서는 다음 정보가 저장됩니다.

  • model version
  • run_id
  • training metadata

이 단계는 단순히 모델을 저장하는 것이 아니라

운영 변경 후보를 기록하는 단계입니다.


READY Sensor

모델을 Triton에 배포하기 전에

READY 상태를 확인하는 단계가 존재합니다.

관련 설정:

MODEL_READY_POKE_INTERVAL_SEC=10
MODEL_READY_TIMEOUT_SEC=180

이 sensor는 다음을 확인합니다.

MLflow Model Registry

즉,

  • 등록된 모델 버전의 상태가 READY인지
  • PENDING이나 FAILED가 아닌지

를 확인합니다.

DAG 흐름상 register → check_model_ready → deploy 순서이므로, 이 시점에서는 아직 Triton에 모델이 배포되지 않은 상태입니다. MLflow에서 모델 버전이 정상적으로 등록 완료되었는지를 먼저 검증하는 것입니다.

이 단계 덕분에

등록에 실패한 모델이 Triton에 배포되는 것을 방지할 수 있습니다.


Triton Deploy

READY 확인이 끝나면

Triton model repository에 모델이 배포됩니다.

관련 task:

triton_materialize_task
triton_load_task
triton_ready_task

이 단계에서는 다음 과정이 수행됩니다.

model repository 생성
      ↓
model load
      ↓
ready 상태 확인

FastAPI Reload

Triton에서 모델이 준비되면

FastAPI gateway에서 reload가 발생합니다.

관련 API:

POST /variant/{alias}/reload

이 API는 다음 역할을 합니다.

  • Triton에서 현재 서빙 중인 모델 확인
  • MLflow metadata 조회
  • alias 기준으로 모델 상태 갱신

즉, FastAPI는 서빙 전환 레이어 역할을 합니다.


Post Deploy Observability

모델이 운영 환경에 반영된 후

즉시 관측 단계가 시작됩니다.

이 단계에서는 다음을 확인합니다.

  • error rate
  • latency
  • service health

관련 코드:

dags/mlops_lib/observability/auto_rollback.py

즉, 배포 이후에도

모델 상태를 계속 확인합니다.


Rollback 구조

이 프로젝트에서 Rollback은 두 가지 방식으로 동작합니다.

Auto Rollback — E2E DAG 내부 task

rollback_minimal task는 E2E DAG 안에 포함되어 있습니다.

trigger_rule = ONE_FAILED

즉, 앞 단계 중 하나라도 실패하면 자동으로 실행됩니다.

deploy 실패
or
fastapi_reload 실패
or
observability 이상 감지
      ↓
rollback_minimal task 실행

이 task는 다음 역할을 합니다.

FastAPI reload → 이전 버전으로 복구
Triton unload → 현재 모델 해제

관련 코드:

dags/mlops_lib/observability/auto_rollback.py

Prometheus 기반으로 다음 조건을 평가합니다.

5xx error rate
latency p95
service up 상태

Manual Rollback — 별도 DAG

운영자가 직접 실행하는 Rollback은 별도 DAG으로 분리되어 있습니다.

dags/rollback_manual.py

이 DAG은 다음 상황에서 사용됩니다.

운영자 직접 롤백 요청
비상 대응
실험 롤백

두 방식의 차이

Auto RollbackManual Rollback
위치E2E DAG 내부 task별도 DAG
트리거trigger_rule=ONE_FAILED운영자 수동 실행
용도배포 실패 시 즉각 복구운영 중 이상 감지 후 대응

왜 이런 구조가 필요한가

이 파이프라인 구조의 핵심은

운영 반영을 여러 단계로 나누는 것입니다.

즉,

Train

Production Deploy

사이에 다음 단계들이 존재합니다.

Drift Gate
Promotion Check
READY Sensor
Observability
Rollback

이 구조 덕분에

  • 잘못된 모델이 운영 환경에 반영되는 것을 방지하고
  • 운영 문제를 빠르게 감지하고
  • 자동으로 복구할 수 있습니다.

핵심 메시지

이 플랫폼의 파이프라인은

단순한 ML 학습 파이프라인이 아닙니다.

오히려 다음 역할을 합니다.

모델 변경을 운영 환경에 안전하게 반영하기 위한 제어 시스템

즉, 이 DAG은

Training Pipeline

이 아니라

Production Control Pipeline

에 가깝습니다.


다음 글

지금까지는 E2E DAG의 전체 구조를 살펴보았습니다.

하지만 이 파이프라인에서 가장 중요한 부분은

Promotion / Shadow 전략과 Drift Gate 정책입니다.

다음 글에서는 다음 내용을 중심으로 설명합니다.

  • Drift Gate 설계 이유
  • Promotion / Shadow 분기 구조
  • 모델 성능과 운영 반영을 분리한 이유

관련 코드:

dags/mlops_lib/quality/drift_gate.py
dags/mlops_lib/core/policy.py
dags/e2e_full.py

이를 통해

검증된 변경만 운영 경로에 올리는 구조

가 어떻게 구현되어 있는지 살펴보겠습니다.


설계 판단 (Why This Way?)

학습 성공이 곧 운영 반영이 되는 구조를 제거하기 위해 Drift Gate, READY Sensor, Auto Rollback을 파이프라인에 내장하고, ids.py/policy.py SSOT 패턴으로 식별자와 임계값의 변경 지점을 하나로 제한했습니다. DAG 레벨과 Prometheus 레벨의 2계층 에러 감지로 파이프라인 실행 이상과 서빙 품질 이상을 각각 포착합니다.


다음에 읽을 글

GitOps 기반 E2E ML Platform - 모델 반영 제어 — Drift Gate / Promotion / Shadow 상세 설계