-
데이터 오케스트레이션 dagster와 dbt에 대해서 알아보기Review 2024. 7. 9. 23:08
뭔가 비웃는거 같다... dagster
- 데이터 오케스트레이션을 강조하는 스케줄러
- op로 파이프라인의 잡을 정의하며 op로 이어놓은 workflow들은 job으로 구현한다
- 각각 op와 job은 데코레이터로 정의된다
- 하나의 스크립트에 다수의 pipeline을 포함할 수 있으며 이를 통해 여러 workflow를 구현가능하다→하나의 스크립트를 repository라 정의
- 각각의 op는 별도의 input과 output을 가질 수 있으며, op간 변수 상속이 가능하다
- Op: 가장 작은 단위로, 개별 작업을 수행한다.
- Asset: job의 결과물로 생성된 데이터 또는 자료
- Job: 여러 op를 결합하여 특정 작업을 수행한다. job은 op 간의 실행 순서를 정의한다
- op와 asset은 task 단위라는 성격은 같지만 프로세싱하는 주체의 유무와 데펜던시의 업다운스트림 관계에 따라서 명칭이 갈린다.
- Assets은 데이터와 리니지에 초점을 맞추고 운영은 컴퓨팅 작업에 초점을 맞춤
- Assets은 의존성을 인식하는 반면에 op는 그래프에 배치되기 전까지는 의존성을 알지 못함
- Assets은 데이터를 정의하고 관리하는 데 사용되며, op는 컴퓨팅 워크플로우의 단계를 정의하는 데 사용됨
// asset example @asset def logins(website_events: DataFrame) -> DataFrame: return website_events[website_events["type"] == "login"] // op example @op def extract(): ... @op def transform(extracted_data): ... @op def load(transformed_data): ...
아키텍처
- 기본적으로 웹서버와 데몬, 코드 저장소는 항상 떠있고 task 실행시 일시적으로 익스큐터가 동작해서 작업한 뒤 사라지는 형식. airflow 쿠버네티스 잡 오퍼레이터와 유사함
- 웹서버는 레플리카셋을 가질 수 있으나 데몬은 단독 실행만 가능(큐잉 역할을 하기 때문인듯)
- 에어플로우와 유사하지만 코드 저장소를 따로 둔다는 것과 워커가 존재하지 않는다는 차이가 있음→셀러리 익스큐터를 사용하더라도 항상 떠있지는 않는듯
특징
- dbt, snowflake, csv, Fivetran, TensorFlow 등 다양한 에셋을 통해 데이터를 모델링할 수있음→airflow의 오퍼레이터와 유사하지만 좀더 가시적으로 보고 관리할 수 있음
- 리니지 관계를 job 단위가 아니라 글로벌 스코프로 확인할 수 있다.
- 굳이 이것을 위해서 넘어가기에는 큰 피쳐도 아니고 마이그레이션 비용이 더 커보임
기대한 것
- 코드 최소화
- 번거로운 작업 없이나 코드 푸시 없이 웹 콘솔이나 간단하게 작업 가능한지->불가능
- 작업 간 휴먼 에러 최소화
- 코드 최소화하는 이유이기도 하며 테스트코드를 통해서 어느정도 극복 가능함
- 의존성 표현
- dagster는 화면에서 좀더 깔끔하게 사용하는 외부 객체나 의존성들을 표현함
dbt 모델을 어떻게 표현하는가
from dagster_airbyte import build_airbyte_assets from dagster_dbt import load_assets_from_dbt_project from dagster import asset, Definitions from .constants import AIRBYTE_CONNECTION_ID, DBT_PROJECT_DIR airbyte_assets = build_airbyte_assets( connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"], asset_key_prefix=["postgres_replica"], ) dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR) @asset(compute_kind="python") def order_forecast_model(daily_order_summary: pd.DataFrame) -> np.ndarray: train_set = daily_order_summary.to_numpy() xdata, ydata = train_set[:, 0], train_set[:, 2] return optimize.curve_fit(f=model_func, xdata=xdata, ydata=ydata, p0=[10, 100])[0] defs = Definitions( assets=[*airbyte_assets, *dbt_assets, order_forecast_model] )
- 따로 의존성을 코드로 표현하는 것이 아닌 dbt에서 sql의 from절의 변수명을 파싱해서 upstream과 downstream을 표시
{{ config(materialized='table') }} select 1 from {{ ref('coalition_agent_schedule') }}
dbt
- ELT에서 T(transformation)를 담당
- 오픈 소스로 활용할 수 있고, DBT Cloud 같이 유료 버전도 있음
- SQL base의 transformation만 가능하고, 대신 jinja template을 활용해서 다양한 처리(ex. 사용자 정의 함수)를 제공함
- ex) base_orders, base_payments model을 통해 orders라는 모델을 만듦
select orders.id, orders.status, sum(case when payments.payment_method = 'bank_transfer' then payments.amount else 0 end) as bank_transfer_amount, sum(case when payments.payment_method = 'credit_card' then payments.amount else 0 end) as credit_card_amount, sum(case when payments.payment_method = 'gift_card' then payments.amount else 0 end) as gift_card_amount, sum(amount) as total_amount from {{ ref('base_orders') }} as orders left join {{ ref('base_payments') }} as payments on payments.order_id = orders.id
기본 구조
- models: transformation 관련 쿼리를 정의
- macros: 쿼리에 활용될 사용자 정의 함수 등을 정의
- dbt profile: dbt에서 쿼리를 실행할 때 어떤 쿼리엔진 또는 DB를 사용할 것인지 (ex. spark thrift server, postgresql, etc.)
- dbt_project.yml: project에서 사용되는 model에 대한 설정을 정의...할 수 있지만 모델별로 {{ config() }}로 설정할 수 있음
모델 정의
materialized라는 항목으로 모델의 타입을 정의할 수 있음
- table: 실제 테이블로 생성
- view: view로 생성
- incremental: 증분 처리할 경우 사용, is_incremental()과 함께 쓰임
- ephemeral: 임시 테이블(실행할 때만 잠깐 생성됐다가 실행 후 삭제).
문서화 가능
- 간단하게 로컬에서 데이터 거버넌스 관리 가능
dbt docs generate dbt docs serve
요청 자동화 관점
- on_schema_change→컬럼 추가요청
- 모델이 변경할때 대한 설정, dbt의 기본 설정은 ignore. 이 설정을 append_new_columns로 설정하면 스키마가 추가되는 경우에 컬럼이 append될 수 있음→incremental의 경우이고 테이블이 설정되어있으면 전체 적용
- materalized: table→테이블 추가요청
- sql문을 통해서 바로 쿼리 변경없이 추가 가능
- astronomer-cosmos
- dbt 모델을 파싱하면서 의존성을 파악하고 airflow task로 그려줌
- DbtTaskGroup 인스턴스를 사용해 dbt run, dbt test 명령어를 하나의 task group 내에서 실행할 수 있도록 여러 모델을 task group 단위로 묶을 수 있음
- RenderConfig 인스턴스의 select argument를 사용하면 특정 모델만 선택하여 그룹 생성 가능
def test_dag(): event_summary = DbtTaskGroup( group_id="eventSummary", project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig( select=["eventSummary"]), operator_args={ "vars": '{"ds": {{ params.ds }} }', }, default_args={"retries": 2}, ) event_summary
- ExternalTaskSensor
- 단 astronomer-cosmos만으로는 다른 파이프라인과 DBT 모델 간의 의존성을 해결할 수 없음 ExternalTaskSensor를 활용한 당근마켓의 예처럼 ExternalTaskSensor를 활용하면 쉽게 라이브러리로 만들 수 있을 것으로 보임
DBT와 Airflow 도입하며 마주한 7가지 문제들
안녕하세요. 당근 데이터 가치화팀의 Henry예요.
medium.com
- spark나 Trino, duckDB 등 다양한 엔진을 프로세싱에 사용할 수 있음
장점
- dbt docs를 통해서 테이블 명세나 리니지 등을 쉽게 확인하고 작업시에 참고할 수 있음
- 엔지니어의 도움없이 쉽게 칼럼 추가, 테이블 추가 자동화, 테스트 자동화(로컬에서도 가능)
- 여러 모던 아키텍처 도구들과 통합이 용이하다(airbyte, prefect, dagster 등)
단점
- 결국 코드 작성 및 푸시를 해야함(초반 허들 및 교육 필요)
- 애드훅하게 결과를 확인할 수 없기 때문에 간단한 문법 오류나 디버깅하기가 불편함
- 배포를 위해서는 유닛 테스트와 함께 CI/CD 기반이 되어야지 쓸만할듯
결론
- 제로베이스에서면 몰라도 airflow를 운용하고있는데 굳이 dagster로 마이그레이션하거나 추가할 필요성은 없어보임
- cosmos도 많이 발전하긴 했지만 dbt를 운용하고 있다면 의존성 표현과 스케줄러를 위해서 dagster도 괜찮은 선택 같아보임
'Review' 카테고리의 다른 글
if kakao 2024 주키퍼 없이 운영하는 카프카 운영 노하우 세션 정리 (0) 2025.02.11 네이버 DAN24 플링크와 아이스버그를 활용한 데이터 웨어하우스 세션 정리 (1) 2025.01.19 분산 쿼리 엔진 prestoSQL의 바뀐 이름 Trino (2) 2022.01.04 재택근무를 희망하는 히치하이커를 위한 안내서 (0) 2021.10.17 제닉스 XPAM 키보드 팜레스트 리뷰 (0) 2021.03.01