Python 비동기 처리 성능 개선: Fire-and-Forget 패턴으로 처리량 극대화하기
asyncio.gather()에서 벗어나 진정한 병렬 처리 구현하기
SP
SpacePlanning
SpacePlanning AI Team
## 들어가며
비동기 프로그래밍에서 흔히 마주치는 함정이 있습니다. `asyncio.gather()`를 사용해 여러 작업을 '병렬'로 처리한다고 생각했는데, 실제로는 배치 단위로 순차 처리되어 성능이 기대에 못 미치는 경우입니다. 이 글에서는 메시지 큐 소비자(Consumer) 패턴을 예시로, 진정한 병렬 처리를 구현하는 방법을 소개합니다.
## 문제 상황: 겉으로만 병렬 처리
많은 개발자들이 다음과 같은 패턴으로 메시지를 처리합니다.
```python
# 일반적인 배치 처리 방식
while True:
batch = await consumer.getmany(timeout_ms=5000)
# 배치 내 모든 메시지를 태스크로 생성
tasks = [asyncio.create_task(process_message(msg)) for msg in batch]
# 모든 태스크 완료 대기
await asyncio.gather(*tasks)
# 커밋 후 다음 배치
await consumer.commit()
```
### 핵심 문제점
- **블로킹 대기**: `gather()`가 배치 내 모든 작업이 끝날 때까지 기다림
- **유휴 시간 발생**: 배치 처리 중에는 새 메시지를 가져오지 않음
- **리소스 낭비**: 다른 작업이 준비되어 있어도 처리하지 못함
예를 들어, 10개 메시지 배치 중 9개는 1초 만에 끝나고 1개가 30초 걸린다면, 30초 동안 새 메시지를 가져오지 못합니다.
## 해결책: Fire-and-Forget 패턴
진정한 병렬 처리를 위해서는 "작업을 시작하고 잊어버리는" 패턴이 필요합니다.
```python
import asyncio
from typing import Set
# 실행 중인 태스크 추적
running_tasks: Set[asyncio.Task] = set()
while True:
# 짧은 타임아웃으로 자주 확인
batch = await consumer.getmany(timeout_ms=1000)
for msg in batch:
# 태스크 생성 및 추적 세트에 추가
task = asyncio.create_task(process_message(msg))
running_tasks.add(task)
# 완료되면 자동으로 제거
task.add_done_callback(lambda t: running_tasks.discard(t))
# Non-blocking 완료 확인 (타임아웃 0.1초)
if running_tasks:
done, _ = await asyncio.wait(running_tasks, timeout=0.1)
# 바로 커밋하고 다음 사이클
await consumer.commit()
```
### 핵심 개선 포인트
1. **태스크 세트 관리**: `running_tasks`로 실행 중인 작업 추적
2. **자동 정리**: `add_done_callback()`으로 완료된 태스크 자동 제거
3. **짧은 타임아웃**: 1초마다 새 메시지 확인
4. **비블로킹 대기**: `asyncio.wait(timeout=0.1)`로 빠르게 다음 사이클 진입
## 동시 실행 수 제어
무제한 병렬 처리는 위험합니다. 세마포어(Semaphore)로 제한하세요.
```python
# 최대 동시 실행 수 제한
MAX_CONCURRENT = 5
semaphore = asyncio.Semaphore(MAX_CONCURRENT)
async def process_message(msg):
async with semaphore:
# 실제 처리 로직
await heavy_processing(msg)
```
이렇게 하면 메시지는 계속 가져오지만, 실제 무거운 작업은 5개까지만 동시 실행됩니다.
## 실전 성능 비교
### Before (gather 방식)
- 10분간 처리량: 약 6건
- 평균 대기 시간: 배치당 평균 90초
### After (Fire-and-Forget)
- 10분간 처리량: 11건
- 리소스 활용률: 약 83% 향상
- 여러 종류의 메시지가 동시에 처리됨
## 주의사항
1. **에러 핸들링**: 태스크 내부에서 예외를 반드시 처리해야 합니다. 그렇지 않으면 조용히 실패할 수 있습니다.
```python
async def process_message(msg):
try:
await heavy_processing(msg)
except Exception as e:
logger.error(f"Processing failed: {e}")
# 재시도 로직 등
```
2. **메모리 관리**: `running_tasks` 세트가 무한히 커지지 않도록 모니터링이 필요합니다.
3. **커밋 전략**: 메시지 처리 실패 시 재처리 로직을 고려해야 합니다.
## 결론
`asyncio.gather()`는 편리하지만, 배치 단위로 블로킹되는 한계가 있습니다. Fire-and-Forget 패턴과 태스크 세트 관리를 통해:
- ✅ 지속적인 메시지 수신 가능
- ✅ 리소스 활용률 극대화
- ✅ 처리량 대폭 향상
다음 단계로는 백프레셔(backpressure) 제어, 우선순위 큐 적용 등을 고려해볼 수 있습니다. 여러분의 비동기 애플리케이션에도 이 패턴을 적용해보시기 바랍니다.