Search

[Airflow 운영 안정화] Athena 대용량 쿼리 OOM 해결기

제목
작성일

배경

Product 관련 통계 데이터를 Athena에서 S3로 저장하는 파이프라인에서 지속적인 OOM 발생
해당 DAG은 00시 5분에 실행되는데 자정 근처에는 작업이 많아 OOM이 자주 발생하고 새벽마다 알람이 옴

문제 분석

기존 구조
athena_result = self.fetchall_athena(query_string=_query_string) df = pd.DataFrame(athena_result[1:], columns=athena_result[0]) self.save_to_parquet(df=df, s3_output_path=...)
Python
복사
fetchall_athena()가 Athena 쿼리 결과 전체를 리스트로 반환
pd.DataFrame()이 메모리에 모든 행을 한꺼번에 적재
to_parquet()에서 단일 대용량 DataFrame을 단일파일로 직렬화
1.
데이터를 한번에 가져와서 처리하고 저장하다보니 메모리 사용량이 치솟아 -9 killed 발생
2.
중복되는 Athena 호출 로직의 정리 필요 (동일한 fetchall 함수 몇십군데 모듈에서 복제해서 똑같이 사용중..)

개선

1) 메인 모듈 개선

항목
Before
After
데이터 로드 방식
전체 결과를 한 번에 메모리에 적재
fetchall_query_chunked()로 청크 단위 스트리밍
저장 방식
pandas.DataFrame.to_parquet() 단일 호출
pyarrow.ParquetWriter로 반복 append 저장
메모리 관리
없음
청크 처리 후 delgc.collect() 호출
평균 메모리 사용량
약 956MB
약 330MB로 감소
개선 코드 예시
for i, (headers, chunk) in enumerate( CustomAthenaQuery.fetchall_query_chunked(client, query, chunk_size=30_000), start=1, ): df = pd.DataFrame(chunk, columns=headers) tbl = pa.Table.from_pandas(df, preserve_index=False) if writer is None: writer = pq.ParquetWriter(output_file, tbl.schema, compression="snappy", filesystem=s3) writer.write_table(tbl) del df, tbl, chunk
Python
복사
ParquetWriter 란?
pandas.DataFrame.to_parquet()모든 데이터를 한 번에 파일로 직렬화
반면 pyarrow.parquet.ParquetWriter파일 스트림을 열고, 여러 번 append 가능
대용량 데이터를 chunk 단위로 반복 기록할 수 있어 메모리 효율이 매우 높음
내부적으로 각 chunk를 Row Group으로 나누어 저장하므로 병렬 읽기 및 column pruning 성능도 향상됨

2) CustomAthenaQuery 모듈 리팩토링

문제점
쿼리 실행 함수가 여러 곳에 중복 (get_query, fetchall_query 등)
페이지네이션 및 청크 단위 처리 미지원
개선 방향
로직을 역할별로 분리하여 재사용성 강화
run_and_wait(): 쿼리 실행 및 완료 대기
iter_rows(): 결과 행을 제너레이터(stream) 형태로 반환
위 두 함수로 기존 fetch_all 코드 수정하고, 청크 단위 스트리밍을 위한 fetchall_query_chunked() 신규 추가
기존 청크 없이 fetchall 대체
def fetchall_query(client, query: str) -> list: qid = CustomAthenaQuery.run_and_wait(client, query) return list(CustomAthenaQuery.iter_rows(client, qid))
Python
복사
청크 단위 yield 방식 추가
def fetchall_query_chunked(client, query: str, chunk_size: int = 10000): qid = CustomAthenaQuery.run_and_wait(client, query) row_iter = CustomAthenaQuery.iter_rows(client, qid) headers = list(next(row_iter)) chunk_data = [] for row in row_iter: chunk_data.append(list(row)) if len(chunk_data) >= chunk_size: yield headers, chunk_data chunk_data = [] if chunk_data: yield headers, chunk_data
Python
복사

성과

기존 메모리 956MB 사용에서 330MB 로 사용량 축소 및 OOM 해소
before
after
CustomAthenaQuery 리팩토링으로 중복 코드 없애고 재사용성 향상