이 글에서 다루는 것

FastAPI와 Airflow가 흩어져 보내던 알림을 하나의 Slack 인터페이스로 통합하고, SealedSecret 기반으로 Webhook URL을 안전하게 주입하는 구조를 다룹니다.

선수지식


이 단계에서 해결하려는 문제

운영 환경에서 가장 무서운 건 문제가 생겼는데 아무도 모르고 있는 상황이다. FastAPI와 Airflow가 각자 흩어져 보내던 알림을 하나의 Slack 인터페이스로 완전히 통합해서, 모델 학습·등록·센서·핫스왑·헬스체크까지 어디에서 무슨 일이 일어나도 즉시 눈에 들어오도록 만든다.


🎯 핵심 요약

  • FastAPI와 Airflow가 공통 함수 send_slack_alert(text)를 사용해 알림 체계를 통합
  • Airflow는 전역 실패 콜백 alert_slack(context)로 DAG/Task 실패 자동 통보
  • Webhook URL은 SealedSecret → 환경변수(SLACK_WEBHOOK_URL)로 주입
  • dev(.local) / prod(.prod) 환경 완전 분리: 네임스페이스·시크릿·도메인 각각 대응

1️⃣ 구조 개요

구성 요소역할
공용 함수send_slack_alert(text) — FastAPI·Airflow 모두에서 호출
Airflow 전역 콜백alert_slack(context) — DAG 실패 시 자동 호출
환경 주입 방식slack-webhook-{env}-secretSLACK_WEBHOOK_URL
분리 원칙dev/prod 완전 분리 (네임스페이스·시크릿·도메인)

2️⃣ 시스템 흐름도

mermaid-02.png


3️⃣ FastAPI 알림 모듈

# charts/fastapi/app/utils/slack_alerts.py
import requests
import os

def send_slack_alert(text: str):
    slack_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not slack_url:
        print("[Slack Alert] SKIP: SLACK_WEBHOOK_URL not set")
        return

    try:
        res = requests.post(slack_url, json={"text": text}, timeout=5)
        res.raise_for_status()
    except Exception as e:
        print(f"[Slack Alert Error] {e}")

사용 예시:

  • FastAPI 서버 시작/헬스 체크
  • /variant/{alias}/reload 핫스왑 성공·실패
  • 예측 처리 중 예외 발생 시

4️⃣ Airflow 알림 모듈

# dags/utils/slack_alerts.py
import os
import requests

def send_slack_alert(text: str) -> None:
    slack_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not slack_url:
        print("[Slack Alert] SKIP: SLACK_WEBHOOK_URL not set")
        return
    try:
        res = requests.post(slack_url, json={"text": text}, timeout=5)
        res.raise_for_status()
    except Exception as e:
        print(f"[Slack Alert Error] {e}")

def alert_slack(context) -> None:
    dag = context.get("dag")
    ti = context.get("task_instance")
    dag_id = getattr(dag, "dag_id", "unknown")
    task_id = getattr(ti, "task_id", "unknown")
    execution_ts = context.get("ts")
    run_id = context.get("run_id")
    base_url = os.environ.get("AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL", "").rstrip("/")

    log_url = f"{base_url}/dags/{dag_id}/runs/{run_id}/tasks/{task_id}" if base_url else "(base_url unset)"
    text = (
        "*🔥 Airflow DAG 실패 알림!*\n"
        f"*DAG*: `{dag_id}`\n"
        f"*Task*: `{task_id}`\n"
        f"*Time*: `{execution_ts}`\n"
        f"*Log*: <{log_url}|로그 바로가기>"
    )
    send_slack_alert(text)

모든 DAG에 on_failure_callback=alert_slack 적용 → 실패 시 로그 링크 포함 메시지 자동 전송


5️⃣ Helm Values (dev/prod)

▶ FastAPI(dev)

# charts/fastapi/values/dev.yaml
env:
  MLFLOW_TRACKING_URI: "http://mlflow-dev-service.mlflow-dev.svc.cluster.local:5000"
  MODEL_NAME: "best_model"
  DEFAULT_ALIAS: "B"
  ALIAS_SELECTION_MODE: "blue_green"
  CANARY_PERCENT: "20"
  LOG_TO_STDOUT: "true"
  LOG_LEVEL: "info"
envFrom:
  - secretRef: { name: aws-credentials-secret }
  - secretRef: { name: fastapi-token-dev-secret }
  - secretRef: { name: slack-webhook-dev-secret }

▶ FastAPI(prod)

# charts/fastapi/values/prod.yaml
env:
  MLFLOW_TRACKING_URI: "http://mlflow-prod-service.mlflow-prod.svc.cluster.local:5000"
  MODEL_NAME: "best_model"
  DEFAULT_ALIAS: "B"
  ALIAS_SELECTION_MODE: "blue_green"
  CANARY_PERCENT: "10"
  LOG_TO_STDOUT: "true"
  LOG_LEVEL: "info"
envFrom:
  - secretRef: { name: aws-credentials-secret }
  - secretRef: { name: fastapi-token-prod-secret }
  - secretRef: { name: slack-webhook-prod-secret }

▶ Airflow(dev)

# charts/airflow/values/dev.yaml
airflow:
  config:
    AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL:
      value: "http://airflow.local"

  scheduler: &dev_scheduler
    env:
      - name: SLACK_WEBHOOK_URL
        valueFrom:
          secretKeyRef:
            name: slack-webhook-dev-secret
            key: SLACK_WEBHOOK_URL
      - name: FASTAPI_RELOAD_URL
        value: "http://fastapi-dev-service.fastapi-dev.svc.cluster.local"
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-dev-service.mlflow-dev.svc.cluster.local:5000"

  workers: *dev_scheduler
  dagProcessor: *dev_scheduler

▶ Airflow(prod)

# charts/airflow/values/prod.yaml
airflow:
  config:
    AIRFLOW__WEBSERVER__WEB_SERVER_BASE_URL:
      value: "http://airflow.prod"

  scheduler: &prod_scheduler
    env:
      - name: SLACK_WEBHOOK_URL
        valueFrom:
          secretKeyRef:
            name: slack-webhook-prod-secret
            key: SLACK_WEBHOOK_URL
      - name: FASTAPI_RELOAD_URL
        value: "http://fastapi-prod-service.fastapi-prod.svc.cluster.local"
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow-prod-service.mlflow-prod.svc.cluster.local:5000"

  workers: *prod_scheduler
  dagProcessor: *prod_scheduler

6️⃣ SealedSecret 예시

# envs/dev/sealed-secrets/airflow/sealed-slack-webhook-dev-secret.yaml
apiVersion: bitnami.com/v1alpha1
kind: SealedSecret
metadata:
  name: slack-webhook-dev-secret
  namespace: airflow-dev
spec:
  encryptedData:
    SLACK_WEBHOOK_URL: <sealed payload>
  template:
    type: Opaque

7️⃣ Slack 메시지 예시

상황예시 메시지
FastAPI 핫스왑 성공🔁 [FastAPI] 모델 @A 핫스왑 완료 v9 (run_id=…)
FastAPI 헬스 체크 경고⚠️ [FastAPI] degraded (loaded=[‘A’], missing=[‘B’])
Airflow 등록 성공✅ [Airflow] 모델 등록: my_model v12 → @A
Airflow 센서 실패❌ [Airflow] READY 감지 실패: my_model v12
DAG 전역 실패🔥 Airflow DAG 실패 알림 (Task=check_model_ready)

8️⃣ 체크리스트

  • SLACK_WEBHOOK_URL dev/prod 각각 주입 확인
  • Airflow DAG에 on_failure_callback=alert_slack 설정 유지
  • FastAPI 주요 이벤트(핫스왑·헬스·기동)에 send_slack_alert() 호출
  • Airflow 각 단계(Task)에서도 send_slack_alert() 호출
  • Secret 이름(slack-webhook-{env}-secret)이 Helm envFrom과 일치

🧩 팁

  • Slack 전송 실패 시 서비스 로직 차단 금지 — 네트워크 장애가 모델 서빙을 막지 않게
  • 배포 전 Webhook Secret 불일치 여부를 dev에서 반드시 점검

설계 판단 (Why This Way?)

Slack 알림은 단일 함수로 빠르게 적용하되 이후 확장 가능한 구조를 택했고, 알림 전송 실패 시 예외를 삼켜 메인 로직(모델 서빙, 파이프라인)을 차단하지 않도록 했습니다. dev/prod Webhook Secret을 분리하여 알림 채널 오염을 방지합니다.


다음에 읽을 글

MLOps 운영 고도화 3단계: 모델 롤백 자동화 — 스냅샷 기반 자동 롤백