Search

[Airflow 운영 안정화] AQE와 리소스 조정으로 EMR Spark 속도 개선

제목
작성일

배경

최근 새벽에 EMR Spark를 사용하는 DAG에서 실패가 발생했다. 평소에 한번도 에러가 난적 없던 파이프라인이라서 수동 백필을 시도했으나 계속해서 에러가 났다.
해당 파이프라인은 Braze webhook 발송 기록을 데이터 플랫폼으로 적재하는 작업으로, CRM팀에서 캠페인 전송량에 따라 데이터 양이 매일 편차가 매우 큰편
이하 트러블슈팅 기록

문제

9/11 데이터가 14.7GB로 갑자기 많이 들어오면서 Spark job이 2시간 execution timeout 으로 실패
보통은 몇백메가 수준에서 많을때는 10GB까지 데이터가 유입됐었고, 기존 최대 처리 시간은 10GB/1시간 30분정도
이번에 첫 타임아웃 사례
앞으로 이처럼 대량 데이터가 들어올 수 있으므로 EMR 리소스 조정과 Spark 코드 개선 필요

개선

1. EMR 리소스 조정

기존 클러스터 구성
인스턴스 타입: m4.large(vCPU 2, 메모리 8 GiB)
마스터 노드 하나에 슬레이브 노드 두개 사용중
기존
executor instances=1, cores=1, memory=5G
인스턴스와 코어 각 하나씩 사용으로 병렬 처리 불가
변경
executor instances=1, cores=2, memory=5G
인스턴스 1개 × 2코어 → 슬롯 2개 확보
코어 수를 vCPU(2개)에 맞춰 병렬 처리 가능하도록 조정
이후 추이 지켜보다가 평균적으로 데이터양이 더욱 늘어나면, 노드와 인스턴스 개수 증가 고려

2. 쓰기 병목 제거

기존
coalesce(1) → 단일 task 쓰기로 long-tail 발생
변경
repartition(12) → 파티션 개수 만큼 병렬 쓰기
coalesce vs repartion 차이점
repartition
파티션 데이터 셔플하고 새로 n개 파티션으로 균등 분배
셔플 때문에 처리 과정이 하나 추가됨
coalesce: 셔플 없이 그냥 파티션을 합침
셔플이 없어서 더 빠른경우도 있음
원래 파티션 데이터가 불균등하면 그대로 불균형이 남음
불균형 때문에 task별 처리 속도가 달라져 job이 제일 오랜 task를 기다려야함 (long-tail문제)

3. AQE(Adaptive Query Execution) 활성화

spark.sql.adaptive.enabled=true 설정
기존 Spark
실행 전 Optimizer가 한 번만 플랜 세움
실행 중에 실제 데이터 분포가 예상과 달라져도 비효율적인 실행 플랜이 고정된 채로 끝까지 감
AQE
쿼리 실행도중 실제 런타임 통계를 수집해 플랜을 re-optimize
셔플 파티션 크기 자동조정, 스테이지 병합, 스큐 처리 등

4. 실행 인자 개선

기존: Spark 내부에서 datetime.now() 사용
개선: Airflow에서 data_interval_start/end를 파라미터로 Spark 전달

개선 결과

항목
개선 전
개선 후
차이
스테이지 개수
26개
3개
−88%
자잘한 listing
listing stage (각 2s), 총 51s
아예 없어짐
경로 리스팅 (979 path)
32s
27s
−15.6%
중간 처리 (Shuffle)
없음
31 tasks, 45s
새로 추가됨
최종 Parquet 쓰기
96s, 7.6MiB
5s, 10.3MiB
−94.8%
출력 파일 수
1개
12개
병렬화, long-tail 제거
쓰기 병목 제거
기존엔 출력 단계가 task 1개로 단일 task가 모든 I/O를 떠안아 오래걸림
repartition(12) 2슬롯 활용한 병렬 쓰기로 쓰기 시간 대폭 감소
AQE 효과
불필요한 listing 스테이지가 합쳐져서 전체 스테이지수 26 → 3으로 단순화
Executor Task Time 비교
항목
개선 전 (coalesce)
개선 후 (repartition+AQE)
차이
Driver Task Time
3.6 min
1.6 min
−55%
Executor Task Time
3.0 min
2.6 min
−13%
총 Task Time
6.6 min
4.2 min
−36%
Complete Tasks
1959
1022
−48%
Task 수 감소: 태스크 개수 자체가 줄면서 오버헤드 감소
Driver Task Time 감소: DAG 단순화로 절반 이상 감소
Executor Task Time: Shuffle 단계가 추가되었음에도 전체 작업 시간 단축

스샷

기존 스테이지
기존 executors
개선 후 스테이지
개선 후 executors

결론

AQE 활성화 + 쓰기 병목 제거 + core 개수 증가 효과로, 전체 잡이 훨씬 단순화되고 안정적으로 동작하게 되었다.
테스트 하느라 데이터가 적은 날 기준으로 체크했지만, 대용량 데이터에서는 개선 효과가 훨씬 크게 나타날 것으로 예상한다.