-
데이터 오케스트레이션 dagster와 dbt에 대해서 알아보기Review 2024. 7. 9. 23:08
dagster
- 데이터 오케스트레이션을 강조하는 스케줄러
- op로 파이프라인의 잡을 정의하며 op로 이어놓은 workflow들은 job으로 구현한다
- 각각 op와 job은 데코레이터로 정의된다
- 하나의 스크립트에 다수의 pipeline을 포함할 수 있으며 이를 통해 여러 workflow를 구현가능하다→하나의 스크립트를 repository라 정의
- 각각의 op는 별도의 input과 output을 가질 수 있으며, op간 변수 상속이 가능하다
- Op: 가장 작은 단위로, 개별 작업을 수행한다.
- Asset: job의 결과물로 생성된 데이터 또는 자료
- Job: 여러 op를 결합하여 특정 작업을 수행한다. job은 op 간의 실행 순서를 정의한다
- op와 asset은 task 단위라는 성격은 같지만 프로세싱하는 주체의 유무와 데펜던시의 업다운스트림 관계에 따라서 명칭이 갈린다.
- Assets은 데이터와 리니지에 초점을 맞추고 운영은 컴퓨팅 작업에 초점을 맞춤
- Assets은 의존성을 인식하는 반면에 op는 그래프에 배치되기 전까지는 의존성을 알지 못함
- Assets은 데이터를 정의하고 관리하는 데 사용되며, op는 컴퓨팅 워크플로우의 단계를 정의하는 데 사용됨
// asset example @asset def logins(website_events: DataFrame) -> DataFrame: return website_events[website_events["type"] == "login"] // op example @op def extract(): ... @op def transform(extracted_data): ... @op def load(transformed_data): ...
아키텍처
- 기본적으로 웹서버와 데몬, 코드 저장소는 항상 떠있고 task 실행시 일시적으로 익스큐터가 동작해서 작업한 뒤 사라지는 형식. airflow 쿠버네티스 잡 오퍼레이터와 유사함
- 웹서버는 레플리카셋을 가질 수 있으나 데몬은 단독 실행만 가능(큐잉 역할을 하기 때문인듯)
- 에어플로우와 유사하지만 코드 저장소를 따로 둔다는 것과 워커가 존재하지 않는다는 차이가 있음→셀러리 익스큐터를 사용하더라도 항상 떠있지는 않는듯
특징
- dbt, snowflake, csv, Fivetran, TensorFlow 등 다양한 에셋을 통해 데이터를 모델링할 수있음→airflow의 오퍼레이터와 유사하지만 좀더 가시적으로 보고 관리할 수 있음
- 리니지 관계를 job 단위가 아니라 글로벌 스코프로 확인할 수 있다.
- 굳이 이것을 위해서 넘어가기에는 큰 피쳐도 아니고 마이그레이션 비용이 더 커보임
기대한 것
- 코드 최소화
- 번거로운 작업 없이나 코드 푸시 없이 웹 콘솔이나 간단하게 작업 가능한지->불가능
- 작업 간 휴먼 에러 최소화
- 코드 최소화하는 이유이기도 하며 테스트코드를 통해서 어느정도 극복 가능함
- 의존성 표현
- dagster는 화면에서 좀더 깔끔하게 사용하는 외부 객체나 의존성들을 표현함
dbt 모델을 어떻게 표현하는가
from dagster_airbyte import build_airbyte_assets from dagster_dbt import load_assets_from_dbt_project from dagster import asset, Definitions from .constants import AIRBYTE_CONNECTION_ID, DBT_PROJECT_DIR airbyte_assets = build_airbyte_assets( connection_id=AIRBYTE_CONNECTION_ID, destination_tables=["orders", "users"], asset_key_prefix=["postgres_replica"], ) dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR) @asset(compute_kind="python") def order_forecast_model(daily_order_summary: pd.DataFrame) -> np.ndarray: train_set = daily_order_summary.to_numpy() xdata, ydata = train_set[:, 0], train_set[:, 2] return optimize.curve_fit(f=model_func, xdata=xdata, ydata=ydata, p0=[10, 100])[0] defs = Definitions( assets=[*airbyte_assets, *dbt_assets, order_forecast_model] )
- 따로 의존성을 코드로 표현하는 것이 아닌 dbt에서 sql의 from절의 변수명을 파싱해서 upstream과 downstream을 표시
{{ config(materialized='table') }} select 1 from {{ ref('coalition_agent_schedule') }}
dbt
- ELT에서 T(transformation)를 담당
- 오픈 소스로 활용할 수 있고, DBT Cloud 같이 유료 버전도 있음
- SQL base의 transformation만 가능하고, 대신 jinja template을 활용해서 다양한 처리(ex. 사용자 정의 함수)를 제공함
- ex) base_orders, base_payments model을 통해 orders라는 모델을 만듦
select orders.id, orders.status, sum(case when payments.payment_method = 'bank_transfer' then payments.amount else 0 end) as bank_transfer_amount, sum(case when payments.payment_method = 'credit_card' then payments.amount else 0 end) as credit_card_amount, sum(case when payments.payment_method = 'gift_card' then payments.amount else 0 end) as gift_card_amount, sum(amount) as total_amount from {{ ref('base_orders') }} as orders left join {{ ref('base_payments') }} as payments on payments.order_id = orders.id
기본 구조
- models: transformation 관련 쿼리를 정의
- macros: 쿼리에 활용될 사용자 정의 함수 등을 정의
- dbt profile: dbt에서 쿼리를 실행할 때 어떤 쿼리엔진 또는 DB를 사용할 것인지 (ex. spark thrift server, postgresql, etc.)
- dbt_project.yml: project에서 사용되는 model에 대한 설정을 정의...할 수 있지만 모델별로 {{ config() }}로 설정할 수 있음
모델 정의
materialized라는 항목으로 모델의 타입을 정의할 수 있음
- table: 실제 테이블로 생성
- view: view로 생성
- incremental: 증분 처리할 경우 사용, is_incremental()과 함께 쓰임
- ephemeral: 임시 테이블(실행할 때만 잠깐 생성됐다가 실행 후 삭제).
문서화 가능
- 간단하게 로컬에서 데이터 거버넌스 관리 가능
dbt docs generate dbt docs serve
요청 자동화 관점
- on_schema_change→컬럼 추가요청
- 모델이 변경할때 대한 설정, dbt의 기본 설정은 ignore. 이 설정을 append_new_columns로 설정하면 스키마가 추가되는 경우에 컬럼이 append될 수 있음→incremental의 경우이고 테이블이 설정되어있으면 전체 적용
- materalized: table→테이블 추가요청
- sql문을 통해서 바로 쿼리 변경없이 추가 가능
- astronomer-cosmos
- dbt 모델을 파싱하면서 의존성을 파악하고 airflow task로 그려줌
- DbtTaskGroup 인스턴스를 사용해 dbt run, dbt test 명령어를 하나의 task group 내에서 실행할 수 있도록 여러 모델을 task group 단위로 묶을 수 있음
- RenderConfig 인스턴스의 select argument를 사용하면 특정 모델만 선택하여 그룹 생성 가능
def test_dag(): event_summary = DbtTaskGroup( group_id="eventSummary", project_config=ProjectConfig(DBT_PROJECT_PATH), profile_config=profile_config, render_config=RenderConfig( select=["eventSummary"]), operator_args={ "vars": '{"ds": {{ params.ds }} }', }, default_args={"retries": 2}, ) event_summary
- ExternalTaskSensor
- 단 astronomer-cosmos만으로는 다른 파이프라인과 DBT 모델 간의 의존성을 해결할 수 없음 ExternalTaskSensor를 활용한 당근마켓의 예처럼 ExternalTaskSensor와 테이블 디펜던시 테이블을 활용하면 쉽게 라이브러리로 만들 수 있을 것으로 보임
- spark나 Trino, duckDB 등 다양한 엔진을 프로세싱에 사용할 수 있음
장점
- dbt docs를 통해서 테이블 명세나 리니지 등을 쉽게 확인하고 작업시에 참고할 수 있음
- 엔지니어의 도움없이 쉽게 칼럼 추가, 테이블 추가 자동화, 테스트 자동화(로컬에서도 가능)
- 여러 모던 아키텍처 도구들과 통합이 용이하다(airbyte, prefect, dagster 등)
단점
- 결국 코드 작성 및 푸시를 해야함(초반 허들 및 교육 필요)
- 애드훅하게 결과를 확인할 수 없기 때문에 간단한 문법 오류나 디버깅하기가 불편함
- 배포를 위해서는 유닛 테스트와 함께 CI/CD 기반이 되어야지 쓸만할듯
결론
- 제로베이스에서면 몰라도 airflow를 운용하고있는데 굳이 dagster로 마이그레이션하거나 추가할 필요성은 없어보임
- cosmos도 많이 발전하긴 했지만 dbt를 운용하고 있다면 의존성 표현과 스케줄러를 위해서 dagster도 괜찮은 선택 같아보임
'Review' 카테고리의 다른 글
분산 쿼리 엔진 prestoSQL의 바뀐 이름 Trino (2) 2022.01.04 재택근무를 희망하는 히치하이커를 위한 안내서 (0) 2021.10.17 제닉스 XPAM 키보드 팜레스트 리뷰 (0) 2021.03.01 맥북, 노트북 거치대 추천 (0) 2020.09.27