All
배경
EMR 기반 ETL 파이프라인에서 최근 반복적으로 실패 알람이 발생했다.
•
태스크는 실제로는 정상 성공했는데도 실패 알람이 울리고,
•
어떤 경우는 클러스터가 중간에 종료되면서 이후 단계가 전부 cancel 처리되기도 했다.
원인을 추적해 보니 공통점은 EMR을 사용하는 DAG이었다.
현재 DAG 구조와 오퍼레이터
EMR 사용하는 ETL DAG의 기본 구조는 다음과 같다.
EMRCreateJobFlowOperator → EmrAddStepsOperator → EmrStepSensor → EmrTerminateJobFlowOperator
Python
복사
•
EMRCreateJobFlowOperator
◦
EMR 클러스터 생성
•
EmrAddStepsOperator
◦
EMR 클러스터에 Spark job step을 등록
•
EmrStepSensor
◦
등록된 step이 COMPLETED될 때까지 상태를 polling
•
EmrTerminateJobFlowOperator
◦
모든 step이 끝난 후 클러스터를 종료
클러스터 생성 → 스텝 실행 → 완료 확인 → 종료의 구조
이번 문제는 중간 단계의 센서가 긴 시간 대기하면서 에어플로우 워커/스케줄러와 충돌을 일으킨 것
로그 트래킹
정확한 원인을 파악하려면 Airflow 로그만 봐서는 부족하다.
Airflow는 태스크 상태(RUNNING/FAILED)까지만 보여주고 Spark 내부에서 무슨 일이 일어났는지는 알 수 없다.
따라서 EMR 로그를 함께봐야 디버깅이 가능하다.
Airflow 로그
•
워커에서 어떻게 실행/대기하는지 확인 가능
•
실패한 DAG 재실행하면 웹 UI에서는 실패 로그가 날아가므로 태스크 로그에 들어가서 확인
EMR 로그
•
콘솔 Steps 탭
◦
각 step의 stderr/stdout 바로 확인 가능
•
S3 LogUri
◦
steps/: step 실행 로그 (콘솔에서 보이는 stderr와 동일)
◦
containers/: driver/executor/container별 stdout/stderr. OOM, 드라이버 종료 등 Spark 런타임 에러는 여기서 확인
◦
node/: YARN ResourceManager, NodeManager, HDFS 데몬 로그. 클러스터의 자원 할당/네트워크 문제는 여기서 확인
•
Spark History / YARN UI
◦
EMR에서 UI로 로그 제공. (위의 S3 저장된 로그 다 여기서 볼 수 있음)
에러 케이스
케이스 1: 성공으로 끝났는데 실패 알람 전송
문제 확인
•
EMR step 확인했을때 전부 성공
성공한 step들
•
에어플로우 로그에서는 FAILED: Task is in the 'running' state. 발견
[2025-09-01, 17:04:34 UTC] {{emr.py:575}} INFO - Poking step s-068300718D1Y0E2B2T6G on cluster j-3K4S3DMXBP1XX
[2025-09-01, 17:04:34 UTC] {{emr.py:79}} INFO - Job flow currently RUNNING
[2025-09-01, 17:04:48 UTC] {{taskinstance.py:1149}} INFO - Dependencies not met for <TaskInstance: AMPLITUDE_PIPELINE.spark_step_group.amplitude_rawevents scheduled__2025-08-31T17:00:00+00:00 [running]>, dependency 'Task Instance Not Running' FAILED: Task is in the running state
[2025-09-01, 17:04:48 UTC] {{taskinstance.py:1149}} INFO - Dependencies not met for <TaskInstance: AMPLITUDE_PIPELINE.spark_step_group.amplitude_rawevents scheduled__2025-08-31T17:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state.
[2025-09-01, 17:04:48 UTC] {{local_task_job_runner.py:154}} INFO - Task is not able to be run
Bash
복사
원인
•
센서가 기본 poke 모드로 6개 센서 병렬 실행중, 앞 step이 끝날때까지 대기 중에도 PENDING 상태로 워커 슬롯을 장시간 점유
•
이때 스케줄러/워커 타이밍 레이스(heartbeat 지연, 재할당 시도 등)로 동일 TaskInstance를 다시 실행하려는 시도(LocalTaskJob) 발생
•
이미 RUNNING 중인 TaskInstance를 다시 실행하려는 시도가 생겨 해당 시도만 FAILED 처리됨→ on_failure_callback 호출
•
실제 task는 끝까지 마쳐 정상 성공인데 마치 실패인것 처럼 알람
케이스 2) 정상 실행중 Cancel
문제 확인
•
실행중에 갑자기 EmrTerminateJobFlowOperator로 넘어가 클러스터가 종료
•
실패한 기록이 없는데 이후 단계가 전부 cancel 처리됨
원인
•
EmrStepSensor 가 16:59쯤 하트비트 잃으면서 에어플로우에서 이를 좀비 task 취급
•
5분이 지나자 스케줄러가 좀비 태스크를 정리리하면서 EmrTerminateJobFlowOperator 단계로 넘어가 클러스터 강제 terminate
•
실제 EMR 스텝로그는 17:01 부터 계속 정상적으로 RUNNING 중이었지만, 17:05경 terminate 명령 받고 cancel 처리 됨
→ 두가지 케이스 모두 poke 모드 센서의 장기 실행이 문제의 근본 원인
개선 방안
센서 모드 전환
Airflow 센서에는 두 가지 실행 모드가 있다.
모드 | 동작 방식 | 장점 | 단점 | 적합한 케이스 |
poke (기본) | 태스크 프로세스가 살아 있으면서 일정 주기마다 상태 체크 (heartbeat 유지) | - 로그가 실시간으로 남아 디버깅에 용이
- 외부 리소스 체크를 촘촘히 할 수 있음 | - 워커 슬롯 장시간 점유
- 병렬 센서 많으면 리소스 경합 발생 | 짧은 주기 확인이 필요한 경우 (예: 수 분 내 끝나는 외부 API 호출) |
reschedule | 조건 미충족 시 즉시 워커 반납, 스케줄러가 정해진 간격으로 다시 태스크 실행 | - 워커 리소스 효율적 사용
- 병렬 센서 많아도 안정적 | - 체크 타이밍에만 로그가 남음
- 너무 짧은 간격이면 스케줄러 부하 증가 | 장기 실행이 예상되는 외부 리소스 모니터링 (예: EMR Step 완료 대기, S3 파일 생성 감시) |
이번 문제는 모두 poke 모드 한계에서 발생했다.
→ EmrStepSensor를 reschedule 모드로 전환
개선 효과
•
장기 대기 상태에서 워커 점유 해소해 병렬 센서 실행 안정성 높아짐
•
장수 프로세스가 사라져 중복 실행 시도나 하트비트 끊기는 문제 해소
•
일시적 네트워크 오류에도 다음 스케줄에서 자연스럽게 재시도
성과
센서를 reschedule 모드로 변경한 이후:
•
중복 실행으로 인한 실패 알람 사라짐
•
하트비트 끊김 → 조기 클러스터 종료 현상 사라짐
그동안 알 수 없는 이유로 “클러스터 조기 실패”라고 넘겼던 문제를 정확히 짚고 해결해 안정성을 높일 수 있었다.
두번째 케이스 같은 경우, 명확한 fail 로그가 없어 여러 로그를 왔다갔다하며 원인을 찾는게 쉽진 않았다. 하지만 이번 작업으로 EMR 관련 DAG들의 알람이 거의 없어져서 새벽에 불필요한 알람에 대응할일이 없어졌다.
같은 문제를 겪는 분들은 EMR 센서 모드를 확인해보길 권한다.

