배경
Airflow에서 Sensor는 외부 상태를 주기적으로 polling 하며 특정 조건이 만족될 때까지 대기하는 태스크다.
EMR Step 완료 대기, S3 파일 생성 확인, 외부 API 처리 완료 등 언젠가 끝나는 작업을 기다리는 용도로 사용된다.
Sensor는 기본적으로 다음 루프를 반복한다.
조건 확인 → 미충족 → 대기 → 다시 확인
Python
복사
하지만 실제 운영에서는 이 단순한 구조가 문제를 만든다.
Sensor는 대부분 수 분~수 시간 이상 대기하기 때문에 대기하는 동안 리소스를 어떻게 사용하는지가 시스템 안정성을 결정한다.
예를 들어,
•
워커를 계속 점유하면 → 리소스 고갈
•
스케줄러와 상태가 어긋나면 → 중복 실행 / 좀비 태스크
•
여러 Sensor가 동시에 돌면 → 순간 부하 발생
이 문제를 해결하기 위해 Airflow는 Sensor에 대해 세 가지 실행 모드를 제공한다.
•
Poke
•
Reschedule
•
Deferrable
세 모드는 모두 동일한 polling 구조를 가지지만 대기 상태를 어디에서 처리하느냐가 다르다.
Poke 모드
동작 방식
•
센서 태스크 시작 → 워커 프로세스 점유 → 주기적으로 외부 상태 체크 → 완료 시 SUCCESS
[Worker]
while True:
상태 확인
if 완료:
종료
sleep(poke_interval)
Python
복사
특징 및 한계
•
구현이 단순하고 직관적인 polling 방식
•
하지만 조건이 만족될 때까지 워커를 지속적으로 점유한다는 구조적인 한계가 있음
장시간 실행되는 Sensor에서는 다음과 같은 문제가 발생할 수 있다.
•
워커 슬롯이 장시간 점유되어 리소스 고갈
•
heartbeat 지연으로 인해 스케줄러와 상태 불일치 발생
•
동일 태스크의 중복 실행 시도 또는 좀비 태스크 발생 가능
Reschedule 모드
동작 방식
•
센서 실행 → 상태 확인 → 조건 미충족 시 UP_FOR_RESCHEDULE 상태로 전환 → 워커 반환 → 일정 시간 후 다시 실행
[Worker]
상태 확인
if 미완료:
상태 = UP_FOR_RESCHEDULE
워커 반환
[Scheduler]
일정 시간 후 다시 큐잉 → Worker 재할당
Python
복사
특징 및 한계
•
대기 상태에서 워커를 점유하지 않아 Poke 대비 리소스 효율이 크게 개선됨
•
장시간 대기하는 Sensor에서도 워커 고갈 문제를 완화할 수 있음
하지만 polling 자체는 유지되기 때문에 다음과 같은 한계가 존재
•
poke_interval마다 워커가 다시 할당되면서 주기적인 실행 스파이크 발생
•
다수의 Sensor가 동일한 주기로 동작할 경우 동시 워커 기동 → 순간적인 리소스 사용 증가
•
특히 리소스가 제한된 환경에서는 메모리 스파이크로 이어질 수 있음
Deferrable 모드
핵심 개념: Triggerer
Deferrable 모드는 Sensor의 대기 처리를 워커가 아닌 Triggerer에게 위임한다.
Triggerer는 Airflow에서 별도로 동작하는 비동기 이벤트 처리 프로세스로 asyncio 기반의 이벤트 루프를 사용해 다수의 대기 작업을 효율적으로 처리한다.
•
하나의 프로세스에서 수백~수천 개의 대기 작업 처리 가능
•
코루틴 기반 → 대기 중에는 CPU/메모리 거의 사용하지 않음
•
워커와 분리된 실행 모델
동작 방식
•
워커: 초기 실행 → 상태 확인 → Trigger 등록 → 즉시 종료
•
Triggerer: 비동기 루프에서 상태 체크 → 조건 만족 시 이벤트 발생
•
워커: 이벤트 수신 후 재개 → SUCCESS
[Worker]
초기 실행 → 상태 확인 → Trigger 등록 → 종료
[Triggerer]
비동기 상태 체크 (polling)
조건 만족 → TriggerEvent 발생
[Worker]
이벤트 수신 → 재실행 → SUCCESS
Python
복사
내부 동작
async def run(self):
while True:
response = await emr_client.describe_step(...)
state = response["Step"]["Status"]["State"]
if state == "COMPLETED":
yield TriggerEvent({"status": "success"})
return
await asyncio.sleep(self.poke_interval)
Python
복사
await asyncio.sleep() 동안 이벤트 루프가 다른 코루틴을 실행하기 때문에 여러 Sensor를 하나의 프로세스에서 동시에 처리 가능
특징
•
동작을 Triggerer에게 넘기므로 대기 상태에서 워커 점유 없음
•
주기적인 워커 재기동도 없음 (Reschedule 한계 해결)
•
비동기 기반으로 확장성 매우 높음
•
장기 대기 + 다수 Sensor 환경에 최적
Trade-off
Deferrable은 구조적으로 가장 효율적인 방식이지만 몇 가지 전제가 존재한다.
•
Triggerer라는 새로운 컴포넌트에 의존
◦
기존 Worker + Scheduler 구조에서
◦
Deferrable: Worker + Scheduler + Triggerer 로 확장
◦
→ 장애 포인트 증가
•
모든 Sensor가 deferrable을 지원하지 않음
◦
provider / Airflow 버전에 따라 지원 여부 상이
◦
커스텀 Sensor의 경우 Trigger 직접 구현 필요
•
운영 및 디버깅 난이도 증가
◦
Worker 로그만으로는 상태 추적 어려움
◦
Triggerer 로그까지 함께 확인 필요
◦
이벤트 기반 구조로 인해 흐름 추적이 상대적으로 복잡
•
짧은 작업에서는 오버엔지니어링 가능
◦
수초 단위 Sensor에서는 구조적 이점이 거의 없음
Deferrable 전환 이후 이슈
Deferrable 자체의 문제는 아니지만, 구조 변화로 인해 새로운 병목이 발생할 수 있다.
AWS API Throttling
•
다수의 Sensor가 동시에 시작되면서 초기 polling 요청이 한 번에 발생
•
외부 API Rate Limit 초과
ThrottlingException: Rate exceeded when calling DescribeStep
Plain Text
복사
해결 방법
•
retries = 3
•
retry_exponential_backoff = True
•
jitter 적용
→ 요청 타이밍을 분산해 집중 호출을 완화하여 해결함
실제 문제 사례 (운영 환경 기준)
이론적으로는 단순히 대기 처리 방식의 차이처럼 보이지만, 실제 운영 환경에서는 이 차이가 장애 형태로 드러났다. EMR Step Sensor를 다수 병렬로 운영하다보니 대기 시간이 길어지면서 각 모드의 한계가 실제 문제로 이어졌다.
문제 1. 성공했는데 실패 알람 발생
Poke 모드에서는 센서가 워커를 장시간 점유한 채 계속 실행된다.
이 과정에서 scheduler heartbeat가 지연되면, 스케줄러가 해당 태스크의 상태를 정상적으로 추적하지 못하는 경우가 생긴다.
그 결과 아직 실행 중인 TaskInstance에 대해 스케줄러가 다시 실행을 시도하면서 아래와 같은 로그가 발생했다.
Task Instance Not Running
FAILED: Task is in the 'running' state
Python
복사
새로 시도된 실행은 [이미 running 상태]이기 때문에 즉시 실패 처리된다.
문제는 원래 실행 중이던 태스크는 그대로 살아 있어서, 이후 정상적으로 성공한다는 점이다.
•
원본 태스크는 최종적으로 SUCCESS
•
중간에 중복 실행 시도는 FAILED
•
on_failure_callback 이 호출되면서 Slack 실패 알람 발생
결과적으로 작업은 성공했는데 실패 알람이 오는 false failure alert 상황이 발생했다.
문제 2. 좀비 태스크 판정으로 인한 EMR 클러스터 강제 종료
비슷하게 Poke 모드에서 센서가 오래 실행되던 중 heartbeat를 잃으면, Airflow가 해당 태스크를 zombie task 로 판정하는 경우가 있었다.
문제는 이 태스크가 단순히 실패 처리되는 수준에서 끝나지 않았다는 점이다.
후속 정리 로직에서 remove_cluster 가 실행되면서, 실제로는 정상 동작 중이던 EMR step에 대해 terminate가 호출됐다.
•
실제 EMR step은 정상적으로 실행 중
•
Airflow는 heartbeat 유실로 태스크를 비정상 상태로 판단
•
클러스터 정리 로직 실행
•
결과적으로 EMR 작업 강제 종료
문제 3. Reschedule 전환 이후 주기적 메모리 스파이크
Poke 모드의 장기 워커 점유 문제를 줄이기 위해 Reschedule 모드로 전환한 뒤에는 다른 형태의 문제가 나타났다. Reschedule 모드는 대기 중에는 워커를 점유하지 않지만 poke_interval 마다 다시 워커를 할당받아 상태를 확인한다. 따라서 여러 센서가 같은 시점에 시작되면 매 주기마다 워커들이 한 번에 다시 올라오는 패턴이 반복된다.
예를 들면 다음과 같은 식이다.
t=0s : 20개 워커 동시 실행
t=60s : 다시 20개 실행
t=120s : 다시 20개 실행
Plain Text
복사
이 구조는 평균 사용량은 낮아 보여도 특정 시점에 동시 스폰 burst 를 만든다.
특히 MWAA Small 환경에서는 워커 하나의 메모리가 2GB로 제한되어 있었고, worker concurrency까지 고려하면 태스크 하나가 사용할 수 있는 여유 메모리가 매우 크지 않았다.
Airflow worker는 태스크를 실행할 때 Python 프로세스, operator 로딩, provider 패키지, boto3/airflow context 등을 함께 올리므로, 태스크 내용이 단순하더라도 기본 메모리 사용량이 적지 않다.
이 상태에서 여러 Sensor가 같은 시점에 재개되면
•
짧은 시간 동안 여러 worker process가 동시에 기동
•
프로세스 초기화 메모리 사용이 겹침
•
제한된 2GB 메모리에서 순간 사용량 급증
•
결국 worker OOM (return code -9) 발생
Reschedule은 장기 점유 문제는 줄였지만 대신 주기적 재기동에 따른 순간 메모리 스파이크 라는 새로운 한계를 드러냈다.
정리
Sensor 모드의 차이는 대기를 어디서 처리하느냐가 핵심 차이이다.
구분 | Poke | Reschedule | Deferrable |
대기 중 워커 점유 | O (계속 점유) | X | X |
대기 처리 주체 | 워커 | 스케줄러 재큐잉 | Triggerer 코루틴 |
20개 동시 실행 시 | 워커 20개 상시 점유 | 60초마다 20개 동시 스폰 | Triggerer 코루틴 20개 |
Heartbeat 유실 위험 | 높음 | 낮음 | 없음 |
OOM 위험 | 높음 | 중간 (주기적 스파이크) | 없음 |
False failure alert | 발생 가능 | 해소 | 해소 |
짦은 대기에 센서수가 적다면 Poke/Reschdule로도 충분하지만 장기 대기 + 다수 Sensor 환경이라면 Deferrable 모드를 사용하는게 좋다.