ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 데이터 오케스트레이션 dagster와 dbt에 대해서 알아보기
    Review 2024. 7. 9. 23:08

    뭔가 비웃는거 같다...

    dagster

    • 데이터 오케스트레이션을 강조하는 스케줄러
    • op로 파이프라인의 잡을 정의하며 op로 이어놓은 workflow들은 job으로 구현한다
    • 각각 op와 job은 데코레이터로 정의된다
    • 하나의 스크립트에 다수의 pipeline을 포함할 수 있으며 이를 통해 여러 workflow를 구현가능하다→하나의 스크립트를 repository라 정의
    • 각각의 op는 별도의 input과 output을 가질 수 있으며, op간 변수 상속이 가능하다
    • Op: 가장 작은 단위로, 개별 작업을 수행한다.
    • Asset: job의 결과물로 생성된 데이터 또는 자료
    • Job: 여러 op를 결합하여 특정 작업을 수행한다. job은 op 간의 실행 순서를 정의한다
    • op와 asset은 task 단위라는 성격은 같지만 프로세싱하는 주체의 유무와 데펜던시의 업다운스트림 관계에 따라서 명칭이 갈린다.
      • Assets은 데이터와 리니지에 초점을 맞추고 운영은 컴퓨팅 작업에 초점을 맞춤
      • Assets은 의존성을 인식하는 반면에 op는 그래프에 배치되기 전까지는 의존성을 알지 못함
      • Assets은 데이터를 정의하고 관리하는 데 사용되며, op는 컴퓨팅 워크플로우의 단계를 정의하는 데 사용됨
    // asset example
    @asset
    def logins(website_events: DataFrame) -> DataFrame:
       return website_events[website_events["type"] == "login"]
    
    // op example
    @op
    def extract():
        ...
    
    @op
    def transform(extracted_data):
        ...
    
    @op
    def load(transformed_data):
        ...

    아키텍처

    • 기본적으로 웹서버와 데몬, 코드 저장소는 항상 떠있고 task 실행시 일시적으로 익스큐터가 동작해서 작업한 뒤 사라지는 형식. airflow 쿠버네티스 잡 오퍼레이터와 유사함
      • 웹서버는 레플리카셋을 가질 수 있으나 데몬은 단독 실행만 가능(큐잉 역할을 하기 때문인듯)
    • 에어플로우와 유사하지만 코드 저장소를 따로 둔다는 것과 워커가 존재하지 않는다는 차이가 있음→셀러리 익스큐터를 사용하더라도 항상 떠있지는 않는듯

    특징

    • dbt, snowflake, csv, Fivetran, TensorFlow 등 다양한 에셋을 통해 데이터를 모델링할 수있음→airflow의 오퍼레이터와 유사하지만 좀더 가시적으로 보고 관리할 수 있음

    • 리니지 관계를 job 단위가 아니라 글로벌 스코프로 확인할 수 있다.
    • 굳이 이것을 위해서 넘어가기에는 큰 피쳐도 아니고 마이그레이션 비용이 더 커보임

    기대한 것

    1. 코드 최소화
      • 번거로운 작업 없이나 코드 푸시 없이 웹 콘솔이나 간단하게 작업 가능한지->불가능
    2. 작업 간 휴먼 에러 최소화
      • 코드 최소화하는 이유이기도 하며 테스트코드를 통해서 어느정도 극복 가능함
    3. 의존성 표현
      • dagster는 화면에서 좀더 깔끔하게 사용하는 외부 객체나 의존성들을 표현함

    dbt 모델을 어떻게 표현하는가

    from dagster_airbyte import build_airbyte_assets
    from dagster_dbt import load_assets_from_dbt_project
    from dagster import asset, Definitions
    from .constants import AIRBYTE_CONNECTION_ID, DBT_PROJECT_DIR
    
    airbyte_assets = build_airbyte_assets(
        connection_id=AIRBYTE_CONNECTION_ID,
        destination_tables=["orders", "users"],
        asset_key_prefix=["postgres_replica"],
    )
    
    dbt_assets = load_assets_from_dbt_project(project_dir=DBT_PROJECT_DIR)
    
    @asset(compute_kind="python")
    def order_forecast_model(daily_order_summary: pd.DataFrame) -> np.ndarray:
        train_set = daily_order_summary.to_numpy()
        xdata, ydata = train_set[:, 0], train_set[:, 2]
        return optimize.curve_fit(f=model_func, xdata=xdata, ydata=ydata, p0=[10, 100])[0]
    
    defs = Definitions(
      assets=[*airbyte_assets, *dbt_assets, order_forecast_model]
    )

    • 따로 의존성을 코드로 표현하는 것이 아닌 dbt에서 sql의 from절의 변수명을 파싱해서 upstream과 downstream을 표시
    {{ config(materialized='table') }}
    
    select 1
    from {{ ref('coalition_agent_schedule') }}

    dbt

    • ELT에서 T(transformation)를 담당
    • 오픈 소스로 활용할 수 있고, DBT Cloud 같이 유료 버전도 있음
    • SQL base의 transformation만 가능하고, 대신 jinja template을 활용해서 다양한 처리(ex. 사용자 정의 함수)를 제공함
    • ex) base_orders, base_payments model을 통해 orders라는 모델을 만듦
    select
      orders.id,
      orders.status,
      sum(case when payments.payment_method = 'bank_transfer' then payments.amount else 0 end) as bank_transfer_amount,
      sum(case when payments.payment_method = 'credit_card' then payments.amount else 0 end) as credit_card_amount,
      sum(case when payments.payment_method = 'gift_card' then payments.amount else 0 end) as gift_card_amount,
      sum(amount) as total_amount
    
    from {{ ref('base_orders') }} as orders
    left join {{ ref('base_payments') }} as payments on payments.order_id = orders.id

    기본 구조

    • models: transformation 관련 쿼리를 정의
    • macros: 쿼리에 활용될 사용자 정의 함수 등을 정의
    • dbt profile: dbt에서 쿼리를 실행할 때 어떤 쿼리엔진 또는 DB를 사용할 것인지 (ex. spark thrift server, postgresql, etc.)
    • dbt_project.yml: project에서 사용되는 model에 대한 설정을 정의...할 수 있지만 모델별로 {{ config() }}로 설정할 수 있음

    모델 정의

    materialized라는 항목으로 모델의 타입을 정의할 수 있음

    • table: 실제 테이블로 생성
    • view: view로 생성
    • incremental: 증분 처리할 경우 사용, is_incremental()과 함께 쓰임
    • ephemeral: 임시 테이블(실행할 때만 잠깐 생성됐다가 실행 후 삭제).

    문서화 가능

    • 간단하게 로컬에서 데이터 거버넌스 관리 가능
    dbt docs generate
    
    dbt docs serve

    요청 자동화 관점

    1. on_schema_change→컬럼 추가요청
      1. 모델이 변경할때 대한 설정, dbt의 기본 설정은 ignore. 이 설정을 append_new_columns로 설정하면 스키마가 추가되는 경우에 컬럼이 append될 수 있음→incremental의 경우이고 테이블이 설정되어있으면 전체 적용
    2. materalized: table→테이블 추가요청
      1. sql문을 통해서 바로 쿼리 변경없이 추가 가능
    3. astronomer-cosmos
      1. dbt 모델을 파싱하면서 의존성을 파악하고 airflow task로 그려줌
      2. DbtTaskGroup 인스턴스를 사용해 dbt run, dbt test 명령어를 하나의 task group 내에서 실행할 수 있도록 여러 모델을 task group 단위로 묶을 수 있음
      3. RenderConfig 인스턴스의 select argument를 사용하면 특정 모델만 선택하여 그룹 생성 가능
    def test_dag():
        event_summary = DbtTaskGroup(
            group_id="eventSummary",
            project_config=ProjectConfig(DBT_PROJECT_PATH),
            profile_config=profile_config,
            render_config=RenderConfig(
                select=["eventSummary"]),
            operator_args={
                "vars": '{"ds": {{ params.ds }} }',
            },
            default_args={"retries": 2},
        )
        
        event_summary
    1. ExternalTaskSensor
      • 단 astronomer-cosmos만으로는 다른 파이프라인과 DBT 모델 간의 의존성을 해결할 수 없음 ExternalTaskSensor를 활용한 당근마켓의 예처럼 ExternalTaskSensor와 테이블 디펜던시 테이블을 활용하면 쉽게 라이브러리로 만들 수 있을 것으로 보임

    DBT와 Airflow 도입하며 마주한 7가지 문제들

     

    DBT와 Airflow 도입하며 마주한 7가지 문제들

    안녕하세요. 당근 데이터 가치화팀의 Henry예요.

    medium.com

    • spark나 Trino, duckDB 등 다양한 엔진을 프로세싱에 사용할 수 있음

    장점

    1. dbt docs를 통해서 테이블 명세나 리니지 등을 쉽게 확인하고 작업시에 참고할 수 있음
    2. 엔지니어의 도움없이 쉽게 칼럼 추가, 테이블 추가 자동화, 테스트 자동화(로컬에서도 가능)
    3. 여러 모던 아키텍처 도구들과 통합이 용이하다(airbyte, prefect, dagster 등)

    단점

    1. 결국 코드 작성 및 푸시를 해야함(초반 허들 및 교육 필요)
    2. 애드훅하게 결과를 확인할 수 없기 때문에 간단한 문법 오류나 디버깅하기가 불편함
    3. 배포를 위해서는 유닛 테스트와 함께 CI/CD 기반이 되어야지 쓸만할듯

    결론

    • 제로베이스에서면 몰라도 airflow를 운용하고있는데 굳이 dagster로 마이그레이션하거나 추가할 필요성은 없어보임
    • cosmos도 많이 발전하긴 했지만 dbt를 운용하고 있다면 의존성 표현과 스케줄러를 위해서 dagster도 괜찮은 선택 같아보임

    댓글

Copyright 2023. 은유 All rights reserved.