-
SparkSQL에서 증분 테이블 처리하기Programming 2024. 2. 25. 21:49
배경
- ETL처리시 Spark를 통해 Trasfroming을 하는데 1000만 로우가 넘는 테이블을 이관하며 기존 Overwrite하는 방식이 아닌 증분되는 양만큼만 반영하기로 함
- updated_at 컬럼으로 증분되는 양만 읽어서 기존 테이블에 upsert를 하는 방식
- 현재 운영하는 데이터웨어하우스는 일반적인 RDB가 아닌 HDFS기반이므로 불변성을 위해서 upsert를 할 수 없음
옵션
- Hudi or Delta Lake or Iceberg와 같은 동시성이 제공되는 오픈소스를 사용한다
- 현재 실시간 테이블을 위해서 Delta Lake를 사용되고 있지만 이관을 위해서는 따로 사용하고 있지 않으므로 다른 테이블과 사용성 및 통합을 위해서 해당 옵션은 보류
- SQL 윈도우 함수를 통해서 증분된 만큼 처리하고 기존 값과 UNION한다.
- 해당 방법으로 진행
트러블슈팅
spark.sql(""" SELECT * FROM ( SELECT column_lists..., tmp_num, row_number() over(partition by id order by tmp_num desc) as row_rnk FROM ( SELECT a.* FROM ( SELECT *, 1 as tmp_num FROM db.table_name union all SELECT *, 2 as tmp_num FROM table_name_new ) a ) b ) c where c.row_rnk = 1 """).write.mode("overwrite").insertInto("db.table_name",overwrite=True)
두번째 방안으로 위와 같이 spark sql을 처리시 다음과 같은 에러가 난다.
AnalysisException: Cannot overwrite a path that is also being read from.
원인
문제는 spark에서 같은 경로를 읽고 있을 때 그 경로를 덮어쓰려고 할 때 발생됨. 아래 두가지 이유로 추정됨
- InsertIntoHadoopFsRelationCommand가 정적 파티션 삽입으로 실행될 때 실행 전에 읽는 주소가 쓰는 주소를 포함하는지 확인하기 때문(참고)
- 동일한 경로로 읽고 쓰려고 할 때, spark 자체 지원인 parquet를 사용해서 읽으면 동시에 메타데이터를 캐싱하기 때문에 읽는 동안 쓰기 작업을 수행할 수 없음(스파크 문서의 해당 단락 참조)
해결
1. temp DB를 거치는 방법
- https://stackoverflow.com/questions/38746773/read-from-a-hive-table-and-write-back-to-it-using-spark-sql
- 위 링크와 같은 방식으로 SaveAsTable 혹은 아예 insertInto를 통해 temp 테이블을 만들고 그 테이블과 UNION 하면 읽는 것과 쓰는 테이블이 달라진다.
Read from a hive table and write back to it using spark sql
I am reading a Hive table using Spark SQL and assigning it to a scala val val x = sqlContext.sql("select * from some_table") Then I am doing some processing with the dataframe x and finally comi...
stackoverflow.com
- 하지만 쓰는 테이블이 두개로 나누어지고 심지어 같은 데이터를 공유한다. 테이블양이 많아지면 양적인면 속도적인면 모두 비효율적이므로 이 방법은 지양하도록 한다.
2. spark의 convertMetastoreParquet의 옵션을 false로 설정
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
- spark.sql.hive.convertMetastoreParquet 설정을 false로 설정하면, Spark SQL은 Hive의 SerDe를 사용하여 Hive metastore Parquet 테이블을 읽고 쓰게 된다. 이는 Spark 자체 Parquet 지원을 이용하는 것이 아니라 Hive 방식을 사용하게 됨.
- presto로 읽을 때도 hive serde를 통해서 읽으므로 이 설정을 적용해도 presto에서 문제없이 테이블을 읽을 수 있음
참고문서
Read from a hive table and write back to it using spark sql
I am reading a Hive table using Spark SQL and assigning it to a scala val val x = sqlContext.sql("select * from some_table") Then I am doing some processing with the dataframe x and finally comi...
stackoverflow.com
HiveQL을 Spark SQL로 이전 시 발생하는 문제 해결하기
들어가며 안녕하세요. LINE Plus Messaging Data Eng Dev (NP) 팀에서 데이터 엔지니어로 일하는 정우영입니다. 최근 HiveQL(Hive SQL) 작업을 Spark SQL로 이전하는 과제를 진행했습니다. 주로 INSERT OVERWRITE TA...
engineering.linecorp.com
Parquet Files - Spark 3.5.0 Documentation
spark.apache.org
'Programming' 카테고리의 다른 글
스파크에서 지원하는 압축 알고리즘 비교 (0) 2024.06.26 당신의 인덱스는 안녕하신가요?(커버링 인덱스) (0) 2024.03.31 PrestoSQL to Trino Migration 할 때 주의할 점 (0) 2023.12.22 kafka retention 용량 설정 값 이해하기 (0) 2023.05.01 ELT 툴 Airbyte 개요 및 M1 mac local환경 세팅 (0) 2022.04.13