์ด ๊ธ€์—์„œ ๋‹ค๋ฃจ๋Š” ๊ฒƒ

PythonOperator๋กœ MLflow ์‹คํ—˜์„ ์ž๋™ํ™”ํ•˜์—ฌ, ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐยท๋ฉ”ํŠธ๋ฆญยท๋ชจ๋ธ์„ ์ž๋™ ๊ธฐ๋กํ•˜๋Š” Airflow + MLflow ์—ฐ๋™ ํŒจํ„ด์„ ๋‹ค๋ฃน๋‹ˆ๋‹ค.

์„ ์ˆ˜์ง€์‹


์ด ๋‹จ๊ณ„์—์„œ ํ•ด๊ฒฐํ•˜๋ ค๋Š” ๋ฌธ์ œ

๋ชจ๋ธ์„ ํ•™์Šตํ•  ๋•Œ๋งˆ๋‹ค ํ•˜์ดํผํŒŒ๋ผ๋ฏธํ„ฐ, ์ •ํ™•๋„, ๋ชจ๋ธ ์•„ํ‹ฐํŒฉํŠธ๋ฅผ ์ˆ˜๋™์œผ๋กœ ๊ธฐ๋กํ•˜๋ฉด ์‹คํ—˜ ์žฌํ˜„์ด ๋ถˆ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค. MLflow Tracking์€ ์ด ๋ชจ๋“  ์ •๋ณด๋ฅผ ์ž๋™ ๊ธฐ๋กํ•˜๊ณ , Airflow DAG์—์„œ ํ˜ธ์ถœํ•˜๋ฉด ํ•™์Šต ํŒŒ์ดํ”„๋ผ์ธ ์ž์ฒด๋ฅผ ์ž๋™ํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์‹ค์Šต ์ฝ”๋“œ: GitHub (Airflow_and_MLflow)


๐Ÿงญ ์‹ค์Šต ์ „์ฒด ํ๋ฆ„ ์š”์•ฝ

[1๋‹จ๊ณ„] MLflow ์‹คํ—˜ ์Šคํฌ๋ฆฝํŠธ ์ž‘์„ฑ
[2๋‹จ๊ณ„] Airflow DAG ๊ตฌ์„ฑ
[3๋‹จ๊ณ„] DAG ์‹คํ–‰ ๋ฐ ํŒŒ๋ผ๋ฏธํ„ฐ/๋ฉ”ํŠธ๋ฆญ ํ™•์ธ
[4๋‹จ๊ณ„] ๋ชจ๋ธ ์ €์žฅ ๋ฐ ๋กœ๊น… ์ƒํƒœ ์ ๊ฒ€

๐Ÿ“ ๋””๋ ‰ํ† ๋ฆฌ ๊ตฌ์กฐ

airflow/
โ”œโ”€โ”€ dags/
โ”‚   โ”œโ”€โ”€ train_with_mlflow.py        โ† DAG ํŒŒ์ผ
โ”œโ”€โ”€ ml_code/
โ”‚   โ”œโ”€โ”€ train_mlflow.py             โ† MLflow ์—ฐ๋™ ํ•™์Šต ์Šคํฌ๋ฆฝํŠธ
โ””โ”€โ”€ mlruns/                         โ† MLflow ๋กœ๊น… ๊ฒฐ๊ณผ ์ €์žฅ ํด๋” (์ž๋™ ์ƒ์„ฑ)

๐Ÿงช 1๋‹จ๊ณ„: MLflow ํ•™์Šต ์Šคํฌ๋ฆฝํŠธ (ํ•ต์‹ฌ ๋ถ€๋ถ„)

# airflow/ml_code/train_mlflow.py

def run_experiment():
    mlflow.set_tracking_uri("file:/opt/airflow/mlruns")
    mlflow.set_experiment("airflow_mlflow_example")

    with mlflow.start_run():
        data = load_iris()
        X, y = data.data, data.target

        model = RandomForestClassifier(n_estimators=50, max_depth=3)
        model.fit(X, y)
        preds = model.predict(X)

        # ํ•™์Šต ๋ฐ์ดํ„ฐ๋กœ ํ‰๊ฐ€ โ€” ์‹ค์ œ ์šด์˜์—์„œ๋Š” train_test_split ํ•„์ˆ˜
        acc = accuracy_score(y, preds)

        mlflow.log_param("n_estimators", 50)
        mlflow.log_param("max_depth", 3)
        mlflow.log_metric("accuracy", acc)
        mlflow.sklearn.log_model(model, "model")

์ „์ฒด ์ฝ”๋“œ: GitHub (train_mlflow.py)


๐Ÿงช 2๋‹จ๊ณ„: Airflow DAG ์ž‘์„ฑ

# airflow/dags/train_with_mlflow.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import sys
sys.path.append("/opt/airflow/ml_code")

from train_mlflow import run_experiment

with DAG(
    dag_id='mlflow_tracking_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    run_mlflow = PythonOperator(
        task_id='run_mlflow_training',
        python_callable=run_experiment,
    )

โœ… ์‹คํ–‰ ์ ˆ์ฐจ

  1. train_mlflow.py ์ž‘์„ฑ
  2. train_with_mlflow.py DAG ๋“ฑ๋ก
  3. Airflow UI์—์„œ DAG ์‹คํ–‰
  4. Task ๋กœ๊ทธ์—์„œ ํŒŒ๋ผ๋ฏธํ„ฐ/๋ฉ”ํŠธ๋ฆญ/๋ชจ๋ธ ๊ธฐ๋ก ํ™•์ธ

โ“ ๋ชจ๋“ˆ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด?

  1. docker-compose.yaml์—์„œ ๋ณผ๋ฅจ ํ™•์ธ:

    volumes:
      - ./ml_code:/opt/airflow/ml_code
    
  2. requirements.txt์— MLflow ์ถ”๊ฐ€:

    mlflow
    
  3. Dockerfile ์ž‘์„ฑ ํ›„ ์žฌ๋นŒ๋“œ:

    docker-compose down
    docker-compose build
    docker-compose up -d
    

๐Ÿ” ํ™•์ธ ํฌ์ธํŠธ

ํ•ญ๋ชฉํ™•์ธ ๋ฐฉ๋ฒ•
MLflow ๋กœ๊ทธ/opt/airflow/mlruns/ ๊ฒฝ๋กœ์—์„œ ํ™•์ธ
์ •ํ™•๋„ ๋กœ๊ทธDAG ๋กœ๊ทธ์—์„œ acc = ... ํ™•์ธ
DAG ์ด๋ฆ„mlflow_tracking_dag
Experiment ์ด๋ฆ„airflow_mlflow_example

์„ค๊ณ„ ํŒ๋‹จ (Why This Way?)

MLflow ํ•จ์ˆ˜๋ฅผ ์ง์ ‘ ํ˜ธ์ถœํ•ด์•ผ ํ•˜๋ฏ€๋กœ PythonOperator๊ฐ€ ์ ํ•ฉํ•˜๋ฉฐ, ๋กœ์ปฌ file URI ๋Œ€์‹  ํ”„๋กœ๋•์…˜์—์„œ๋Š” PostgreSQL+S3 ๋ฐฑ์—”๋“œ์˜ MLflow ์„œ๋ฒ„๋ฅผ ์‚ฌ์šฉํ•ด ํŒ€ ๋‹จ์œ„ ์‹คํ—˜ ์ถ”์ ๊ณผ ์•„ํ‹ฐํŒฉํŠธ ๊ด€๋ฆฌ๋ฅผ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.


๋‹ค์Œ์— ์ฝ์„ ๊ธ€

โ†’ MLOps ํ”Œ๋žซํผ ๊ตฌ์ถ• 1๋‹จ๊ณ„: ์ธํ”„๋ผ ์„ค๊ณ„ ๋ฐ ํ™˜๊ฒฝ ์ค€๋น„ โ€” Level 2: Helm ๊ธฐ๋ฐ˜ MLOps ํ”Œ๋žซํผ ๊ตฌ์ถ• ์‹œ์ž‘