-
SparkSQL에서 증분 테이블 처리하기Programming 2024. 2. 25. 21:49
배경
- ETL처리시 Spark를 통해 Trasfroming을 하는데 1000만 로우가 넘는 테이블을 이관하며 기존 Overwrite하는 방식이 아닌 증분되는 양만큼만 반영하기로 함
- updated_at 컬럼으로 증분되는 양만 읽어서 기존 테이블에 upsert를 하는 방식
- 현재 운영하는 데이터웨어하우스는 일반적인 RDB가 아닌 HDFS기반이므로 불변성을 위해서 upsert를 할 수 없음
옵션
- Hudi or Delta Lake or Iceberg와 같은 동시성이 제공되는 오픈소스를 사용한다
- 현재 실시간 테이블을 위해서 Delta Lake를 사용되고 있지만 이관을 위해서는 따로 사용하고 있지 않으므로 다른 테이블과 사용성 및 통합을 위해서 해당 옵션은 보류
- SQL 윈도우 함수를 통해서 증분된 만큼 처리하고 기존 값과 UNION한다.
- 해당 방법으로 진행
트러블슈팅
spark.sql(""" SELECT * FROM ( SELECT column_lists..., tmp_num, row_number() over(partition by id order by tmp_num desc) as row_rnk FROM ( SELECT a.* FROM ( SELECT *, 1 as tmp_num FROM db.table_name union all SELECT *, 2 as tmp_num FROM table_name_new ) a ) b ) c where c.row_rnk = 1 """).write.mode("overwrite").insertInto("db.table_name",overwrite=True)
두번째 방안으로 위와 같이 spark sql을 처리시 다음과 같은 에러가 난다.
AnalysisException: Cannot overwrite a path that is also being read from.
원인
문제는 spark에서 같은 경로를 읽고 있을 때 그 경로를 덮어쓰려고 할 때 발생됨. 아래 두가지 이유로 추정됨
- InsertIntoHadoopFsRelationCommand가 정적 파티션 삽입으로 실행될 때 실행 전에 읽는 주소가 쓰는 주소를 포함하는지 확인하기 때문(참고)
- 동일한 경로로 읽고 쓰려고 할 때, spark 자체 지원인 parquet를 사용해서 읽으면 동시에 메타데이터를 캐싱하기 때문에 읽는 동안 쓰기 작업을 수행할 수 없음(스파크 문서의 해당 단락 참조)
해결
1. temp DB를 거치는 방법
- https://stackoverflow.com/questions/38746773/read-from-a-hive-table-and-write-back-to-it-using-spark-sql
- 위 링크와 같은 방식으로 SaveAsTable 혹은 아예 insertInto를 통해 temp 테이블을 만들고 그 테이블과 UNION 하면 읽는 것과 쓰는 테이블이 달라진다.
- 하지만 쓰는 테이블이 두개로 나누어지고 심지어 같은 데이터를 공유한다. 테이블양이 많아지면 양적인면 속도적인면 모두 비효율적이므로 이 방법은 지양하도록 한다.
2. spark의 convertMetastoreParquet의 옵션을 false로 설정
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
- spark.sql.hive.convertMetastoreParquet 설정을 false로 설정하면, Spark SQL은 Hive의 SerDe를 사용하여 Hive metastore Parquet 테이블을 읽고 쓰게 된다. 이는 Spark 자체 Parquet 지원을 이용하는 것이 아니라 Hive 방식을 사용하게 됨.
- presto로 읽을 때도 hive serde를 통해서 읽으므로 이 설정을 적용해도 presto에서 문제없이 테이블을 읽을 수 있음
참고문서
'Programming' 카테고리의 다른 글
스파크에서 지원하는 압축 알고리즘 비교 (0) 2024.06.26 당신의 인덱스는 안녕하신가요?(커버링 인덱스) (0) 2024.03.31 PrestoSQL to Trino Migration 할 때 주의할 점 (0) 2023.12.22 kafka retention 용량 설정 값 이해하기 (0) 2023.05.01 ELT 툴 Airbyte 개요 및 M1 mac local환경 세팅 (0) 2022.04.13