์ด ๊ธ์์ ๋ค๋ฃจ๋ ๊ฒ
Feast ํ๋์ ์ด์ ๋จ๊ณ๋ก, GitOps + Airflow๋ฅผ ์ฌ์ฉํด ํผ์ฒ ์์ฑ/๋ฒ์ ํ/์ฌํ์ฑ์ ์ต์ ์๊ฑด(๊ณ์ฝ/๋ฉํ/๋ฒ์ ํ)์ ๊ณ ์ ํ๋ Feature Store-lite ์ค๊ณ
์ ์์ง์
์ด ๋จ๊ณ์์ ํด๊ฒฐํ๋ ค๋ ๋ฌธ์
Feature Store๋ “ML ์ฑ๋ฅ"์ด ์๋๋ผ ์ด์ ์์ ์ฑ/์ฌํ์ฑ์์ ๋จผ์ ๊ฐ๋ฆฝ๋๋ค.
“์ค๋ ๋ง๋ feature.csv"๊ฐ ์๋๋ผ, ์๋๊ฐ ๋ฐ๋์ ๋จ์์ผ ์ด์์ด ๋ฉ๋๋ค.
- ์ธ์ ์์ฑ๋๋์ง (generated_at)
- ์ด๋ค ์คํค๋ง(๊ณ์ฝ)๋ก ์์ฑ๋๋์ง (schema + schema_hash)
- ์ด๋ค ์์ค์์ ์์ฑ๋๋์ง (source)
- ์ด๋ค ๋ฒ์ ์ผ๋ก ์ ์ฅ๋๋์ง (version)
- ๊ฒฐ๊ณผ๋ฌผ์ด ์ด๋ ์๋์ง (feature_uri)
Feature Store ๋์ ์ ๊ณ ๋ฏผํ๋ฉด ํํ “Feast๋ถํฐ ์จ์ผ ํ๋?“๊ฐ ๋จผ์ ๋์ค๋๋ฐ, ๋๊ตฌ๋ณด๋ค ๋จผ์ ํผ์ฒ ์์ฑ/๋ฒ์ ํ/์ฌํ ๋ฐฉ์์ด ๊ณ ์ ๋ผ์ผ ํฉ๋๋ค.
์ด ๊ธ์ Feast ํ๋์ ์ด์ ๋จ๊ณ๋ก, **Feature Store-lite ์ต์ ์๊ฑด(๊ณ์ฝ/๋ฉํ/๋ฒ์ ํ/์ฌํ์ฑ)**์ GitOps + Airflow๋ก ๋จผ์ ๊ณ ์ ํ๋ ๊ตฌ์ถ์ ๋๋ค.
- Online Store / Serving ์ฐ๋์ ์๋์ ์ผ๋ก ์ ์ธํ์ต๋๋ค.
- ๋์ ์ด์์์ ํตํ๋ ๋ผ๋(Contract -> Pipeline -> Versioned Storage)๋ถํฐ ๋ง๋ญ๋๋ค.
๐ฏ ์ด ๊ธ์์ ๋ฌ์ฑํ๋ ๊ฒ (์๋ฃ ๊ธฐ์ค)
GitOps๋ก ๊ณ์ฝ ๋ฐฐํฌ + Airflow๋ก ์คํค๋ง ๊ธฐ๋ฐ ํผ์ฒ ์์ฑ + S3 ๋ฒ์ ํ ์ ์ฅ + ์ฌํ์ฑ ๋ฉํ ๋จ๊น
1๏ธโฃ ์ ์ฒด ๊ตฌ์กฐ

์ด ๊ธ์์์ Feature Store-lite ๋ฒ์๋ ์๋๊น์ง์ ๋๋ค.
- GitOps๋ก ๋ฐฐํฌ๋ ๊ณ์ฝ ๋ฆฌ์์ค(schema/metadata template)
- Airflow ํ์ดํ๋ผ์ธ ์คํ
- S3์ ๋ฒ์ ํ ์ ์ฅ + ๋ฉํ๋ฐ์ดํฐ๋ก ์ฌํ์ฑ ํ๋ณด
Online Store / Serving / Feast ์ฐ๋์ ๋ค์ ๊ธ์์ ๋ค๋ฃน๋๋ค.
2๏ธโฃ ์ฝ๋/๋ฆฌ์์ค ํธ๋ฆฌ
(A) GitOps ๋ฆฌ์์ค
mlops-infra-gitops/
envs/
dev/feature-store/feature-store-cm.yaml
prod/feature-store/feature-store-cm.yaml
(B) ArgoCD Application
mlops-infra/
apps/
feature-store-dev.yaml
feature-store-prod.yaml
(C) Airflow DAG/๋ผ์ด๋ธ๋ฌ๋ฆฌ
airflow-dags-dev/dags/
dag_data_pipeline_daily_v4.py # DAG๋ ์ฐ๊ฒฐ๋ง
mlops_lib/
dp/
config.py
s3.py
feature_schema.py
build.py
store.py
tasks.py
.airflowignore # ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ค์บ ์ฐจ๋จ
3๏ธโฃ ์ค๊ณ ํฌ์ธํธ (์ด์ ๊ด์ ํต์ฌ)
(1) “๊ณ์ฝ(Contract)“์ GitOps๋ก ๋ฐฐํฌํ๋ค
์คํค๋ง/๋ฉํ ํ ํ๋ฆฟ์ ์ฝ๋๊ฐ ์๋๋ผ ์ด์ ํ์ค ๋ฆฌ์์ค๋ก ๋ค๋ค์ต๋๋ค.
- ConfigMap์ผ๋ก ๊ด๋ฆฌํ๊ณ ArgoCD๋ก dev/prod์ ๋์ผ ๋ฐฐํฌ
- Airflow๋ ํ์ผ์ ์ฝ๊ธฐ๋ง ํ๋๋ก ๊ตฌ์ฑ -> ์ฝ๋ ๋ณ๊ฒฝ ์ต์ํ
(2) DAG๋ ์๊ฒ, ๋ก์ง์ ํจํค์ง๋ก ๋ถ๋ฆฌํ๋ค
DAG๊ฐ ๋๊บผ์์ง๋ ์๊ฐ ์ ์ง๋ณด์/ํ ์คํธ/์ฌ์ฌ์ฉ์ด ๋ฐ๋ก ๊นจ์ง๋๋ค.
- ๋ก์ง์
mlops_lib/dp/*๋ก ๋ถ๋ฆฌํ๋ฉด- ๋ก์ง ๋จ์ ์์ /์ฌ์ฌ์ฉ ๊ฐ๋ฅ
- ํ ์คํธ/๋ฆฌํฉํฐ๋ง์ด ์ฌ์
- DAG๋ ํ๋ฆ๋ง ์ ์ง
(3) ๋ฒ์ ํ ์ ์ฅ์ “๊น์ด์ง๋ ๊ฒ"์ด ์๋๋ผ “ํ์คํ"๋ค
๋ฒ์ ๋๋ ํ ๋ฆฌ depth๋ 1๋จ ๊ณ ์ ์ด ์ด์์์ ๊ฐ์ฅ ์์ ํฉ๋๋ค.
- ๊ถ์ฅ:
s3://<feature_base>/<feature_set>/<version>/ - ํํ ์คํจ: ๋ ์ง/์คํID/ํ์คํฌID๋ฅผ ์์ด depth ํญ๋ฐ -> “์ต์ ์ด ๋ฌด์์ธ์ง"๊ฐ ์ ๋งคํด์ง
4๏ธโฃ Contract ๋ฆฌ์์ค GitOps ๋ฐฐํฌ + Airflow ๋ง์ดํธ
ConfigMap์ผ๋ก ๊ณ์ฝ ๋ฆฌ์์ค๋ฅผ ๋ถ๋ฆฌํด GitOps๋ก ๋ฐฐํฌํฉ๋๋ค.
apiVersion: v1
kind: ConfigMap
metadata:
name: feature-store-resources
namespace: airflow-dev
data:
user_features.schema.json: |
{
"feature_set": "user_features",
"version": "v1",
"columns": [
{"name": "user_id", "type": "int64"},
{"name": "f_total_events_7d", "type": "int64"}
],
"primary_keys": ["user_id"]
}
metadata.json.j2: |
{
"feature_set": "{{ feature_set }}",
"version": "{{ version }}",
"generated_at": "{{ generated_at }}",
# ... (์ดํ ์๋ต)
}
Airflow ๊ตฌ์ฑ์์์ ๋์ผํ๊ฒ ๋ง์ดํธํ๊ณ , mountPath๋ฅผ ํ์คํํฉ๋๋ค.
/opt/airflow/feature-store/user_features.schema.jsonmetadata.json.j2
5๏ธโฃ ์ฌํ์ฑ์ ํต์ฌ: schema_hash
์คํค๋ง๋ “๊ณ์ฝ์"์ด๊ณ , schema_hash๋ “์๋ช
"์
๋๋ค.
์ด ํด์๊ฐ ์์ด์ผ “๊ทธ๋ ๊ทธ ์คํค๋ง๋ก ๋ง๋ ํผ์ฒ"๋ฅผ ์์คํ
์ ์ผ๋ก ์ฆ๋ช
ํ ์ ์์ต๋๋ค.
# dags/mlops_lib/dp/feature_schema.py
import json, hashlib
def load_schema(schema_path: str, expected_feature_set: str) -> tuple[dict, str]:
with open(schema_path, "r", encoding="utf-8") as f:
schema = json.load(f)
if schema.get("feature_set") != expected_feature_set:
raise ValueError(f"schema mismatch: {schema.get('feature_set')} != {expected_feature_set}")
canonical = json.dumps(schema, ensure_ascii=False, sort_keys=True, separators=(",", ":")).encode("utf-8")
schema_hash = hashlib.sha256(canonical).hexdigest()
return schema, schema_hash
6๏ธโฃ Airflow ํ์ดํ๋ผ์ธ ํ๋ฆ
์์น: DAG๋ ์ฐ๊ฒฐ๋ง ๋ด๋นํ๋ค
# dags/dag_data_pipeline_daily_v4.py (์์ฝ)
with DAG(
dag_id="data_pipeline_daily_dev_v4",
schedule=None,
catchup=False,
max_active_runs=1,
tags=["data-pipeline", "dev", "mlops"],
on_failure_callback=alert_slack,
) as dag:
t1 = PythonOperator(task_id="extract_raw_data", python_callable=task_extract_raw_data)
t2 = PythonOperator(task_id="validate_data", python_callable=task_validate_data)
t3 = PythonOperator(task_id="build_features", python_callable=task_build_features)
t4 = PythonOperator(task_id="store_features", python_callable=task_store_features)
# ... (์ดํ ์๋ต)
t1 >> t2 >> t3 >> t4
์คํ ๋จ๊ณ
- extract_raw_data: RAW S3 ๊ฐ์ฒด ์กด์ฌ ํ์ธ + source ๊ฒฝ๋ก XCom ๊ธฐ๋ก
- validate_data: ์ต์ ๊ฒ์ฆ(๋น ๋ฐ์ดํฐ ๋ฐฉ์ง)
- build_features: schema ๋ก๋ + schema_hash ์์ฑ + CSV ์์ฑ(์คํค๋ง ์์ ์ค์)
- store_features: ๋ฒ์ ์์ฑ ํ S3์ 3ํ์ผ ์ธํธ ์ ์ฅ
7๏ธโฃ S3 ์ ์ฅ ๊ท์น: “๋ฒ์ 1๋จ + 3ํ์ผ ์ธํธ”
์ค๋ฌด์์ ๊ฐ์ฅ ์ค์ํ ๊ณ ์ ๊ท์น์ ๋๋ค.
features.csvschema.jsonmetadata.json
# dags/mlops_lib/dp/store.py (ํต์ฌ)
def store_features(feature_base, pipeline_name, feature_set, metadata_tpl_path, ti):
ver = _version_id(getattr(ti, "execution_date", None))
prefix = prefix.rstrip("/") + f"/{feature_set}/{ver}/"
feature_uri = f"s3://{bkt}/{prefix}features.csv"
s3.put_object(... Key=f"{prefix}features.csv", Body=features_csv.encode("utf-8"))
s3.put_object(... Key=f"{prefix}schema.json", Body=json.dumps(schema, ...).encode("utf-8"))
s3.put_object(... Key=f"{prefix}metadata.json", Body=meta.encode("utf-8"))
# ... (์ดํ ์๋ต)
8๏ธโฃ ๊ฒ์ฆ ์ฒดํฌ๋ฆฌ์คํธ (์ด์ํ)
- ArgoCD์์
feature-store-dev,feature-store-prodSynced/Healthy - Airflow Pod ๋ด๋ถ ํ์ผ ์กด์ฌ ํ์ธ
/opt/airflow/feature-store/user_features.schema.json/opt/airflow/feature-store/metadata.json.j2
- DAG ์คํ ์ฑ๊ณต
- S3์ ๋ฒ์ ํ prefix ์์ฑ ํ์ธ
- ๋์ผ prefix์ 3๊ฐ ํ์ผ ์กด์ฌ:
features.csv,schema.json,metadata.json - metadata.json ๊ฐ ์ฑ์์ง:
schema_hash,feature_uri,generated_at,source - KubernetesExecutor Task Pod์์ AWS credential chain ์ ์ ๋์
9๏ธโฃ ํธ๋ฌ๋ธ์ํ (์ด๋ฒ ๊ตฌ์ถ์์ ์ค์ ๋ก ๊ฑธ๋ฆฐ ์ง์ )
(1) ArgoCD Application apply ์๋ฌ: spec.orphanedResources
- CRD ๋ฒ์ ์ ๋ฐ๋ผ ํ๋๊ฐ ์์ด์ strict decode ์คํจ
- ํด๊ฒฐ: ํด๋น ํ๋ ์ ๊ฑฐ(๋๋ ArgoCD ๋ฒ์ ๊ณผ ์คํ ์ ํฉ ๋ง์ถ๊ธฐ)
(2) Task Pod์์ Variable.get(default_var=...) ์๋ฌ
- ์๋ฌ:
unexpected keyword argument 'default_var' - ์์ธ: Airflow ๋ฒ์ /SDK ์ฐจ์ด
- ํด๊ฒฐ: ์์ธ ์ฒ๋ฆฌ๋ก ์์ ํ
def _get_var(key: str, default: str) -> str:
try:
return Variable.get(key)
except Exception:
return default
(3) mlops_lib ๋ถ๋ฆฌ ํ import ์๋ฌ
- Airflow๊ฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ ํ์ผ๊น์ง DAG์ฒ๋ผ ์ค์บํ๋ฉด์ import๊ฐ ๊นจ์ง
- ํด๊ฒฐ:
.airflowignore๋ก ์ค์บ ์ฐจ๋จ
(4) KubernetesExecutor ๋ก๊ทธ ์ถ์ ๋์ด๋
- Task Pod๊ฐ ์งง๊ฒ ์ฌ๋ผ์๋ค๊ฐ ์ฌ๋ผ์ง
- ์ค๋ฌด ํ: Airflow UI ๋ก๊ทธ + remote logging(๊ถ์ฅ), ๋๋ ๋น ๋ฅด๊ฒ
kubectl logs -f๋ก ์ถ์
์ค๊ณ ํ๋จ (Why This Way?)
Feast ๋์ ์ ์ ์คํค๋งยท๋ฒ์ ํยท์ฌํ์ฑ ๊ณ์ฝ์ ๋จผ์ ํ๋ฆฝํด์ผ ๋๊ตฌ ์์ ์น๊ธฐ๋ง ํ๋ฉด ๋๊ณ , ์คํค๋ง๋ ConfigMap์ผ๋ก ๋ถ๋ฆฌํ์ฌ ArgoCD sync ๊ธฐ๋ฐ GitOps ๋ฐฐํฌ์ ๋กค๋ฐฑ์ ๊ฐ๋ฅํ๊ฒ ํ์ต๋๋ค. ๋ฒ์ ๋๋ ํ ๋ฆฌ๋ฅผ 1๋จ ๊ตฌ์กฐ๋ก ๊ณ ์ ํ์ฌ ์ต์ ๋ฒ์ ํ์๊ณผ latest ํฌ์ธํฐ ๋์ ์ ๋จ์ํํ์ต๋๋ค.
๋ค์์ ์ฝ์ ๊ธ
โ Feature Store & Feast - Feast โ Feast ๋์ ๊ณผ ์ด์