What this post covers

Using a DagBag-based test_dag_integrity.py, we automatically verify DAG parse errors, the presence of key tasks, and operational settings before deployment, and we run this CI on every push with GitHub Actions.

Prerequisites


The problem this step solves

A single typo in a DAG file is enough for the Airflow Scheduler to hit a parse error and flag the DAG as broken, so it stops loading. The problem is typically noticed tens of minutes after deployment. The goal here is to pull that cost forward to push time and catch parse errors before they reach production.


🎯 Key summary

  • DAG CI ๋ชฉ์ : ํŒŒ์‹ฑ ์˜ค๋ฅ˜ / import ์˜ค๋ฅ˜๋ฅผ ๋ฐฐํฌ ์ „์— ๊ฐ์ง€
  • test_dag_integrity.py: DagBag import, task ์ˆ˜ ๊ฒ€์ฆ, SLA ์„ค์ • ๊ฒ€์ฆ
  • GitHub Actions: push/PR ์‹œ ์ž๋™ ์‹คํ–‰, Python ํ™˜๊ฒฝ์—์„œ ํ…Œ์ŠคํŠธ
  • ๋กœ์ปฌ ์‹คํ–‰: pytest tests/test_dag_integrity.py ๋กœ ๋™์ผํ•˜๊ฒŒ ๊ฒ€์ฆ ๊ฐ€๋Šฅ
  • CI ์‹คํŒจ ์‹œ: ์˜ค๋ฅ˜ ๋กœ๊ทธ์—์„œ ํŒŒ์ผ๋ช…๊ณผ ์›์ธ ์ฆ‰์‹œ ํ™•์ธ ๊ฐ€๋Šฅ

1️⃣ Why DAG CI is needed

Airflow DAG files are Python code, so they can contain import errors, SyntaxErrors, DAG definition errors, dependency cycles between tasks, and so on. None of these errors surface until the Scheduler parses the file.

Without DAG CI:  edit code → push → ArgoCD sync → Scheduler parses → error found (after deployment)
With DAG CI:  edit code → push → GitHub Actions → error caught immediately → PR merge blocked when the check is required (before deployment)
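
The stdlib-only sketch below (plain Python, not Airflow code) illustrates why these errors stay hidden until parse time: a file that imports a missing package compiles without complaint and only fails once its top-level code actually runs, which is the step the Scheduler performs when it parses a DAG file. The module name mlops_lib_missing is hypothetical and assumed not to be installed.

```python
# Stdlib-only illustration: compile() checks syntax, exec() runs top-level code.
SOURCE_WITH_BAD_IMPORT = "import mlops_lib_missing  # hypothetical, not installed\n"
SOURCE_WITH_SYNTAX_ERROR = "def broken(:\n    pass\n"


def check(source: str, filename: str) -> str:
    """Return the stage at which `source` fails: 'syntax', 'import', or 'ok'."""
    try:
        code = compile(source, filename, "exec")  # parse only; nothing is imported yet
    except SyntaxError:
        return "syntax"
    try:
        # Execute top-level statements, as importing a DAG file does.
        exec(code, {"__name__": "dag_module"})
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        return "import"
    return "ok"


print(check(SOURCE_WITH_SYNTAX_ERROR, "dags/a.py"))  # syntax
print(check(SOURCE_WITH_BAD_IMPORT, "dags/b.py"))    # import
print(check("x = 1\n", "dags/c.py"))                 # ok
```

The import failure in the second case is invisible to a syntax-only check, which is why the tests below load the files through DagBag instead of just compiling them.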

2️⃣ Structure of test_dag_integrity.py

(1) DagBag import check

import pytest
from airflow.models import DagBag

DAG_FOLDER = "dags"

@pytest.fixture(scope="module")
def dagbag():
    return DagBag(dag_folder=DAG_FOLDER, include_examples=False)

def test_no_import_errors(dagbag):
    """DAG files must import without errors."""
    assert dagbag.import_errors == {}, (
        f"DAG import errors: {dagbag.import_errors}"
    )

DagBag is the same class the Airflow Scheduler uses to parse DAG files, so a clean DagBag load in CI is a close proxy for a clean load in production.
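
To make the mechanism concrete, here is a simplified, stdlib-only model of how a DagBag-style loader fills import_errors: it imports every .py file in the folder and records any exception per file path. This is an illustration under that assumption, not Airflow's implementation (the real DagBag also applies .airflowignore, safe-mode filtering, import timeouts, and DAG validation); collect_import_errors is a hypothetical name.

```python
import importlib.util
import pathlib
import tempfile


def collect_import_errors(dag_folder: str) -> dict[str, str]:
    """Import every .py file under dag_folder and map file path -> error message.

    Simplified model of DagBag.import_errors, not Airflow's implementation.
    """
    errors: dict[str, str] = {}
    for index, path in enumerate(sorted(pathlib.Path(dag_folder).rglob("*.py"))):
        # A unique module name per file, so files with the same stem never clash.
        spec = importlib.util.spec_from_file_location(f"dag_file_{index}", str(path))
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)  # compiles and runs the file's top-level code
        except Exception as exc:  # SyntaxError, ImportError, errors raised while building DAGs, ...
            errors[str(path)] = f"{type(exc).__name__}: {exc}"
    return errors


# Demo: one importable file and one with a missing (hypothetical) dependency.
with tempfile.TemporaryDirectory() as folder:
    pathlib.Path(folder, "good_dag.py").write_text("DAG_ID = 'ok'\n")
    pathlib.Path(folder, "bad_dag.py").write_text("import mlops_lib_missing\n")
    demo_errors = collect_import_errors(folder)

print(list(demo_errors.values()))  # ["ModuleNotFoundError: No module named 'mlops_lib_missing'"]
```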

(2) Expected DAGs exist

EXPECTED_DAG_IDS = [
    "e2e_full",
    "dp_feature_pipeline",
    "feast_materialize_dev",
    "feast_full_refresh_dev_manual",
    "rollback_manual",
]

def test_expected_dag_ids_exist(dagbag):
    """All expected DAGs must load successfully."""
    for dag_id in EXPECTED_DAG_IDS:
        assert dag_id in dagbag.dags, (
            f"Expected DAG '{dag_id}' not found in DagBag"
        )
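
The loop above fails on the first missing DAG ID, so a PR that breaks two DAGs needs two CI rounds to reveal both. A minimal sketch of an alternative, assuming you prefer a single report: compare the two ID sets and return both the missing IDs and the loaded-but-unlisted ones. The second list also flags production DAGs that were added without updating EXPECTED_DAG_IDS. diff_dag_ids is a hypothetical helper.

```python
def diff_dag_ids(expected: list[str], loaded: list[str]) -> tuple[list[str], list[str]]:
    """Compare expected DAG IDs with the IDs actually loaded.

    Returns (missing, unlisted): IDs expected but not loaded, and IDs loaded but
    not listed as expected. Both are sorted so CI output is stable.
    """
    expected_set, loaded_set = set(expected), set(loaded)
    missing = sorted(expected_set - loaded_set)
    unlisted = sorted(loaded_set - expected_set)
    return missing, unlisted


missing, unlisted = diff_dag_ids(
    ["e2e_full", "rollback_manual"],        # expected
    ["e2e_full", "feast_materialize_dev"],  # loaded
)
print(missing)   # ['rollback_manual']
print(unlisted)  # ['feast_materialize_dev']
```

Inside the test it would be called as `missing, unlisted = diff_dag_ids(EXPECTED_DAG_IDS, list(dagbag.dags))` followed by `assert not missing, f"Missing DAGs: {missing}"`. Whether unlisted IDs should fail the build or only be reported is a team decision.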

(3) Task count check

def test_dag_has_tasks(dagbag):
    """Every DAG must have at least one task."""
    for dag_id, dag in dagbag.dags.items():
        assert len(dag.tasks) > 0, f"DAG '{dag_id}' has no tasks"

(4) e2e_full key task check

E2E_FULL_KEY_TASK_IDS = [
    "train_and_evaluate",
    "check_result",
    "observe_post_deploy_metrics",
    "rollback_minimal",
]

def test_e2e_full_key_task_ids(dagbag):
    dag = dagbag.dags["e2e_full"]
    task_ids = {t.task_id for t in dag.tasks}
    for tid in E2E_FULL_KEY_TASK_IDS:
        assert tid in task_ids, f"e2e_full: required task '{tid}' not found"

(5) e2e_full operational settings check

def test_e2e_full_max_active_runs(dagbag):
    dag = dagbag.dags["e2e_full"]
    assert dag.max_active_runs == 1

def test_e2e_full_catchup_disabled(dagbag):
    dag = dagbag.dags["e2e_full"]
    assert dag.catchup is False
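
Why catchup=False and max_active_runs=1 matter: with catchup=True, the scheduler creates a run for every schedule interval between start_date and now that has not run yet, and max_active_runs caps how many of those runs execute at once. The sketch below only counts daily intervals to show the size of that backlog; the daily schedule and the dates are assumptions for illustration, not e2e_full's actual settings.

```python
from datetime import datetime, timedelta


def count_missed_daily_runs(start_date: datetime, now: datetime) -> int:
    """Complete daily intervals between start_date and now.

    Approximates the backlog a daily DAG with catchup=True would schedule
    on its first deploy (assumption: no runs exist yet).
    """
    if now <= start_date:
        return 0
    return (now - start_date) // timedelta(days=1)  # timedelta // timedelta -> int


# Hypothetical: a daily DAG whose start_date is two months before its first deploy.
print(count_missed_daily_runs(datetime(2024, 1, 1), datetime(2024, 3, 1)))  # 60
```

With catchup=True and max_active_runs=1, that backlog would be worked through one run at a time; with catchup=False, only the most recent interval is scheduled.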

(6) e2e_full SLA settings check

from datetime import timedelta

def test_e2e_full_sla_in_default_args(dagbag):
    dag = dagbag.get_dag("e2e_full")
    sla = dag.default_args.get("sla")
    assert sla is not None
    assert isinstance(sla, timedelta)
    assert sla > timedelta(minutes=30)

def test_e2e_full_sla_miss_callback(dagbag):
    dag = dagbag.get_dag("e2e_full")
    assert dag.sla_miss_callback is not None, "e2e_full: sla_miss_callback is not set"
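
A conceptual model of what the sla setting checks in Airflow 2.x: per the Airflow docs, an SLA is the maximum time a task should take to complete relative to the DAG run's start, and a task that misses it is reported as an SLA miss but is not cancelled. The sketch below assumes exactly that rule and nothing more; the run time and the 45-minute SLA are hypothetical values chosen to satisfy the test's greater-than-30-minutes threshold.

```python
from datetime import datetime, timedelta
from typing import Optional


def sla_deadline(run_start: datetime, sla: timedelta) -> datetime:
    """Latest acceptable finish time: the DAG run's start plus the SLA."""
    return run_start + sla


def is_sla_miss(
    run_start: datetime,
    sla: timedelta,
    now: datetime,
    finished_at: Optional[datetime] = None,
) -> bool:
    """True if the task had not finished by the deadline (assumed Airflow 2.x rule)."""
    deadline = sla_deadline(run_start, sla)
    if finished_at is not None:
        return finished_at > deadline  # finished, but later than the deadline
    return now > deadline  # still unfinished and the deadline has passed


run_start = datetime(2024, 1, 1, 2, 0)  # hypothetical scheduled run start
sla = timedelta(minutes=45)             # hypothetical SLA, > 30 minutes as the test requires
print(sla_deadline(run_start, sla))                                  # 2024-01-01 02:45:00
print(is_sla_miss(run_start, sla, now=datetime(2024, 1, 1, 2, 50)))  # True
print(is_sla_miss(run_start, sla, now=datetime(2024, 1, 1, 3, 0),
                  finished_at=datetime(2024, 1, 1, 2, 40)))          # False
```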

3️⃣ GitHub Actions workflow structure

# .github/workflows/ci.yml
name: DAG Integrity

on:
  push:
    branches: ["**"]
  pull_request:
    branches: ["**"]

jobs:
  test:
    name: DagBag Integrity Test
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install --upgrade pip
          if [ -f requirements.txt ]; then
            pip install -r requirements.txt
          else
            pip install "apache-airflow>=2.10.0,<3.0"
          fi
          # Install pytest unconditionally in case requirements.txt does not list it
          pip install pytest

      - name: Initialize Airflow DB
        run: |
          export AIRFLOW_HOME="${GITHUB_WORKSPACE}/.airflow"
          # `airflow db init` is deprecated since Airflow 2.7; `db migrate` replaces it
          airflow db migrate
        env:
          AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
          AIRFLOW__CORE__LOAD_EXAMPLES: "False"

      - name: Run DAG integrity tests
        run: pytest tests/test_dag_integrity.py -v --tb=short
        env:
          AIRFLOW_HOME: ${{ github.workspace }}/.airflow
          AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
          AIRFLOW__CORE__LOAD_EXAMPLES: "False"
          MLFLOW_TRACKING_URI: "sqlite:///mlflow.db"

Key design points

  • ๋ชจ๋“  ๋ธŒ๋žœ์น˜ ํŠธ๋ฆฌ๊ฑฐ: branches: ["**"] โ€” feature ๋ธŒ๋žœ์น˜์—์„œ๋„ ์ฆ‰์‹œ ํ”ผ๋“œ๋ฐฑ
  • AIRFLOW_HOME ๋ถ„๋ฆฌ: CI ์›Œํฌ์ŠคํŽ˜์ด์Šค ๋‚ด ๊ฒฉ๋ฆฌ๋œ Airflow ๋””๋ ‰ํ† ๋ฆฌ ์‚ฌ์šฉ
  • LOAD_EXAMPLES ๋น„ํ™œ์„ฑํ™”: Airflow ๊ธฐ๋ณธ ์˜ˆ์‹œ DAG ์ œ์™ธ
  • MLFLOW_TRACKING_URI ๋”๋ฏธ๊ฐ’: DAG import ์‹œ ํ™˜๊ฒฝ๋ณ€์ˆ˜ ์ฐธ์กฐ ์˜ค๋ฅ˜ ๋ฐฉ์ง€

4️⃣ CI failure scenarios and responses

Scenario 1: import error

FAILED test_no_import_errors
AssertionError: DAG import errors: {
  'dags/mlops_train.py': "No module named 'mlops_lib.feature'"
}

→ Check the file and module names, then fix the import path (or add the missing package to the CI dependencies)

Scenario 2: SyntaxError

FAILED test_no_import_errors
AssertionError: DAG import errors: {
  'dags/mlops_deploy.py': "invalid syntax (mlops_deploy.py, line 5)"
}

→ Fix the reported line

Scenario 3: e2e_full SLA callback not set

FAILED test_e2e_full_sla_miss_callback
AssertionError: e2e_full: sla_miss_callback is not set

→ Add sla_miss_callback to the DAG definition

Scenario 4: expected DAG missing

FAILED test_expected_dag_ids_exist
AssertionError: Expected DAG 'rollback_manual' not found in DagBag

→ Check that the file exists, that the dag_id matches, and whether the file has an import error
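
The failure messages above print import_errors as a single dict, which gets hard to read once several files break or the values contain full tracebacks (Airflow 2.x can store a formatted traceback per file). A minimal sketch, assuming you want one file per line in the CI log with only the final exception line; format_import_errors is a hypothetical helper, and the sample errors are the ones from scenarios 1 and 2.

```python
def format_import_errors(import_errors: dict[str, str]) -> str:
    """Render {file path: error} pairs sorted by path, one per line, for CI logs."""
    lines = []
    for path, error in sorted(import_errors.items()):
        text = error.strip()
        # Keep only the last line of a multi-line traceback: usually the exception itself.
        last_line = text.splitlines()[-1] if text else ""
        lines.append(f"{path}: {last_line}")
    return "\n".join(lines)


errors = {
    "dags/mlops_train.py": "No module named 'mlops_lib.feature'",
    "dags/mlops_deploy.py": "invalid syntax (mlops_deploy.py, line 5)",
}
print(format_import_errors(errors))
# dags/mlops_deploy.py: invalid syntax (mlops_deploy.py, line 5)
# dags/mlops_train.py: No module named 'mlops_lib.feature'
```

In test_no_import_errors the assertion could then read `assert not dagbag.import_errors, "DAG import errors:\n" + format_import_errors(dagbag.import_errors)`; the full tracebacks remain available in dagbag.import_errors when the last line is not enough.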


5️⃣ Running locally

# Install dependencies
pip install "apache-airflow>=2.10.0,<3.0" pytest

# Initialize the DB (same settings as CI)
export AIRFLOW_HOME="${PWD}/.airflow"
export AIRFLOW__CORE__DAGS_FOLDER="${PWD}/dags"
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export MLFLOW_TRACKING_URI="sqlite:///mlflow.db"
airflow db migrate

# Run the tests
pytest tests/test_dag_integrity.py -v --tb=short

The recommended flow is to run this locally before pushing.


🧩 Tips

  • DagBag์€ Airflow Scheduler๊ฐ€ DAG์„ ๋กœ๋“œํ•˜๋Š” ๋ฐฉ์‹๊ณผ ๋™์ผํ•˜๋‹ค. CI์—์„œ ๊ฐ™์€ ๋ฐฉ์‹์œผ๋กœ ๊ฒ€์ฆํ•˜๋ฉด ์šด์˜๊ณผ์˜ ์ฐจ์ด๊ฐ€ ์—†๋‹ค.
  • include_examples=False๋ฅผ ๋ฐ˜๋“œ์‹œ ์„ค์ •ํ•ด์•ผ Airflow ๊ธฐ๋ณธ DAG์ด ํ…Œ์ŠคํŠธ์— ํฌํ•จ๋˜์ง€ ์•Š๋Š”๋‹ค.
  • SLA ๊ฒ€์ฆ ๋Œ€์ƒ ๋ชฉ๋ก(EXPECTED_DAG_IDS)์€ ์šด์˜ DAG์ด ์ถ”๊ฐ€๋  ๋•Œ ํ•จ๊ป˜ ๊ด€๋ฆฌํ•ด์•ผ ํ•œ๋‹ค.

Design rationale (Why This Way?)

The DagBag-based tests reproduce the Scheduler's parsing mechanism in CI, catching import errors and runtime configuration errors before deployment. The core pipeline, e2e_full, additionally gets dedicated checks for max_active_runs, catchup, and the presence of its rollback task.


Read next

→ Observability Step 1: kube-prometheus-stack + GitOps setup (Level 4: deploying the Prometheus stack with GitOps)