All
배경
•
탭 배너 광고들의 성과를 집계해 최종적으로 MariaDB에 저장하는 파이프라인
•
간단 아키텍처 설명: MariaDB 스냅샷(개발환경 포함) → 데이터 전처리 → Spark 변환 처리 → 파티션 리페어 → 최종 통계 데이터 다시 MariaDB로 저장
•
잦은 에러
◦
OOM 이후 생성 안 된 EMR 클러스터를 종료하려다 실패
◦
중간 스텝 실패가 감지되지 않음(마지막 스텝만 센싱)
◦
S3 경로 미존재 에러
이전 구조의 문제와 원인
1.
EMR 클러스터 종료의 ALL_DONE 트리거 룰 문제
•
클러스터를 삭제하는 태스크가 ALL_DONE 으로 항상 실행
•
클러스터가 생성되지 않았음에도 terminate 작업이 실행되어 에러 발생
•
실제로는 클러스터 생성이 성공한 이후에 종료 명령이 가야함
2.
EMR Step Sensor 트래킹 문제
•
마지막 스텝만 추적하므로 중간 스텝 실패가 종종 누락되는 경우 발생
•
모든 스텝의 상태를 확인해야 함
3.
DAG 정의 복잡한 의존성과 중복/장문 구조
•
10여개의 DB 테이블 스냅샷 태스크가 개발/상용 비슷한 패턴으로 각 반복 정의
•
DB 스냅샷 태스크에서 아테나 테이블이 파티션 프로젝션이 되어있지 않아 매번 table repair 따로 호출
4.
Develop 환경의 데이터 미존재
•
생성된 S3 경로 없는데 체크 없이 조회해서 에러
5.
OOM 이후 연쇄 실패
•
DB 스냅샷 작업 중 OOM 발생 (이후 클러스터 종료 태스크까지 연쇄 실패)
•
병렬/자원 조정 필요
개선한 내용
1) EMR 관련
1.
스텝 센서 개선
•
실제 스텝 체크하도록 센서 개별 생성 및 센서 그룹으로 묶음
•
reschedule 모드로 워커 리소스 점유 ↓
2.
클러스터 종료 연쇄 실패 개선
•
ShortCircuitOperator(if_cluster_created) 클러스터 생성 성공 시에만 종료 시도
ShortCircuitOperator
의존성 예시
•
기존
( ...
>> 앞 task ..
>> _emr_tasks["create_cluster"]
>> _emr_tasks["step_adder"]
>> _emr_tasks["step_checker"]
>> _emr_tasks["remove_cluster"]
>> 후속 task
...
)
Python
복사
•
개선 후
( ...
>> 앞 task ..
>> _emr_tasks["create_cluster"]
>> _emr_tasks["step_adder"]
>> _emr_tasks["sensors_group"]
)
# 클러스터 정리 브랜치
(
[_emr_tasks["create_cluster"], _emr_tasks["sensors_group"]]
>> _emr_tasks["if_cluster_created"]
>> _emr_tasks["remove_cluster"]
)
# 정상 후속 브랜치
(
_emr_tasks["sensors_group"]
>> 후속 task
...
)
Python
복사
•
스텝 센서 성공 시
◦
메인 후속 작업 실행
◦
동시에 클러스터 정리 브랜치도 실행되어 종료
•
스텝 센서 실패 시
◦
메인 후속 작업은 기본 ALL_SUCCESS라서 멈춤
◦
클러스터 정리 브랜치는 if_cluster_created 덕분에 실행되어 종료 시도
•
앞단에서 클러스터 자체가 안 만들어졌을 때
◦
if_cluster_created에서 False → remove_cluster skip
◦
후속 작업도 센서가 실행되지 않으니 자동 중단
2) DAG 정의부 구조 개선
1.
Task Factory 패턴 도입
•
반복적인 task 생성을 python operator 생성하는 팩토리로 표준화
예시
•
기존
@task
def product(
inlets=[],
outlets=[]
) -> None:
"""
PRODUCT 테이블 스냅샷
"""
from db_snapshot.tab_product import Product
Product().main()
@task
def tab_product(
inlets=[],
outlets=[]
) -> None:
"""
TAB_PRODUCT 테이블 스냅샷
"""
from db_snapshot.tab_product import TabProduct
TabProduct().main()
product = product()
product = tab_product()
이하 동일한 방식으로 몇십개 반복
...
>> product
>> tab_product
...
Python
복사
•
개선 후 예시
◦
factory로 operator 생성후 딕셔너리로 반환받아 한줄로 사용
# 각 Pyhton Operator 딕셔너리 반환
prod_tasks = create_stats_group()
...
>> prod_tasks["product"]
>> prod_tasks["tab_product"]
...
Python
복사
2.
필요없는 태스크 제거
•
DB 스냅샷 태스크 파티션 프로젝션 추가해 리페어 태스크 제거
3.
EMR 워크플로우 공통 유틸 도입
•
클러스터 생성 - 스텝 생성 - 스텝 센서로 체크 - 클러스터 종료 워크플로우 표준화
•
DAG 본문은 유틸에서 태스크만 반환받아 데이터 플로우 중심으로 읽히도록 개선
# lib emr 전용 유틸 생성
def create_emr_workflow(...):
return {
"create_cluster": create_cluster,
"step_adder": step_adder,
"sensors_group": sensors_group,
"step_sensors": step_sensors,
"if_cluster_created": if_cluster_created,
"remove_cluster": remove_cluster,
}
# dag.py
_emr_tasks = create_emr_workflow()
...
>> _emr_tasks["create_cluster"]
>> _emr_tasks["step_adder"]
>> _emr_tasks["sensors_group"]
...
Python
복사
3) Spark 모듈 리팩토링
1.
S3 경로 상수화
•
생성자에 S3 경로가 10개 가량 하드코딩되어있음 (lint Too many instance attributes 에러)
•
상수 클래스로 분리하고 S3엔진/버킷/상세경로로 변수를 나눠 사용처에서 필요한 단위로 uri 조립해 사용할 수 있도록 개선
2.
S3 경로 존재 여부 체크
•
동일 config로 생성되는 Spark Session 중복 코드 공통 lib으로 분리
4) OOM 방지
•
메모리 확인해 병렬 태스크 직렬로 분산
•
동일 시간대 다른 파이프라인 태스크 시간 조정으로 메모리 확보
성과
•
에러 해소
◦
OOM, 스텝 센서, S3 경로 에러, 미생성 클러스터 종료 에러 → 해결 완료
•
유지보수성 상승
◦
DAG 정의 파일 560줄에서 180줄로 축소
◦
재사용 유틸 만들어 다른 파이프라인에서도 활용 가능
•
리소스 안정화
◦
태스크 분산, reschedule 센서, 경로 검증, 불필요한 task 제거 등
