์ด ๊ธ์์ ๋ค๋ฃจ๋ ๊ฒ
DagBag ๊ธฐ๋ฐ test_dag_integrity.py๋ก DAG ํ์ฑ ์ค๋ฅยทํต์ฌ task ์กด์ฌยท์ด์ ์ค์ ์ ๋ฐฐํฌ ์ ์ ์๋ ๊ฒ์ฆํ๊ณ , GitHub Actions๋ก push ์์ ์ CI๋ฅผ ์คํํ๋ ๊ตฌ์กฐ๋ฅผ ๋ค๋ฃน๋๋ค.
์ ์์ง์
- Airflow 6๋จ๊ณ: KubernetesExecutor ์ ํ ์ด์ โ CeleryExecutor ๋๋น ์ฅ๋จ์ ๋ถ์
์ด ๋จ๊ณ์์ ํด๊ฒฐํ๋ ค๋ ๋ฌธ์
DAG ํ์ผ์ ์คํ๊ฐ ํ๋ ๋ค์ด๊ฐ๋ฉด Airflow Scheduler๊ฐ ํ์ฑ ์ค๋ฅ๋ฅผ ๊ฐ์งํ๊ณ ํด๋น DAG์ ๋นํ์ฑํํ๋ค. ๋ฌธ์ ๋ฅผ ๋ฐ๊ฒฌํ๋ ๊ฒ์ ๋ฐฐํฌ ํ ์์ญ ๋ถ์ด ์ง๋ ์์ ์ด๋ค. ์ด ๋น์ฉ์ push ์์ ์ผ๋ก ์๋น๊ฒจ์ ํ์ฑ ์ค๋ฅ๋ฅผ ์ด์ ์ ์ ์ก๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ ๋ค.
๐ฏ ํต์ฌ ์์ฝ
- DAG CI ๋ชฉ์ : ํ์ฑ ์ค๋ฅ / import ์ค๋ฅ๋ฅผ ๋ฐฐํฌ ์ ์ ๊ฐ์ง
- test_dag_integrity.py: DagBag import, task ์ ๊ฒ์ฆ, SLA ์ค์ ๊ฒ์ฆ
- GitHub Actions: push/PR ์ ์๋ ์คํ, Python ํ๊ฒฝ์์ ํ ์คํธ
- ๋ก์ปฌ ์คํ:
pytest tests/test_dag_integrity.py๋ก ๋์ผํ๊ฒ ๊ฒ์ฆ ๊ฐ๋ฅ - CI ์คํจ ์: ์ค๋ฅ ๋ก๊ทธ์์ ํ์ผ๋ช ๊ณผ ์์ธ ์ฆ์ ํ์ธ ๊ฐ๋ฅ
1๏ธโฃ ์ DAG CI๊ฐ ํ์ํ๊ฐ
Airflow DAG ํ์ผ์ Python ์ฝ๋์ด๋ฏ๋ก import ์ค๋ฅ, SyntaxError, DAG ์ ์ ์ค๋ฅ, ์ํ ์ฐธ์กฐ ๋ฑ์ด ๋ฐ์ํ ์ ์๋ค. ์ด ์ค๋ฅ๋ค์ Scheduler๊ฐ ํ์ฑํ๊ธฐ ์ ๊น์ง ๋ฐ๊ฒฌ๋์ง ์๋๋ค.
DAG CI ์์ด: ์ฝ๋ ์์ โ push โ ArgoCD sync โ Scheduler ํ์ฑ โ ์ค๋ฅ ๋ฐ๊ฒฌ (๋ฐฐํฌ ํ)
DAG CI ์ ์ฉ: ์ฝ๋ ์์ โ push โ GitHub Actions โ ์ค๋ฅ ์ฆ์ ๊ฐ์ง โ PR ์ฐจ๋จ (๋ฐฐํฌ ์ )
2๏ธโฃ test_dag_integrity.py ๊ตฌ์กฐ
(1) DagBag import ๊ฒ์ฆ
import pytest
from airflow.models import DagBag
DAG_FOLDER = "dags"
@pytest.fixture(scope="module")
def dagbag():
return DagBag(dag_folder=DAG_FOLDER, include_examples=False)
def test_no_import_errors(dagbag):
"""DAG ํ์ผ import ์ค๋ฅ๊ฐ ์์ด์ผ ํ๋ค."""
assert dagbag.import_errors == {}, (
f"DAG import errors: {dagbag.import_errors}"
)
DagBag์ Airflow Scheduler๊ฐ DAG์ ๋ก๋ํ๋ ๋ฐฉ์๊ณผ ๋์ผํ๋ค.
(2) ์์ DAG ์กด์ฌ ๊ฒ์ฆ
EXPECTED_DAG_IDS = [
"e2e_full",
"dp_feature_pipeline",
"feast_materialize_dev",
"feast_full_refresh_dev_manual",
"rollback_manual",
]
def test_expected_dag_ids_exist(dagbag):
"""์์๋ ๋ชจ๋ DAG์ด ์ ์ ๋ก๋๋์ด์ผ ํ๋ค."""
for dag_id in EXPECTED_DAG_IDS:
assert dag_id in dagbag.dags, (
f"Expected DAG '{dag_id}' not found in DagBag"
)
(3) Task ์ ๊ฒ์ฆ
def test_dag_has_tasks(dagbag):
"""๊ฐ DAG์ ์ต์ 1๊ฐ ์ด์์ Task๊ฐ ์์ด์ผ ํ๋ค."""
for dag_id, dag in dagbag.dags.items():
assert len(dag.tasks) > 0, f"DAG '{dag_id}' has no tasks"
(4) e2e_full ํต์ฌ task ๊ฒ์ฆ
E2E_FULL_KEY_TASK_IDS = [
"train_and_evaluate",
"check_result",
"observe_post_deploy_metrics",
"rollback_minimal",
]
def test_e2e_full_key_task_ids(dagbag):
dag = dagbag.dags["e2e_full"]
task_ids = {t.task_id for t in dag.tasks}
for tid in E2E_FULL_KEY_TASK_IDS:
assert tid in task_ids, f"e2e_full: required task '{tid}' not found"
(5) e2e_full ์ด์ ์ค์ ๊ฒ์ฆ
def test_e2e_full_max_active_runs(dagbag):
dag = dagbag.dags["e2e_full"]
assert dag.max_active_runs == 1
def test_e2e_full_catchup_disabled(dagbag):
dag = dagbag.dags["e2e_full"]
assert dag.catchup is False
(6) e2e_full SLA ์ค์ ๊ฒ์ฆ
from datetime import timedelta
def test_e2e_full_sla_in_default_args(dagbag):
dag = dagbag.get_dag("e2e_full")
sla = dag.default_args.get("sla")
assert sla is not None
assert isinstance(sla, timedelta)
assert sla > timedelta(minutes=30)
def test_e2e_full_sla_miss_callback(dagbag):
dag = dagbag.get_dag("e2e_full")
assert dag.sla_miss_callback is not None
3๏ธโฃ GitHub Actions workflow ๊ตฌ์กฐ
# .github/workflows/ci.yml
name: DAG Integrity
on:
push:
branches: ["**"]
pull_request:
branches: ["**"]
jobs:
test:
name: DagBag Integrity Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: |
pip install --upgrade pip
if [ -f requirements.txt ]; then
pip install -r requirements.txt
else
pip install "apache-airflow>=2.10.0,<3.0" pytest
fi
- name: Initialize Airflow DB
run: |
export AIRFLOW_HOME="${GITHUB_WORKSPACE}/.airflow"
airflow db init
env:
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
- name: Run DAG integrity tests
run: pytest tests/test_dag_integrity.py -v --tb=short
env:
AIRFLOW_HOME: ${{ github.workspace }}/.airflow
AIRFLOW__CORE__DAGS_FOLDER: ${{ github.workspace }}/dags
AIRFLOW__CORE__LOAD_EXAMPLES: "False"
MLFLOW_TRACKING_URI: "sqlite:///mlflow.db"
์ฃผ์ ์ค๊ณ ํฌ์ธํธ
- ๋ชจ๋ ๋ธ๋์น ํธ๋ฆฌ๊ฑฐ:
branches: ["**"]โ feature ๋ธ๋์น์์๋ ์ฆ์ ํผ๋๋ฐฑ - AIRFLOW_HOME ๋ถ๋ฆฌ: CI ์ํฌ์คํ์ด์ค ๋ด ๊ฒฉ๋ฆฌ๋ Airflow ๋๋ ํ ๋ฆฌ ์ฌ์ฉ
- LOAD_EXAMPLES ๋นํ์ฑํ: Airflow ๊ธฐ๋ณธ ์์ DAG ์ ์ธ
- MLFLOW_TRACKING_URI ๋๋ฏธ๊ฐ: DAG import ์ ํ๊ฒฝ๋ณ์ ์ฐธ์กฐ ์ค๋ฅ ๋ฐฉ์ง
4๏ธโฃ CI ์คํจ ์๋๋ฆฌ์ค์ ๋์
์๋๋ฆฌ์ค 1: import ์ค๋ฅ
FAILED test_no_import_errors
AssertionError: DAG import errors: {
'dags/mlops_train.py': "No module named 'mlops_lib.feature'"
}
โ ํ์ผ๋ช ๊ณผ ๋ชจ๋๋ช ํ์ธ ํ import ๊ฒฝ๋ก ์์
์๋๋ฆฌ์ค 2: SyntaxError
FAILED test_no_import_errors
AssertionError: DAG import errors: {
'dags/mlops_deploy.py': "invalid syntax (mlops_deploy.py, line 5)"
}
โ ํด๋น ๋ผ์ธ ์์
์๋๋ฆฌ์ค 3: e2e_full SLA ๋ฏธ์ค์
FAILED test_e2e_full_sla_miss_callback
AssertionError: e2e_full: sla_miss_callback is not set
โ DAG ์ ์์ sla_miss_callback ์ถ๊ฐ
์๋๋ฆฌ์ค 4: ์์ DAG ์์
FAILED test_expected_dag_ids_exist
AssertionError: Expected DAG 'rollback_manual' not found in DagBag
โ ํ์ผ ์กด์ฌ ์ฌ๋ถ ํ์ธ, dag_id ์ผ์น ํ์ธ, import ์ค๋ฅ ์ฌ๋ถ ํ์ธ
5๏ธโฃ ๋ก์ปฌ ์คํ ๋ฐฉ๋ฒ
# ์์กด์ฑ ์ค์น
pip install "apache-airflow>=2.10.0,<3.0" pytest
# DB ์ด๊ธฐํ
export AIRFLOW_HOME="${PWD}/.airflow"
export AIRFLOW__CORE__DAGS_FOLDER="${PWD}/dags"
export AIRFLOW__CORE__LOAD_EXAMPLES=False
airflow db init
# ํ
์คํธ ์คํ
pytest tests/test_dag_integrity.py -v --tb=short
push ์ ์ ๋ก์ปฌ์์ ๋จผ์ ์คํํ๋ ๊ฒ์ด ๊ถ์ฅ ํ๋ฆ์ด๋ค.
๐งฉ ํ
DagBag์ Airflow Scheduler๊ฐ DAG์ ๋ก๋ํ๋ ๋ฐฉ์๊ณผ ๋์ผํ๋ค. CI์์ ๊ฐ์ ๋ฐฉ์์ผ๋ก ๊ฒ์ฆํ๋ฉด ์ด์๊ณผ์ ์ฐจ์ด๊ฐ ์๋ค.include_examples=False๋ฅผ ๋ฐ๋์ ์ค์ ํด์ผ Airflow ๊ธฐ๋ณธ DAG์ด ํ ์คํธ์ ํฌํจ๋์ง ์๋๋ค.- SLA ๊ฒ์ฆ ๋์ ๋ชฉ๋ก(
EXPECTED_DAG_IDS)์ ์ด์ DAG์ด ์ถ๊ฐ๋ ๋ ํจ๊ป ๊ด๋ฆฌํด์ผ ํ๋ค.
์ค๊ณ ํ๋จ (Why This Way?)
DagBag ๊ธฐ๋ฐ ํ ์คํธ๋ Scheduler์ ๋์ผํ ํ์ฑ ๋ฉ์ปค๋์ฆ์ CI์์ ์ฌํํ์ฌ import ์ค๋ฅ์ ๋ฐํ์ ์ค์ ์ค๋ฅ๋ฅผ ๋ฐฐํฌ ์ ์ ์ก์ต๋๋ค. ํต์ฌ ํ์ดํ๋ผ์ธ e2e_full์ max_active_runs, catchup, rollback task ์กด์ฌ ์ฌ๋ถ๊น์ง ๋ณ๋๋ก ๊ฒ์ฆํฉ๋๋ค.
๋ค์์ ์ฝ์ ๊ธ
โ Observability 1๋จ๊ณ: kube-prometheus-stack + GitOps ๊ตฌ์ถ โ Level 4: Prometheus ์คํ GitOps ๋ฐฐํฌ