이 글에서 다루는 것

PythonOperator와 BashOperator를 하나의 DAG에 조합하고, XCom으로 Task 간 데이터를 전달하는 패턴을 다룹니다.

선수지식


이 단계에서 해결하려는 문제

실제 파이프라인에서는 여러 Task가 순서대로 실행되면서 결과를 다음 Task에 넘겨야 합니다. 예를 들어 학습 Task가 모델 경로를 생성하면 배포 Task가 그 경로를 받아야 합니다. Airflow의 XCom은 이 Task 간 소규모 데이터 전달을 담당합니다.

실습 코드: GitHub (PythonOperator_and_XCom)


🧭 실습 전체 흐름 요약

① DAG 생성: PythonOperator + BashOperator 조합
② XCom으로 태스크 간 메시지 전달
③ 로그로 전달 메시지 확인
④ 전체 DAG 실행 및 의존성 확인

📁 DAG 파일 구조

airflow/
├── dags/
│   └── python_bash_xcom.py  ← 여기 저장
└── docker-compose.yaml

🧱 DAG 코드 (핵심 부분)

def generate_message():
    return "Hello from PythonOperator!"

def print_xcom_message(**context):
    msg = context['ti'].xcom_pull(task_ids='generate_task')
    print(f"XCom received message: {msg}")

with DAG(dag_id='python_bash_xcom', start_date=datetime(2023, 1, 1),
         schedule_interval=None, catchup=False) as dag:

    generate_task = PythonOperator(
        task_id='generate_task', python_callable=generate_message)

    consume_task = PythonOperator(
        task_id='consume_task', python_callable=print_xcom_message,
        provide_context=True)

    bash_task = BashOperator(
        task_id='bash_echo', bash_command="echo 'Bash task is running!'")

    generate_task >> consume_task >> bash_task

전체 코드: GitHub (python_bash_xcom.py)


🧪 실행 방법

docker-compose up -d  # Airflow 실행 중인지 확인
  1. 브라우저 접속: http://localhost:8080
  2. DAG 목록 → python_bash_xcom ON
  3. Trigger DAG 실행
  4. 각 Task 클릭 → Log 탭에서 실행 결과 확인

📊 결과 확인 포인트

Task로그에서 확인 내용
generate_task"Hello from PythonOperator!" 메시지 리턴
consume_taskXCom received message: 출력 확인
bash_task'Bash task is running!' 로그 확인

설계 판단 (Why This Way?)

XCom은 경로나 ID 같은 소규모 메타데이터 전달용이며 대용량 데이터는 S3/DB에 저장하고 경로만 전달하는 것이 패턴입니다. PythonOperator가 실무에서 주로 쓰이며, provide_context를 통해 TaskInstance에 접근합니다.


다음에 읽을 글

Airflow 3단계: ML 파이프라인 DAG 구성 — 학습 파이프라인을 DAG로 표현