회사에 합류하고 처음 Airflow DAG 레포지토리를 열었을때 예상보다 훨씬 많은 문제들이 있었다. 수십개 DAG이 운영중이었는데 하드코딩, 중복코드, 몇백줄의 스크립트 등 개선해야할 사항이 많이 보였다. 아래는 이번에 겪은 문제 사항들과 어떻게 개선했는지 정리한 내용이다.
개선 원칙
•
책임 분리
◦
DAG은 실행 흐름만 담당, 설정은 설정파일에서 담당, 특정 태스크를 실행하기 위한 로직은 별도 모듈화
◦
메인함수는 얇게 유지해 데이터 처리 흐름이 드러나게 하기
•
DRY (Don’t Repeat Yourself)
◦
공통 설정, 입출력, 클라이언트 로직은 한곳에 모아두기
◦
최대한 중복 코드 없애기
•
의존성 주입
◦
시크릿, 클라이언트, 커넥션 등은 외부에서 주입해 테스트 가능성 확보
개선사례
1) datetime.now() 하드코딩 제거
•
에어플로우의 장점은 쉽게 과거 구간 재실행이 가능하다는 것
•
하지만 datetime.now()로 처리 시간을 하드코딩하면 재실행이 불가능해짐
•
개선: 에어플로우가 제공하는 data_interval_start, data_interval_end 활용
2) 중복 설정 제거
•
DAG마다 동일한 DEFAULT_ARGS 하드코딩
•
심지어 이메일이 그대로 하드코딩 되어있어서 퇴사자 이메일을 제거하는데 모든 DAG에 들어가 일일히 수정해야 했음(!!)
•
개선: 모듈로 분리, 이메일은 시크릿으로 관리
3) DAG 정의부 얇게 유지
•
문제:
◦
DAG 상단에 EMR config와 task 정의가 수백줄 있어, DAG 역할을 보러갔는데 스크롤을 한참 내려야함
◦
거의 동일한 Task 실행 함수 나열로 DAG 코드 너무 길고 복잡해짐
•
개선:
◦
설정은 별도 모듈로 분리
◦
Task builder 패턴으로 DAG에서는 호출만해서 의존성 그래프만 그림
◦
DAG 파일에는 오직 DAG 플로우 정의만 담도록해 가독성 향상
# Before (각 step을 수백줄로 직접 정의)
SPARK_STEPS = [
{...},
{...},
{...}
]
with DAG(): # DAG는 이미 200줄 이후에 존재
@task
def task1() -> None:
from module.script import A
A().main()
@task
def task2() -> None:
from module.script import B
B().main()
t1 = task1()
t2 = task2()
이하 동일한 방식으로 몇십개 반복
...
>> t1
>> t2
...
# 500줄이 넘어감
Python
복사
# After
from project.spark_config import SPARK_STEPS
...
with DAG(): # 30줄에서 시작
tasks = build_group()
...
>> tasks["A"]
>> tasks["B"]
Python
복사
4) S3 입출력 코드 공통화
•
S3 + Athena 위주의 데이터 플랫폼을 운영 하기 때문에, 파이프라인의 대부분이 S3에서 읽고 Parquet로 저장하는 흐름임
•
S3 저장 경로를 생성하거나 저장하는 코드가 DAG 마다 똑같이 중복
•
개선: 공통 함수 lib.s3_utils 모듈에 분리
◦
비단 S3 뿐 아니라 RDB 입출력이나 프로젝트 전반에 걸쳐 사용하는 부분은 공통 모듈로 분리
# lib/s3_utils.py
def partition_key(prefix, y, m, d, file_name):
base = f"{prefix}/year={y}/month={m}/day={d}"
return f"{base}/{file_name}.parquet"
def write_parquet_to_s3(df, bucket, key):
...
Python
복사
5) 버킷 경로 변수 위치 통일
•
ETL 파이프라인 대부분이 S3에서 읽어 S3에 저장하는데, 경로가 코드 중간중간 하드코딩 되어있음
•
어디서 읽고 어디에 저장하는지 확인이 어렵고 모든 코드를 읽어야 알 수 있음
•
개선: read_path, write_path, bucket 등 외부 메인에서 주입. 해당 태스크가 어디서 어디로 이동하는지 밖에서 확인 가능
class ETLJob:
def __init__(self, read_path, write_path, bucket):
self.read_path = read_path
self.write_path = write_path
self.bucket = bucket
Python
복사
from airflow.decorators import task
@task
def run_etl():
job = ETLJob(
read_path="bronze/events/date=2025-09-01/data.parquet",
write_path="silver/events/date=2025-09-01/data.parquet",
bucket="data-lake"
)
job.run()
# bronze > silver로 가는 흐름이라는 것을 바로 알 수 있음
Python
복사
6) 의존성 주입
•
모듈안에서 직접 시크릿을 불러오거나, boto3 client 불러오는 경우가 많음
•
로컬에서 Variable 불러올 수 없어 동작 테스트를 하기 어렵고, 시크릿 호출 중복
•
개선: 시크릿은 DAG나 Operator 레벨에서 불러와 모듈에 주입
•
효과:
◦
Variable 없이 따로 로컬에서 시크릿 주입해 동작 테스트 가능
◦
시크릿은 Operator 레벨에만 있으니 하위에서는 다른 책임없이 그냥 받아서 사용만 하면됨
# dags/example.py
from airflow.models import Variable
from etl.loader import MyLoader
@task
def run_loader():
token = Variable.get("TOKEN")
loader = MyLoader(token=token)
loader.run()
Python
복사
# etl/loader.py
class MyLoader:
def __init__(self, token: str):
self.token = token
def run(self):
...
Python
복사
7) 함수 분리
•
main 함수 하나에 수백줄 짜리 ETL 로직이 쭉 나열되어 있음
•
동일 로직에 몇개 값만 달라지면서 리포트를 생성하는 코드가 있었는데 모든 코드를 복사 붙여넣기 해서 거의 동일한 코드가 4개가 있는 경우를 확인함
•
if 문 뎁스가 5~6번 들어가 있는 경우도 있음
•
이런 경우는 요구사항이 변경되어 수정해야할때 코드 동작 파악이 너무 힘들기 때문에 적절한 단위로 분리 필요
•
개선: 함수 단위로 분리하고 main에서는 함수 호출 위주로 사용해 얇게 유지할 수 있도록 구조화
•
효과: 코드 재사용성, 어떤 코드가 어떤 역할하는지 명확해지고 유닛테스트 가능해짐
def extract(...): ...
def transform(...): ...
def load(...): ...
def main():
data = extract(...)
result = transform(data)
load(result)
Python
복사
8) 자주 사용하는 함수 중앙화
•
Athena 조회나 S3에 Parquet 저장 등 거의 매번 동일 함수를 여기저기서 똑같이 사용중
•
개선: 공통함수 lib 으로 옮기고 호출해서 사용
9) 테스트 코드 부재
•
함수분리가 안되어있다보니 당연히 유닛테스트도 부재
•
테스트코드가 없다면 문서화라도 있어야 추후 작업자가 유지보수할때 참고해 작업할 수 있는데, 둘다 미비하다보니 조그만 요구사항 변경에도 모든 코드와 동작을 파악하느라 시간이 오래 걸림
•
풀커버리지가 아니더라도 핵심 로직이나 예외 케이스 처리 함수에는 테스트를 작성. 테스트 코드 자체가 스펙문서 역할을 하게 됨
정리
아직 모든 부분을 개선하진 못해서 틈틈이 티켓 할당해 개선해나가고 있다. 특히 EMR Spark 기반 DAG은 여전히 스크립트 나열식 코드와 중복이 많아 하나씩 개선해 나가야 한다. 다만 새로운 DAG은 원칙을 지켜 작성하고 있고, 코드 리뷰에서도 같은 기준을 적용하려고 한다.
크고 어려운 작업이 아니어도 공통 설정과 spark 설정 하나만 분리해줘도 DAG 파일 라인수가 275줄에서 115줄로 줄어들었다. 불필요한 스크롤이 줄고 태스크와 실행 흐름이 더 빨리 눈에 들어와 가독성이 크게 개선됐다.
중요한건 결국 빠른 기능추가보다 잘 읽히고 수정하기 쉬운 코드를 만드는 것이다. 조금 귀찮더라도 지금 한번만 챙기면 앞으로 더 많은 시간을 아낄 수 있다.
작은 것들이 누적되어 전체 품질을 좌우하게 된다.