All
배경
•
Product 관련 통계 데이터를 Athena에서 S3로 저장하는 파이프라인에서 지속적인 OOM 발생
•
해당 DAG은 00시 5분에 실행되는데 자정 근처에는 작업이 많아 OOM이 자주 발생하고 새벽마다 알람이 옴
문제 분석
기존 구조
athena_result = self.fetchall_athena(query_string=_query_string)
df = pd.DataFrame(athena_result[1:], columns=athena_result[0])
self.save_to_parquet(df=df, s3_output_path=...)
Python
복사
•
fetchall_athena()가 Athena 쿼리 결과 전체를 리스트로 반환
•
pd.DataFrame()이 메모리에 모든 행을 한꺼번에 적재
•
to_parquet()에서 단일 대용량 DataFrame을 단일파일로 직렬화
1.
데이터를 한번에 가져와서 처리하고 저장하다보니 메모리 사용량이 치솟아 -9 killed 발생
2.
중복되는 Athena 호출 로직의 정리 필요 (동일한 fetchall 함수 몇십군데 모듈에서 복제해서 똑같이 사용중..)
개선
1) 메인 모듈 개선
항목 | Before | After |
데이터 로드 방식 | 전체 결과를 한 번에 메모리에 적재 | fetchall_query_chunked()로 청크 단위 스트리밍 |
저장 방식 | pandas.DataFrame.to_parquet() 단일 호출 | pyarrow.ParquetWriter로 반복 append 저장 |
메모리 관리 | 없음 | 청크 처리 후 del 및 gc.collect() 호출 |
평균 메모리 사용량 | 약 956MB | 약 330MB로 감소 |
개선 코드 예시
for i, (headers, chunk) in enumerate(
CustomAthenaQuery.fetchall_query_chunked(client, query, chunk_size=30_000),
start=1,
):
df = pd.DataFrame(chunk, columns=headers)
tbl = pa.Table.from_pandas(df, preserve_index=False)
if writer is None:
writer = pq.ParquetWriter(output_file, tbl.schema, compression="snappy", filesystem=s3)
writer.write_table(tbl)
del df, tbl, chunk
Python
복사
ParquetWriter 란?
•
pandas.DataFrame.to_parquet()은 모든 데이터를 한 번에 파일로 직렬화
•
반면 pyarrow.parquet.ParquetWriter는 파일 스트림을 열고, 여러 번 append 가능
•
대용량 데이터를 chunk 단위로 반복 기록할 수 있어 메모리 효율이 매우 높음
•
내부적으로 각 chunk를 Row Group으로 나누어 저장하므로 병렬 읽기 및 column pruning 성능도 향상됨
2) CustomAthenaQuery 모듈 리팩토링
문제점
•
쿼리 실행 함수가 여러 곳에 중복 (get_query, fetchall_query 등)
•
페이지네이션 및 청크 단위 처리 미지원
개선 방향
•
로직을 역할별로 분리하여 재사용성 강화
◦
run_and_wait(): 쿼리 실행 및 완료 대기
◦
iter_rows(): 결과 행을 제너레이터(stream) 형태로 반환
•
위 두 함수로 기존 fetch_all 코드 수정하고, 청크 단위 스트리밍을 위한 fetchall_query_chunked() 신규 추가
기존 청크 없이 fetchall 대체
def fetchall_query(client, query: str) -> list:
qid = CustomAthenaQuery.run_and_wait(client, query)
return list(CustomAthenaQuery.iter_rows(client, qid))
Python
복사
청크 단위 yield 방식 추가
def fetchall_query_chunked(client, query: str, chunk_size: int = 10000):
qid = CustomAthenaQuery.run_and_wait(client, query)
row_iter = CustomAthenaQuery.iter_rows(client, qid)
headers = list(next(row_iter))
chunk_data = []
for row in row_iter:
chunk_data.append(list(row))
if len(chunk_data) >= chunk_size:
yield headers, chunk_data
chunk_data = []
if chunk_data:
yield headers, chunk_data
Python
복사
성과
기존 메모리 956MB 사용에서 330MB 로 사용량 축소 및 OOM 해소
•
before
•
after
CustomAthenaQuery 리팩토링으로 중복 코드 없애고 재사용성 향상

