Search

[Airflow 운영 안정화] 배너 통계 파이프라인 3가지 에러 해결과 구조 재정비

제목
작성일

배경

탭 배너 광고들의 성과를 집계해 최종적으로 MariaDB에 저장하는 파이프라인
간단 아키텍처 설명: MariaDB 스냅샷(개발환경 포함) → 데이터 전처리 → Spark 변환 처리 → 파티션 리페어 → 최종 통계 데이터 다시 MariaDB로 저장
잦은 에러
OOM 이후 생성 안 된 EMR 클러스터를 종료하려다 실패
중간 스텝 실패가 감지되지 않음(마지막 스텝만 센싱)
S3 경로 미존재 에러

이전 구조의 문제와 원인

1.
EMR 클러스터 종료의 ALL_DONE 트리거 룰 문제
클러스터를 삭제하는 태스크가 ALL_DONE 으로 항상 실행
클러스터가 생성되지 않았음에도 terminate 작업이 실행되어 에러 발생
실제로는 클러스터 생성이 성공한 이후에 종료 명령이 가야함
2.
EMR Step Sensor 트래킹 문제
마지막 스텝만 추적하므로 중간 스텝 실패가 종종 누락되는 경우 발생
모든 스텝의 상태를 확인해야 함
3.
DAG 정의 복잡한 의존성과 중복/장문 구조
10여개의 DB 테이블 스냅샷 태스크가 개발/상용 비슷한 패턴으로 각 반복 정의
DB 스냅샷 태스크에서 아테나 테이블이 파티션 프로젝션이 되어있지 않아 매번 table repair 따로 호출
4.
Develop 환경의 데이터 미존재
생성된 S3 경로 없는데 체크 없이 조회해서 에러
5.
OOM 이후 연쇄 실패
DB 스냅샷 작업 중 OOM 발생 (이후 클러스터 종료 태스크까지 연쇄 실패)
병렬/자원 조정 필요

개선한 내용

1) EMR 관련

1.
스텝 센서 개선
실제 스텝 체크하도록 센서 개별 생성 및 센서 그룹으로 묶음
reschedule 모드로 워커 리소스 점유 ↓
2.
클러스터 종료 연쇄 실패 개선
ShortCircuitOperator(if_cluster_created) 클러스터 생성 성공 시에만 종료 시도
ShortCircuitOperator
의존성 예시
기존
( ... >> 앞 task .. >> _emr_tasks["create_cluster"] >> _emr_tasks["step_adder"] >> _emr_tasks["step_checker"] >> _emr_tasks["remove_cluster"] >> 후속 task ... )
Python
복사
개선 후
( ... >> 앞 task .. >> _emr_tasks["create_cluster"] >> _emr_tasks["step_adder"] >> _emr_tasks["sensors_group"] ) # 클러스터 정리 브랜치 ( [_emr_tasks["create_cluster"], _emr_tasks["sensors_group"]] >> _emr_tasks["if_cluster_created"] >> _emr_tasks["remove_cluster"] ) # 정상 후속 브랜치 ( _emr_tasks["sensors_group"] >> 후속 task ... )
Python
복사
스텝 센서 성공 시
메인 후속 작업 실행
동시에 클러스터 정리 브랜치도 실행되어 종료
스텝 센서 실패 시
메인 후속 작업은 기본 ALL_SUCCESS라서 멈춤
클러스터 정리 브랜치는 if_cluster_created 덕분에 실행되어 종료 시도
앞단에서 클러스터 자체가 안 만들어졌을 때
if_cluster_created에서 False → remove_cluster skip
후속 작업도 센서가 실행되지 않으니 자동 중단

2) DAG 정의부 구조 개선

1.
Task Factory 패턴 도입
반복적인 task 생성을 python operator 생성하는 팩토리로 표준화
예시
기존
@task def product( inlets=[], outlets=[] ) -> None: """ PRODUCT 테이블 스냅샷 """ from db_snapshot.tab_product import Product Product().main() @task def tab_product( inlets=[], outlets=[] ) -> None: """ TAB_PRODUCT 테이블 스냅샷 """ from db_snapshot.tab_product import TabProduct TabProduct().main() product = product() product = tab_product() 이하 동일한 방식으로 몇십개 반복 ... >> product >> tab_product ...
Python
복사
개선 후 예시
factory로 operator 생성후 딕셔너리로 반환받아 한줄로 사용
# 각 Pyhton Operator 딕셔너리 반환 prod_tasks = create_stats_group() ... >> prod_tasks["product"] >> prod_tasks["tab_product"] ...
Python
복사
2.
필요없는 태스크 제거
DB 스냅샷 태스크 파티션 프로젝션 추가해 리페어 태스크 제거
3.
EMR 워크플로우 공통 유틸 도입
클러스터 생성 - 스텝 생성 - 스텝 센서로 체크 - 클러스터 종료 워크플로우 표준화
DAG 본문은 유틸에서 태스크만 반환받아 데이터 플로우 중심으로 읽히도록 개선
# lib emr 전용 유틸 생성 def create_emr_workflow(...): return { "create_cluster": create_cluster, "step_adder": step_adder, "sensors_group": sensors_group, "step_sensors": step_sensors, "if_cluster_created": if_cluster_created, "remove_cluster": remove_cluster, } # dag.py _emr_tasks = create_emr_workflow() ... >> _emr_tasks["create_cluster"] >> _emr_tasks["step_adder"] >> _emr_tasks["sensors_group"] ...
Python
복사

3) Spark 모듈 리팩토링

1.
S3 경로 상수화
생성자에 S3 경로가 10개 가량 하드코딩되어있음 (lint Too many instance attributes 에러)
상수 클래스로 분리하고 S3엔진/버킷/상세경로로 변수를 나눠 사용처에서 필요한 단위로 uri 조립해 사용할 수 있도록 개선
2.
S3 경로 존재 여부 체크
동일 config로 생성되는 Spark Session 중복 코드 공통 lib으로 분리

4) OOM 방지

메모리 확인해 병렬 태스크 직렬로 분산
동일 시간대 다른 파이프라인 태스크 시간 조정으로 메모리 확보

성과

에러 해소
OOM, 스텝 센서, S3 경로 에러, 미생성 클러스터 종료 에러 → 해결 완료
유지보수성 상승
DAG 정의 파일 560줄에서 180줄로 축소
재사용 유틸 만들어 다른 파이프라인에서도 활용 가능
리소스 안정화
태스크 분산, reschedule 센서, 경로 검증, 불필요한 task 제거 등