## 들어가며
최근 AI 검색 시스템을 구축하면서 26만 건의 PDF 논문을 처리해야 하는 과제를 맡게 되었습니다. 단순히 파일을 읽는 것이 아니라, 텍스트를 추출하고, 검색 가능한 형태로 임베딩(Embedding)을 생성하는 전체 파이프라인을 설계해야 했죠. 이 글에서는 대용량 문서 처리 시스템을 구축하면서 얻은 실전 노하우를 공유합니다.
## 파이프라인 설계의 핵심 요소
### 1. PDF 텍스트 추출 전략
Python의 `pypdf` 라이브러리를 선택했습니다. 대안으로 `pdfplumber`, `PyMuPDF` 등이 있지만, pypdf는 순수 Python으로 구현되어 의존성이 적고 안정적입니다.
```python
import pypdf
def extract_text_from_pdf(pdf_path):
with open(pdf_path, 'rb') as file:
reader = pypdf.PdfReader(file)
text = ""
for page in reader.pages:
text += page.extract_text()
return text
```
### 2. 텍스트 청킹(Chunking) 전략
임베딩 모델의 토큰 제한을 고려해 800토큰 단위로 분할하되, 문맥 유지를 위해 100토큰씩 오버랩시켰습니다. 이는 검색 품질에 큰 영향을 미칩니다.
**청킹 파라미터 선택 기준:**
- 청크 크기: 임베딩 모델 최대 토큰의 70-80% (여유 확보)
- 오버랩: 청크 크기의 10-15% (문맥 연결)
- 문장 경계 고려: 자연스러운 분할 지점에서 끊기
### 3. 임베딩 생성 및 저장
Gemini API를 활용해 3072차원 벡터를 생성했습니다. PostgreSQL의 `pgvector` 확장을 사용하면 벡터 검색이 가능합니다.
```sql
CREATE TABLE document_chunks (
id SERIAL PRIMARY KEY,
document_id INTEGER,
chunk_index INTEGER,
content TEXT,
embedding vector(3072)
);
CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops);
```
### 4. 실시간 모니터링 시스템
26만 건 처리에는 며칠이 걸립니다. 진행 상황을 모니터링하기 위해 5분마다 텔레그램 봇으로 진행률을 전송했습니다.
```python
import time
from datetime import datetime
def send_progress_report(processed, total, start_time):
elapsed = time.time() - start_time
rate = processed / elapsed if elapsed > 0 else 0
eta = (total - processed) / rate if rate > 0 else 0
message = f"""
📊 처리 진행 상황
- 완료: {processed:,} / {total:,} ({processed/total*100:.1f}%)
- 처리 속도: {rate:.1f}건/초
- 예상 완료: {eta/3600:.1f}시간 후
"""
# 텔레그램 전송 로직
```
### 5. 데이터베이스 호환성 처리
PDF에서 추출한 텍스트에 NUL 문자(`\x00`)가 포함되면 PostgreSQL 저장 시 오류가 발생합니다. 전처리 단계에서 반드시 제거해야 합니다.
```python
def clean_text(text):
# NUL 문자 제거
text = text.replace('\x00', '')
# 연속된 공백 정리
text = ' '.join(text.split())
return text
```
## 성능 최적화 팁
### 배치 처리 전략
- **논문당 처리 시간**: 20-30초 (텍스트 추출 5초, 임베딩 생성 15-25초)
- **병렬 처리**: API rate limit 고려해 동시 요청 수 제한
- **체크포인트**: 1,000건마다 진행 상황 저장 (재시작 시 이어서 처리)
### 에러 핸들링
```python
def process_document_safely(doc_id):
max_retries = 3
for attempt in range(max_retries):
try:
text = extract_text(doc_id)
chunks = create_chunks(text)
embeddings = generate_embeddings(chunks)
save_to_db(doc_id, chunks, embeddings)
return True
except Exception as e:
log_error(doc_id, e)
if attempt == max_retries - 1:
# 실패 목록에 추가
add_to_failed_queue(doc_id)
time.sleep(2 ** attempt) # exponential backoff
return False
```
## 운영 시 고려사항
1. **비용 관리**: 임베딩 API 비용 모니터링 (26만 건 × 평균 5-10청크 = 130-260만 API 호출)
2. **로그 관리**: 처리 로그를 별도 디렉토리에 날짜별로 보관
3. **재처리 로직**: 실패한 문서 자동 재시도 큐 구현
4. **저장소 용량**: 텍스트 + 임베딩 벡터 용량 사전 계산 (3072차원 float32 = 12KB/청크)
## 결론
대용량 문서 처리 파이프라인의 핵심은 **안정성**과 **가시성**입니다. 처리가 중단되어도 이어서 실행할 수 있도록 체크포인트를 설정하고, 실시간 모니터링으로 문제를 조기에 발견해야 합니다.
이번 프로젝트를 통해 배운 가장 중요한 교훈은 "작게 시작해서 검증 후 스케일업"하는 것입니다. 100건으로 파이프라인을 검증한 뒤 전체 데이터셋에 적용하면 예상치 못한 문제를 사전에 방지할 수 있습니다.
다음 단계로는 벡터 검색 성능 최적화와 하이브리드 검색(키워드 + 벡터) 구현을 계획하고 있습니다.