이 글에서 다루는 것
PythonOperator와 BashOperator를 하나의 DAG에 조합하고, XCom으로 Task 간 데이터를 전달하는 패턴을 다룹니다.
선수지식
- Airflow 1단계: 로컬 환경에서 기본 DAG 실행 — DAG 구조와 실행 환경
이 단계에서 해결하려는 문제
실제 파이프라인에서는 여러 Task가 순서대로 실행되면서 결과를 다음 Task에 넘겨야 합니다. 예를 들어 학습 Task가 모델 경로를 생성하면 배포 Task가 그 경로를 받아야 합니다. Airflow의 XCom은 이 Task 간 소규모 데이터 전달을 담당합니다.
실습 코드: GitHub (PythonOperator_and_XCom)
🧭 실습 전체 흐름 요약
① DAG 생성: PythonOperator + BashOperator 조합
② XCom으로 태스크 간 메시지 전달
③ 로그로 전달 메시지 확인
④ 전체 DAG 실행 및 의존성 확인
📁 DAG 파일 구조
airflow/
├── dags/
│ └── python_bash_xcom.py ← 여기 저장
└── docker-compose.yaml
🧱 DAG 코드 (핵심 부분)
def generate_message():
return "Hello from PythonOperator!"
def print_xcom_message(**context):
msg = context['ti'].xcom_pull(task_ids='generate_task')
print(f"XCom received message: {msg}")
with DAG(dag_id='python_bash_xcom', start_date=datetime(2023, 1, 1),
schedule_interval=None, catchup=False) as dag:
generate_task = PythonOperator(
task_id='generate_task', python_callable=generate_message)
consume_task = PythonOperator(
task_id='consume_task', python_callable=print_xcom_message,
provide_context=True)
bash_task = BashOperator(
task_id='bash_echo', bash_command="echo 'Bash task is running!'")
generate_task >> consume_task >> bash_task
전체 코드: GitHub (python_bash_xcom.py)
🧪 실행 방법
docker-compose up -d # Airflow 실행 중인지 확인
- 브라우저 접속:
http://localhost:8080 - DAG 목록 →
python_bash_xcomON - Trigger DAG 실행
- 각 Task 클릭 → Log 탭에서 실행 결과 확인
📊 결과 확인 포인트
| Task | 로그에서 확인 내용 |
|---|---|
| generate_task | "Hello from PythonOperator!" 메시지 리턴 |
| consume_task | XCom received message: 출력 확인 |
| bash_task | 'Bash task is running!' 로그 확인 |
설계 판단 (Why This Way?)
XCom은 경로나 ID 같은 소규모 메타데이터 전달용이며 대용량 데이터는 S3/DB에 저장하고 경로만 전달하는 것이 패턴입니다. PythonOperator가 실무에서 주로 쓰이며, provide_context를 통해 TaskInstance에 접근합니다.
다음에 읽을 글
→ Airflow 3단계: ML 파이프라인 DAG 구성 — 학습 파이프라인을 DAG로 표현