이 글에서 다루는 것
데이터 로딩 → 모델 학습 → 모델 저장의 ML 워크플로우를 DAG로 구성하고, XCom으로 단계별 결과를 전달하는 패턴을 다룹니다.
선수지식
- Airflow 2단계: Python & Bash Operator + XCom — XCom 데이터 전달 패턴
이 단계에서 해결하려는 문제
실제 ML 파이프라인은 데이터 준비 → 모델 학습 → 모델 저장이 순차적으로 연결되어야 합니다. 각 단계의 결과(데이터 경로, 모델 경로)를 다음 단계에 전달해야 하고, 실패 시 어느 단계에서 문제가 생겼는지 추적해야 합니다. 이 글에서는 가상 데이터로 이 흐름을 시뮬레이션합니다.
실습 코드: GitHub (ML_Pipeline)
🧭 실습 전체 흐름 요약
① load_data (가상 데이터 경로 리턴)
② train_model (데이터 경로 받아 학습)
③ save_model (모델 경로 받아 저장 완료)
→ XCom을 통해 단계별 결과 전달
📁 실습 디렉토리
airflow/
└── dags/
└── ml_simulation.py
🧪 DAG 코드 (핵심 부분)
def load_data():
print("데이터 로딩 완료 (가상)")
return {"data_path": "/tmp/fake_data.csv"}
def train_model(**context):
data = context['ti'].xcom_pull(task_ids='load_data')
print(f"데이터 경로: {data['data_path']}")
return {"model_path": "/tmp/fake_model.pkl"}
def save_model(**context):
model = context['ti'].xcom_pull(task_ids='train_model')
print(f"모델 저장 경로: {model['model_path']}")
with DAG(dag_id='ml_simulation', start_date=datetime(2023, 1, 1),
schedule_interval=None, catchup=False) as dag:
t1 = PythonOperator(task_id='load_data', python_callable=load_data)
t2 = PythonOperator(task_id='train_model', python_callable=train_model,
provide_context=True)
t3 = PythonOperator(task_id='save_model', python_callable=save_model,
provide_context=True)
t1 >> t2 >> t3
전체 코드: GitHub (ml_simulation.py)
저장 경로: dags/ml_simulation.py → DAG 활성화 후 실행하면 UI에서 상태 확인 가능
🔍 실행 결과 로그
| Task 이름 | 기대 출력 메시지 |
|---|---|
load_data | 데이터 로딩 완료 |
train_model | 데이터 경로 출력 + 학습 완료 |
save_model | 모델 경로 출력 + 저장 완료 |
설계 판단 (Why This Way?)
load/train/save를 별도 Task로 분리하면 실패 지점만 재실행할 수 있고, XCom에 dict를 전달하여 여러 메타데이터를 구조화하여 넘깁니다.
다음에 읽을 글
→ Airflow 4단계: BashOperator로 외부 Python 학습 스크립트 실행 — 외부 스크립트 호출 패턴