1. Spark 성능 문제는 왜 발생할까?
Spark는 대용량 데이터를 여러 노드에 분산하여 처리하는 시스템이다. 그래서 단일 머신에서는 발생하지 않는 문제들이 분산 환경에서는 주요 병목이 된다.
핵심 병목 원인 세 가지:
•
네트워크 전송: 노드 간 데이터를 주고받는 과정은 디스크 I/O보다 느릴 수 있으며, 데이터 양이 많을수록 비용이 급격히 증가
•
데이터 분포 불균형(Skew): 특정 노드나 파티션에 데이터가 몰리면 해당 task가 전체 job의 병목이 됨
•
불필요한 I/O: 필요하지 않은 데이터까지 읽고 처리하면 그만큼 자원낭비
따라서 Shuffle 최소화, 스큐 방지, I/O 절감 세가지로 Spark 성능 최적화를 할 수 있다.
2. Shuffle 원리와 비용
2.1 Shuffle 이란?
Shuffle은 특정 연산(Join, GroupBy, Distinct 등)을 수행하기 위해 같은 키를 가진 데이터를 같은 파티션으로 재분배하는 과정이다.
Spark는 셔플 시 hash(key) % numPartitions 방식으로 각 레코드가 어느 파티션으로 이동할지 결정한다. 같은 키는 반드시 같은 파티션으로 이동해야 Join이나 GroupBy가 정확하게 수행된다.
2.2 Shuffle이 비싼 이유
Shuffle은 Spark에서 가장 비용이 높은 연산이다.
1.
네트워크 전송: 데이터가 노드 간 이동한다. 수십~수백 GB의 데이터가 네트워크를 통해 이동하면 그 자체로 큰 오버헤드다.
2.
디스크 Spill: 셔플 데이터가 메모리를 초과하면 디스크에 임시 저장(spill)된다. 디스크 I/O는 메모리 접근보다 수백 배 느리다.
3.
Serialization / Deserialization: 데이터를 네트워크로 전송하기 위해 직렬화하고, 받는 쪽에서 역직렬화하는 과정이 CPU를 소모한다.
4.
Task 대기 시간: 셔플 이후 다음 스테이지는 모든 셔플 write가 완료될 때까지 시작할 수 없다. 특정 task가 느리면 전체가 기다린다.
Spark 튜닝의 핵심은 Shuffle을 줄이는 것이다.
3. Join 전략 이해
Spark의 Join 전략은 크게 Broadcast Join과 Shuffle Join 두 가지로 나뉜다.
3.1 Broadcast Join
작은 테이블을 각 executor에 복제(broadcast)하고, 큰 테이블은 셔플 없이 스캔만 한다.
동작 방식:
1.
작은 테이블을 driver가 수집
2.
모든 executor에 broadcast
3.
큰 테이블 각 파티션에서 로컬 Join 수행 → 셔플 없음
예시:
•
주문 테이블: 1억 건
•
쿠폰 테이블: 50건
Driver가 쿠폰 테이블 전체를 각 executor에 복제
↓ ↓ ↓
Executor 1 Executor 2 Executor 3
[쿠폰 50건] [쿠폰 50건] [쿠폰 50건]
[주문 3300만] [주문 3300만] [주문 3400만]
↓ ↓ ↓
로컬 Join 로컬 Join 로컬 Join
(네트워크 이동 없음)
Python
복사
3.2 Shuffle Join
Shuffle Join은 양쪽 테이블 모두 Join 키 기준으로 셔플해서 같은 키를 같은 파티션으로 모은 뒤 Join을 수행한다. 양쪽 테이블이 모두 네트워크를 통해 이동하기 때문에 셔플 비용이 두 배가 된다.
셔플 이후 파티션 안에서 Join을 처리하는 방식에 따라 Sort-Merge Join과 Shuffle Hash Join 두 가지로 나뉜다.
3.2.1 Sort-Merge Join
Spark의 기본 셔플 조인 전략이다. 셔플 후 양쪽 파티션을 키 기준으로 정렬한 다음, 포인터 두 개가 양쪽을 동시에 순서대로 훑으면서 같은 키끼리 매칭한다.
[셔플 후 같은 파티션에 모인 상태]
Partition 0
orders: user_id=1, user_id=2, user_id=3 ← 정렬됨
users: user_id=1, user_id=2, user_id=3 ← 정렬됨
↓
포인터 두 개가 양쪽을 동시에 순차적으로 훑음
user_id=1 매칭 → user_id=2 매칭 → user_id=3 매칭
Python
복사
정렬 비용이 있지만 데이터를 순차적으로 흘려보내며 처리하기 때문에 전체를 한 번에 메모리에 올릴 필요가 없다. 데이터가 메모리를 초과해도 일부를 디스크에 spill하면서 계속 진행할 수 있어 대용량에 안정적이다.
3.2.2. Shuffle Hash Join
셔플 후 작은 쪽 파티션 전체를 해시 테이블로 빌드해 메모리에 올려놓고, 큰 쪽 파티션을 한 행씩 읽으면서 해시 테이블을 조회하는 방식이다.
[셔플 후 같은 파티션에 모인 상태]
Partition 0
작은 테이블 → 해시 테이블로 빌드 (메모리에 올림)
{ user_id=1: row_data, user_id=2: row_data, ... }
큰 테이블 → 한 행씩 읽으면서 해시 테이블 조회(probe)
user_id=1 → O(1) 조회 → 매칭
user_id=2 → O(1) 조회 → 매칭
Python
복사
정렬이 없어서 Sort-Merge보다 빠를 수 있지만, 작은 쪽 파티션 전체를 해시 테이블로 메모리에 올려야 하기 때문에 그 파티션이 메모리를 초과하면 OOM이 발생할 수 있다. Spark 기본값은 Sort-Merge이며, Shuffle Hash는 spark.sql.join.preferSortMergeJoin=false로 설정해야 사용된다.
3.3 Join 전략 비교
구분 | Broadcast Join | Sort-Merge Join | Shuffle Hash Join |
셔플 발생 | 없음 | 양쪽 모두 | 양쪽 모두 |
적합한 상황 | 한쪽이 매우 작을 때 | 양쪽 모두 클 때 | 한쪽이 상대적으로 작을 때 |
정렬 필요 | 없음 | 있음 | 없음 |
메모리 부담 | executor당 작은 테이블 크기 | 낮음 (순차 처리) | 작은 쪽 파티션 전체 |
Spark 기본값 | 임계값 이하 자동 선택 | 셔플 조인 기본값 | 수동 설정 필요 |
4. 데이터 스큐와 해결 방법
4.1 스큐(Skew)란?
앞서 설명했듯 Spark 셔플은 해시로 파티션을 결정한다. 해시 함수 자체는 균등 분산을 목표로 하지만, 키의 빈도 분포가 불균등하면 결과도 불균등해진다.
예를 들어 user_id로 Join하는데 특정 user_id가 전체 데이터의 40%를 차지하면, 그 키는 항상 동일한 파티션으로 해시되어 해당 파티션 하나가 40%를 처리하게 된다.
스큐가 발생하면:
•
특정 task만 수십 분씩 실행되고 나머지는 이미 완료된 상태로 대기
•
해당 stage 전체가 그 task 하나를 기다리며 블로킹되어 결국 전체 잡이 느려지게 된다.
4.2 해결 방법
방법 1: salting
키에 랜덤 suffix를 붙여서 같은 키를 인위적으로 여러 파티션으로 분산하는 방법이다.
예를 들어 user_id=1이 전체 데이터의 40%를 차지한다면, user_id=1_0, user_id=1_1, ..., user_id=1_9 처럼 랜덤 숫자를 suffix로 붙여서 10개의 서로 다른 키로 만든다. 이렇게 하면 user_id=1에 해당하는 데이터가 10개의 파티션에 균등하게 분산된다.
단, Join을 하려면 상대 테이블도 같은 방식으로 키를 맞춰줘야 한다. user_id=1_0부터 user_id=1_9까지 모든 버전의 키가 상대 테이블에도 존재해야 Join이 되기 때문에, 상대 테이블의 user_id=1 행을 10개로 복제해서 각각 suffix를 붙여준다.
결국 Salting은 스큐가 있는 쪽의 데이터를 분산하는 대신, 상대 테이블을 SALT_FACTOR배로 복제하는 트레이드오프가 발생한다. 상대 테이블이 너무 크면 복제 비용이 오히려 더 커지기 때문에, 상대 테이블이 충분히 작을 때 효과적이다. 상대 테이블도 크다면 Broadcast Join이 불가능한 상황이므로, AQE Skew Join을 사용하거나 파티셔닝 전략 자체를 재검토해야 한다.
방법 2: Broadcast Join으로 셔플 자체를 회피
스큐가 발생하는 Join에서 한쪽 테이블이 충분히 작다면, Broadcast Join으로 셔플 자체를 없애는 것이 가장 단순하고 효과적인 해결책이다.
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "user_id")
Python
복사
방법 3: AQE Skew Join (다음 섹션에서 설명)
5. AQE(Adaptive Query Execution)
5.1 AQE란?
AQE는 Spark 3.0에서 도입된 기능으로, 쿼리 실행 중 수집된 런타임 통계를 기반으로 실행 계획을 동적으로 재최적화한다. 기존 Spark는 실행 전 정적으로 수립된 계획만 따랐기 때문에, 실제 데이터 분포와 계획이 맞지 않는 문제가 빈번했다.
# AQE 활성화 (Spark 3.2부터 기본값 true)
spark.conf.set("spark.sql.adaptive.enabled", "true")
Python
복사
5.2 AQE의 세 가지 핵심 기능
5.2.1 셔플 파티션 자동 조정
spark.sql.shuffle.partitions는 기본값이 200이다. 이 값은 셔플 이후 생성될 파티션 수를 고정적으로 결정한다.
•
너무 크면 (예: 데이터는 1GB인데 파티션이 200개): 파티션 하나당 데이터가 수 MB에 불과해지고, task overhead(스케줄링, 메타데이터 관리)가 실제 처리 시간보다 커진다. 또한 수백 개의 작은 파일이 생성되어 이후 읽기 성능도 저하된다.
•
너무 작으면 (예: 데이터는 1TB인데 파티션이 10개): 파티션 하나가 수백 GB를 처리하게 되어 메모리 부족으로 디스크 spill이 발생하거나, 병렬성이 떨어져 처리 시간이 길어진다.
AQE는 셔플 이후 실제 데이터 크기를 보고 파티션 수를 자동으로 조정한다.
# 셔플 파티션 자동 병합 활성화
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# 파티션 하나의 목표 크기. AQE는 이 크기에 맞춰 작은 파티션들을 병합한다.
# 예: 전체 셔플 데이터가 1.28GB라면 약 10개로 병합 (1280MB / 128MB)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
Python
복사
5.2.2 스큐 조인 자동 처리
런타임에 특정 파티션이 다른 파티션보다 현저히 크다고 판단되면, 해당 파티션을 자동으로 더 작은 단위로 분할하여 Join을 분산처리한다.
# 스큐 조인 자동 처리 활성화
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# 어떤 파티션을 스큐로 볼 것인가: 파티션 크기가 전체 파티션의 중앙값보다
# 이 배수 이상 크면 스큐 파티션으로 판단한다.
# 예: 중앙값이 100MB이고 factor=5라면, 500MB 이상인 파티션을 스큐로 판단
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") # 중앙값의 5배 이상이면 스큐로 판단
# 위 조건을 만족하더라도, 절대적인 크기가 이 임계값 이상일 때만 스큐로 판단한다.
# 중앙값이 10MB이고 factor=5라면 50MB가 기준이 되지만,
# 이 설정이 256MB라면 실제로는 256MB 이상이어야 스큐로 처리한다.
# 즉, 두 조건을 모두 만족해야 AQE가 스큐 파티션을 분할한다.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
Python
복사
5.2.3 런타임 Join 전략 재선택
실행 전에는 테이블 크기를 정확히 알 수 없어 Shuffle Join으로 계획했더라도, 실제 셔플 데이터가 broadcast 임계값 이하라면 AQE가 런타임에 Broadcast Join으로 전략을 변경한다.
5.3 AQE가 없다면?
AQE가 없던 Spark 2.x 환경이나 AQE를 비활성화해야 하는 상황에서는 위 세 가지를 수동으로 처리해야 한다.
AQE 기능 | 수동 튜닝 |
셔플 파티션 자동 조정 | spark.sql.shuffle.partitions를 데이터 크기에 맞게 직접 설정
일반적으로 파티션 하나당 128~256MB를 목표로 전체 셔플 데이터 크기를 나눠서 계산 |
스큐 조인 자동 처리 | Salting으로 키를 분산 |
런타임 Join 전략 재선택 | broadcast() 힌트 명시 또는 autoBroadcastJoinThreshold 임계값 조정 |
결국 AQE는 이 세 가지를 런타임 통계 기반으로 자동화한 것이다. AQE가 없으면 개발자가 데이터 분포를 파악하고 수동으로 하나씩 튜닝해야 한다.
6. I/O 최적화
6.1 Predicate Pushdown (프레디케이트 푸시다운)
필터 조건을 스토리지 레벨에서 수행해 필요한 데이터만 읽어오는 최적화 기법이다.
Parquet은 파일 내부에 Row Group 단위로 min/max 통계를 저장하고 있다. Spark는 필터 조건과 이 통계를 비교해서 해당 Row Group을 읽을 필요가 없으면 아예 건너뛴다. 예를 들어 order_date = '2024-01-01' 조건이 있을 때, Row Group의 min이 2024-02-01이면 그 Row Group 전체를 스킵한다.
파티션 키로 필터링하면 효과가 더 크다. Parquet을 event_date 기준으로 파티셔닝해서 저장했다면, event_date = '2024-01-01' 필터는 해당 날짜의 디렉토리만 스캔하고 나머지 파티션 디렉토리는 아예 접근하지 않는다.
df = spark.read.parquet("s3://bucket/events/")
result = df.filter(F.col("event_date") == "2024-01-01")
Python
복사
6.2 Column Pruning (컬럼 프루닝)
실제로 사용하는 컬럼만 읽는 최적화다. Parquet은 컬럼 단위로 저장되기 때문에, 필요한 컬럼만 선택하면 디스크 읽기가 크게 줄어든다.
result = spark.read.parquet("s3://bucket/users/").select("user_id", "age")
Python
복사
7. Spark UI로 성능 병목 진단하기
성능 문제를 해결하려면 먼저 어디서 병목이 발생하는지 파악해야 한다. Spark UI는 이를 위한 핵심 도구다.
Stage 탭:
•
Stage별 실행 시간: 어느 stage에서 시간이 가장 많이 소요되는지 확인
•
Shuffle Read/Write 크기: 크다면 셔플 최소화 또는 파티션 수 조정 필요
Tasks 탭:
•
Task 분포 (max vs median): max task 시간이 median보다 수배 크면 스큐 의심
•
Spill 발생 여부: Spill (memory), Spill (disk) 컬럼에 값이 있으면 메모리 부족 또는 파티션 수 문제
•
파티션 수: task 수가 지나치게 많거나 적으면 파티션 수 조정 필요
진단 시나리오 예시
Stage 3 실행 시간: 45분
├── Task 분포: median 30초, max 44분 → 스큐 의심
├── Shuffle Read: 200GB → 셔플 비용 높음
└── Spill (disk): 50GB → 메모리 부족
Python
복사
위 상황이라면 순서대로 스큐 원인 파악 → Salting 또는 AQE skewJoin 활성화 → shuffle.partitions 조정 → executor 메모리 증설 순으로 접근한다.
Spark 성능 최적화의 핵심은 연산 자체보다 데이터 이동과 불균형을 줄이는 것에 있다.
Shuffle을 최소화하고, 데이터 분포를 균등하게 유지하는 것이 기본이며, Broadcast Join, AQE, Predicate Pushdown은 이를 보완하는 수단이다.
같은 로직이라도 데이터가 어떻게 이동하는지를 이해하고 있으면 성능은 달라진다.