2021. 6. 19. 17:28

Luigi로 batch job 모니터링 및 스케줄링

개요


Spotify에서 개발한 오픈 소스 실행 프레임워크로, 복잡한 데이터 파이프 라인을 Python으로 배치 작업을 만들어 실행할 수 있다. 종속성 해결, 워크 플로 관리, 시각화, 실패 처리, 명령 줄 통합 등을 처리하는데 사용한다.

하이브나 Pig 또는 Cascading과 같은 로우 레벨에서의 데이터 전처리에 초점에 맞춰져있는 이런 프레임워크를 대체할 수는 없다. 대신에 많은 task들을 하나로 합치게 도움을 준다.

이름이 루이지인건 아마도 슈퍼마리오의 루이지가 배관공이라서 파이프라인을 다루는 일이기 때문이 아닐까 추측

 

기본 구조


Task

class ExampleTask(luigi.Task):
		# Parameter 정의
		count = luigi.IntParameter(default=7)
		
		def requires(self):
				return [
						# 명확한 Dependency 정의
						DependencyTask1(),
						DependencyTask2()
				]

		def run(self):
				# 비즈니스 로직
				do_some_data_work(self.count)

		def output(self):
				# 결과물
				return luigi.LocalTarget(f"/output/example_task_{self.count}.txt")
  • WrapperTask
    • require에 return으로 task들의 list를 받을 수 있는데, 설정된 모든 task들이 성공적으로 완료되어야만 성공하도록 complete 메서드가 미리 구현되어 있다.

Example

import luigi

class TSSCoreRankingTask(luigi.Task):
		date = luigi.DataParameter(default=datetime.today().date())

		def requires(self):
				return [
						ShortVolumeTask(), ShortOutstandingTask(), LoanRateTask(),
						FreeFloatTask(), TradingVolumeTask()
				]

		def run(self):
				short_volume = load_short_volume(self.date)
				short_outstanding = load_short_outstanding(self.date)
				loan_rates = load_loan_ratese(self.date)
				free_floats = load_free_floats(self.date)
				trading_volume = load_another_data(self.date)
				aggregated = aggregate_by_stock_and_date(
						short_volume, short_outstanding, load_loan_rates,
						free_floats, trading_volume)
				ts_scores = calculate_ts_scores(aggregated)
				rankings = sort_by_ts_score(ts_scores)

				with self.output().open("w") as out_file:
						writer = csv.writer(out_file)
						writer.writerows(rankings)

		def output(self):
        return luigi.LocalTarget(f"./output/ts_score_rankings_{self.date}")

 

import luigi
import csv

class ShortVolumeTask(luigi.Task):
		date = luigi.DateParameter(default=datetime.today().date())

		def requires(self):
				return []

		def output(self):
				return luigi.LocalTarget(f"/output/short_volume_{self.date}.csv")

		def run(self):
				short_vlumes = fetch_short_volume(self.date)

				with self.output().open("w") as out_file:
						writer = csv.writer(out_file)
						writer.writerows(short_volumes)
  •  output

output() 함수는 이 task에서 생성 할 파일의 종류를 지정

  • run

run() 함수는 실제 작업을 수행하는 함수

  • require

require() 함수는 task 시작 이전에 필요한 작업을 지정하기 위해 사용

  • complete
  1. 네트워크 문제나 예상치 못한 경우가 발생했을 때 task를 제대로 수행하지 못하고 실패했을 때 luigi에서 재시도를 한다.
  2. complete 조건을 만족하고 True를 리턴했을 때 성공적으로 실행 완료되었다고 판단한다.
  •  
def output(self):
    return luigi.MySqlTarget(
        host = config.host,
        database = config.database,
        user = config.user,
        password = config.password,
        table = "ts_score_rankings",
        update_id = f"ts_score_rankings_{self.date}"
    )
  • HDFS Task
import luigi

class AggregateHotTSScoreTask(luigi.contrib.hadoop.JobTask):
		date_interval = luigi.DateIntervalParameter()

		def output(self):
				return luigi.contrib.hdfs.HdfsTarget("./data/aggregate_ts_scores_%s.tsv" % self.date_interval)

		def requires(self):
				return [TSScoreTask(date) for date in self.date_interval]

		def mapper(self, line):
				timestamp, stock, ts_score = line.strip().split()
				yield stock, 1 if ts_score > 7 else 0

		def reducer(self, key, values):
				yield key, sum(values)
  • Wrapper Task (여러 Task를 하나의 Task로 묶어주는 것)
import luigi

class AllShortDataFetchTask(luigi.WrapperTask):
		date = luigi.DateParameter(default=datetime.today().date())

		def requires(self):
				yield FetchShortVolumeTask(self.date)
				yield FetchShortOutstandingTask(self.date)
				yield FetchLoanRateTask(self.date)
				yield FetchTradingVolumeTask(self.date)
				yield FetchFreeFloatsTask(self.date)

Execution

30 16 * * MON-FRI python -mluigi --module ts_score_ranking_task TSScoreRankingTask
python -m luigi --module ts_score_ranking_task TSScoreRankingTask --date 2018-07-29
python -m luigi --module ts_score_ranking_task RangeDaily --of TSScoreRankingTask --start 2018-0101 --end 2018-07-29

Central Scheduler

luigid --background --pidfile <PATH_TO_PIDFILE> --logdir <PATH_TO_LOGDIR> --state-path <PATH_TO_STATEFILE>
  • 기본 포트는 8082
  • —port 플래그로 포트 변경 가능

 

Tips


  • Dependencies들은 (가능하면) 최대한 심플하게.
  • Task는 최소한의 단위로.
  • Extraction, Transfer, Load 모두 각각의 Task로 구분
  • Task의 규모도 최소한으로
  • Notification 또한 가능한 최소한으로. 정말 꼭 알아야 하는 것만 notification 설정.

 

DEMO