What this post covers

A Feature Store-lite design for the stage before full Feast adoption: it uses GitOps + Airflow to pin down the minimum requirements (contract / metadata / versioning) for feature generation, versioning, and reproducibility.

Prerequisites


The problem this stage solves

A Feature Store succeeds or fails first on operational stability and reproducibility, not on "ML performance".

"The feature.csv we built today" is not enough. Operations only work if the following is always recorded:

  • When it was generated (generated_at)
  • Which schema (contract) it was generated with (schema + schema_hash)
  • Which source it was generated from (source)
  • Which version it was stored as (version)
  • Where the output lives (feature_uri)
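
As a rough illustration of the five facts above, the sketch below models them as a single record. The `FeatureMetadata` class and the `build_metadata` helper are hypothetical names introduced here, not part of the original pipeline, and the example URIs are placeholders.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class FeatureMetadata:
    """Hypothetical record holding the facts a feature version must keep."""
    feature_set: str
    version: str        # which version it was stored as
    generated_at: str   # when it was generated (ISO 8601, UTC)
    schema_hash: str    # which schema (contract) it was generated with
    source: str         # which source it was generated from
    feature_uri: str    # where the output lives


def build_metadata(feature_set, version, schema_hash, source, feature_uri, now=None):
    """Assemble the record; `now` is injectable so the result is testable."""
    now = now or datetime.now(timezone.utc)
    return FeatureMetadata(
        feature_set=feature_set,
        version=version,
        generated_at=now.isoformat(),
        schema_hash=schema_hash,
        source=source,
        feature_uri=feature_uri,
    )


meta = build_metadata(
    "user_features",
    "v1",
    "0" * 64,  # placeholder hash for illustration
    "s3://example-raw/events.csv",
    "s3://example-features/user_features/v1/features.csv",
)
print(json.dumps(asdict(meta), indent=2))
```

Keeping these fields in one immutable record makes it harder to write a feature file without also writing the facts needed to reproduce it.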

When teams consider adopting a Feature Store, the first question is often "Should we start with Feast?" But before any tool, the way features are generated, versioned, and reproduced has to be fixed.

This post covers the stage before full Feast adoption: a build that first fixes the **Feature Store-lite minimum requirements (contract / metadata / versioning / reproducibility)** with GitOps + Airflow.

  • Online Store / Serving integration is intentionally excluded.
  • Instead, it starts with the skeleton that holds up in operations (Contract -> Pipeline -> Versioned Storage).

🎯 What this post achieves (definition of done)

Contract deployed via GitOps + schema-driven feature generation in Airflow + versioned storage in S3 + reproducibility metadata recorded


1️⃣ Overall architecture

mermaid-feature-store-01.png

In this post, the Feature Store-lite scope covers the following:

  • Contract resources deployed via GitOps (schema / metadata template)
  • Airflow pipeline execution
  • Versioned storage in S3 + reproducibility secured through metadata

Online Store / Serving / Feast integration is covered in the next post.


2️⃣ Code/resource tree

(A) GitOps resources

mlops-infra-gitops/
  envs/
    dev/feature-store/feature-store-cm.yaml
    prod/feature-store/feature-store-cm.yaml

(B) ArgoCD Application

mlops-infra/
  apps/
    feature-store-dev.yaml
    feature-store-prod.yaml

(C) Airflow DAG/library

airflow-dags-dev/dags/
  dag_data_pipeline_daily_v4.py       # the DAG only wires tasks together
  mlops_lib/
    dp/
      config.py
      s3.py
      feature_schema.py
      build.py
      store.py
      tasks.py
  .airflowignore                      # blocks DAG scanning of library files

3️⃣ Design points (what matters from an operations view)

(1) Deploy the "contract" via GitOps

The schema and metadata template are treated as standard operational resources, not as code.

  • Managed as a ConfigMap and deployed identically to dev/prod by ArgoCD
  • Airflow is configured to only read the files -> code changes stay minimal

(2) Keep the DAG thin and move logic into a package

The moment a DAG gets thick, maintenance, testing, and reuse break down.

  • Moving the logic into mlops_lib/dp/* means
    • logic can be changed and reused unit by unit
    • testing and refactoring get easier
    • the DAG keeps only the flow

(3) Versioned storage is about "standardizing", not "going deeper"

Fixing the version directory depth at one level is the safest choice in operations.

  • Recommended: s3://<feature_base>/<feature_set>/<version>/
  • Common failure: mixing date / run ID / task ID into the path makes the depth explode -> "which one is the latest" becomes ambiguous
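
A minimal sketch of the one-level rule, under stated assumptions: `version_prefix` and `latest_version` are hypothetical helpers, and `latest_version` only works if version IDs sort lexicographically in time order (for example, fixed-width UTC timestamps).

```python
def version_prefix(feature_base: str, feature_set: str, version: str) -> str:
    """Build <feature_base>/<feature_set>/<version>/ and reject nested parts."""
    for part in (feature_set, version):
        if not part or "/" in part:
            # A slash here would add extra depth below the version level.
            raise ValueError(f"path part must be a single segment: {part!r}")
    return f"{feature_base.rstrip('/')}/{feature_set}/{version}/"


def latest_version(versions):
    """Pick the newest version ID, assuming IDs sort in time order."""
    versions = list(versions)
    return max(versions) if versions else None


print(version_prefix("s3://example-bucket/features/", "user_features", "v20240102T030405Z"))
print(latest_version(["v20240101T000000Z", "v20240102T030405Z"]))
```

With a single version level, "latest" reduces to one listing of the feature set prefix followed by one `max`, which is why extra depth (date, run ID, task ID) makes the question harder to answer.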

4️⃣ Deploying contract resources via GitOps + mounting them in Airflow

The contract resources are split out into a ConfigMap and deployed via GitOps.

apiVersion: v1
kind: ConfigMap
metadata:
  name: feature-store-resources
  namespace: airflow-dev
data:
  user_features.schema.json: |
    {
      "feature_set": "user_features",
      "version": "v1",
      "columns": [
        {"name": "user_id", "type": "int64"},
        {"name": "f_total_events_7d", "type": "int64"}
      ],
      "primary_keys": ["user_id"]
    }

  metadata.json.j2: |
    {
      "feature_set": "{{ feature_set }}",
      "version": "{{ version }}",
      "generated_at": "{{ generated_at }}",
      # ... (rest omitted)
    }

The ConfigMap is mounted identically into every Airflow component, with a standardized mountPath.

  • /opt/airflow/feature-store/
    • user_features.schema.json
    • metadata.json.j2

5️⃣ The core of reproducibility: schema_hash

The schema is the "contract document", and schema_hash is its "signature". Only with this hash can the system prove that a feature was "built with that schema at that time".

# dags/mlops_lib/dp/feature_schema.py
import hashlib
import json

def load_schema(schema_path: str, expected_feature_set: str) -> tuple[dict, str]:
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)

    if schema.get("feature_set") != expected_feature_set:
        raise ValueError(f"schema mismatch: {schema.get('feature_set')} != {expected_feature_set}")

    canonical = json.dumps(schema, ensure_ascii=False, sort_keys=True, separators=(",", ":")).encode("utf-8")
    schema_hash = hashlib.sha256(canonical).hexdigest()
    return schema, schema_hash
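
To see why the canonical serialization matters, the sketch below reuses the same `json.dumps` settings in a standalone `schema_hash` function (a hypothetical name for this illustration) and compares three in-memory schemas: one baseline, one with its keys reordered, and one with a changed column type.

```python
import hashlib
import json


def schema_hash(schema: dict) -> str:
    """Hash a schema using the same canonical JSON form as load_schema."""
    canonical = json.dumps(
        schema, ensure_ascii=False, sort_keys=True, separators=(",", ":")
    ).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


base = {
    "feature_set": "user_features",
    "version": "v1",
    "columns": [{"name": "user_id", "type": "int64"}],
}
# Same content, different key order (including inside the column entry).
reordered = {
    "version": "v1",
    "columns": [{"type": "int64", "name": "user_id"}],
    "feature_set": "user_features",
}
# A real contract change: the column type differs.
changed = {**base, "columns": [{"name": "user_id", "type": "string"}]}

print(schema_hash(base) == schema_hash(reordered))  # key order is ignored
print(schema_hash(base) == schema_hash(changed))    # a type change alters the hash
```

Note that `sort_keys=True` normalizes dictionary keys only. The order of the `columns` list still affects the hash, which fits a contract where column order is meaningful.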

6️⃣ Airflow pipeline flow

Principle: the DAG is responsible only for wiring

# dags/dag_data_pipeline_daily_v4.py (summary)
with DAG(
    dag_id="data_pipeline_daily_dev_v4",
    schedule=None,
    catchup=False,
    max_active_runs=1,
    tags=["data-pipeline", "dev", "mlops"],
    on_failure_callback=alert_slack,
) as dag:
    t1 = PythonOperator(task_id="extract_raw_data", python_callable=task_extract_raw_data)
    t2 = PythonOperator(task_id="validate_data", python_callable=task_validate_data)
    t3 = PythonOperator(task_id="build_features", python_callable=task_build_features)
    t4 = PythonOperator(task_id="store_features", python_callable=task_store_features)
    # ... (rest omitted)

    t1 >> t2 >> t3 >> t4

Execution steps

  • extract_raw_data: checks that the RAW S3 object exists + records the source path in XCom
  • validate_data: minimal validation (guards against empty data)
  • build_features: loads the schema + computes schema_hash + generates the CSV (following the schema's column order)
  • store_features: generates the version, then stores the 3-file set in S3
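
The "minimal validation" and "schema column order" steps above can be sketched roughly as follows. This is not the post's build.py: `rows_to_csv` is a hypothetical helper that assumes rows arrive as a list of dictionaries and that the schema has the `columns` layout shown in the ConfigMap.

```python
import csv
import io


def rows_to_csv(rows, schema):
    """Write rows as CSV with columns in the order the schema declares them."""
    if not rows:
        # Minimal validation: refuse to produce an empty feature file.
        raise ValueError("no rows to write")
    columns = [col["name"] for col in schema["columns"]]
    buf = io.StringIO()
    # extrasaction="raise" rejects keys that are not part of the contract.
    writer = csv.DictWriter(
        buf, fieldnames=columns, extrasaction="raise", lineterminator="\n"
    )
    writer.writeheader()
    for row in rows:
        missing = [name for name in columns if name not in row]
        if missing:
            raise ValueError(f"row is missing schema columns: {missing}")
        writer.writerow(row)
    return buf.getvalue()


schema = {
    "columns": [
        {"name": "user_id", "type": "int64"},
        {"name": "f_total_events_7d", "type": "int64"},
    ]
}
# The row's key order differs from the schema; the output follows the schema.
print(rows_to_csv([{"f_total_events_7d": 3, "user_id": 1}], schema))
```

Driving the header from the schema rather than from the data means a reordered or extended upstream source cannot silently change the file layout.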

7️⃣ S3 storage rule: "one version level + 3-file set"

This is the most important fixed rule in practice.

  • features.csv
  • schema.json
  • metadata.json

# dags/mlops_lib/dp/store.py (core excerpt)
def store_features(feature_base, pipeline_name, feature_set, metadata_tpl_path, ti):
    # One version ID per run, derived from the task instance's execution date.
    ver = _version_id(getattr(ti, "execution_date", None))
    # NOTE: `bkt` and the initial `prefix` are not defined in this excerpt;
    # presumably they are parsed from `feature_base` in the omitted part.
    prefix = prefix.rstrip("/") + f"/{feature_set}/{ver}/"

    feature_uri = f"s3://{bkt}/{prefix}features.csv"

    # The 3-file set always lands under the same version prefix.
    s3.put_object(... Key=f"{prefix}features.csv", Body=features_csv.encode("utf-8"))
    s3.put_object(... Key=f"{prefix}schema.json", Body=json.dumps(schema, ...).encode("utf-8"))
    s3.put_object(... Key=f"{prefix}metadata.json", Body=meta.encode("utf-8"))
    # ... (rest omitted)
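
The excerpt does not show `_version_id`, so the following is only one plausible shape for such a helper, not the original implementation. It assumes a version ID is a fixed-width UTC timestamp, which keeps IDs sortable and limits the path to a single version level. The name `version_id` and the `v%Y%m%dT%H%M%SZ` format are choices made for this illustration.

```python
from datetime import datetime, timedelta, timezone


def version_id(logical_date=None):
    """Turn a run timestamp into a sortable, single-segment version ID."""
    ts = logical_date or datetime.now(timezone.utc)
    if ts.tzinfo is None:
        # Assumption: naive timestamps are already in UTC.
        ts = ts.replace(tzinfo=timezone.utc)
    return ts.astimezone(timezone.utc).strftime("v%Y%m%dT%H%M%SZ")


utc_run = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
kst_run = datetime(2024, 1, 2, 12, 4, 5, tzinfo=timezone(timedelta(hours=9)))
print(version_id(utc_run))                          # v20240102T030405Z
print(version_id(kst_run) == version_id(utc_run))   # same instant, same ID
```

Normalizing to UTC before formatting means the same instant always maps to the same prefix, regardless of the timezone the scheduler reports.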

8️⃣ Verification checklist (operations-oriented)

  • feature-store-dev and feature-store-prod are Synced/Healthy in ArgoCD
  • Files exist inside the Airflow Pod
    • /opt/airflow/feature-store/user_features.schema.json
    • /opt/airflow/feature-store/metadata.json.j2
  • The DAG run succeeds
  • A versioned prefix is created in S3
  • Three files exist under the same prefix: features.csv, schema.json, metadata.json
  • metadata.json fields are populated: schema_hash, feature_uri, generated_at, source
  • The AWS credential chain works in KubernetesExecutor Task Pods
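
The S3-related checks in this list can be partly automated. The sketch below is a hypothetical checker, not part of the original pipeline: it takes an already-fetched list of object keys and the text of metadata.json, so it makes no AWS calls itself.

```python
import json

REQUIRED_FILES = ("features.csv", "schema.json", "metadata.json")
REQUIRED_META_KEYS = ("schema_hash", "feature_uri", "generated_at", "source")


def check_version_prefix(keys, prefix, metadata_text):
    """Return a list of problems found under one version prefix (empty if OK)."""
    problems = []
    present = set(keys)
    for name in REQUIRED_FILES:
        if prefix + name not in present:
            problems.append(f"missing object: {prefix}{name}")
    try:
        meta = json.loads(metadata_text)
    except json.JSONDecodeError as exc:
        problems.append(f"metadata.json is not valid JSON: {exc}")
        return problems
    if not isinstance(meta, dict):
        problems.append("metadata.json is not a JSON object")
        return problems
    for key in REQUIRED_META_KEYS:
        if not meta.get(key):
            problems.append(f"metadata.json has empty or missing field: {key}")
    return problems


prefix = "features/user_features/v20240102T030405Z/"
keys = [prefix + name for name in REQUIRED_FILES]
meta_text = json.dumps({
    "schema_hash": "0" * 64,
    "feature_uri": "s3://example-bucket/" + prefix + "features.csv",
    "generated_at": "2024-01-02T03:04:05+00:00",
    "source": "s3://example-raw/events.csv",
})
print(check_version_prefix(keys, prefix, meta_text))
```

Returning a list of problems instead of raising on the first one lets a single run report everything that is missing from a version.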

9️⃣ Troubleshooting (issues actually hit during this build)

(1) ArgoCD Application apply error: spec.orphanedResources

  • Depending on the CRD version, the field does not exist, so strict decoding fails (upstream Argo CD documents orphanedResources on AppProject rather than Application)
  • Fix: remove the field (or align the spec with the ArgoCD version)

(2) Variable.get(default_var=...) error in the Task Pod

  • Error: unexpected keyword argument 'default_var'
  • Cause: Airflow version / SDK differences
  • Fix: make it safe with exception handling

def _get_var(key: str, default: str) -> str:
    # Avoid the version-dependent default keyword: look the key up plainly
    # and fall back to the default if the lookup fails for any reason.
    try:
        return Variable.get(key)
    except Exception:
        return default

(3) Import errors after splitting out mlops_lib

  • Airflow scans the library files as if they were DAGs, which breaks imports
  • Fix: block the scan with .airflowignore

(4) Difficulty tracing logs with KubernetesExecutor

  • Task Pods come up briefly and then disappear
  • Practical tip: use Airflow UI logs + remote logging (recommended), or follow the Pod quickly with kubectl logs -f

Design rationale (Why This Way?)

Establishing the schema, versioning, and reproducibility contract before adopting Feast means the tool can later simply be layered on top. The schema is split out into a ConfigMap, which enables GitOps deployment and rollback based on ArgoCD sync. Fixing the version directory to a single level simplifies both finding the latest version and introducing a latest pointer.


Read next

→ Feature Store & Feast - Feast: adopting and operating Feast