[LALALA] 검색 서버 구현기
Python / FastAPI / ElasticSearch / Kafka
개요
주 사용 기술이 Java와 SpringBoot인 상황에서도 새로운 도전을 해보고 싶어, 처음 개발 공부를 시작했던 언어인 Python을 사용하여 검색 서버를 구축하기로 결정했다. 이 포스트에서는 FastAPI를 이용해 고성능 비동기 API를 구현하고, ElasticSearch와 Kafka를 활용한 검색 및 인덱싱 시스템을 구축하는 과정을 기록한다.
구현 초기에 직접 작성했던 요구사항 리스트는 다음과 같다:
- FastAPI를 사용하여 고성능 비동기 API를 구현한다.
- Pydantic을 이용해 데이터 모델을 정의하고 유효성을 검사한다.
- ElasticSearch 클라이언트를 사용하여 검색 기능을 구현한다.
- Kafka 컨슈머를 통해 실시간 이벤트 처리를 구현한다.
- JSON 모듈로 Kafka 메시지를 파싱한다.
threading
모듈을 사용하여 Kafka 컨슈머를 별도의 스레드에서 실행한다.
과정
FastAPI 애플리케이션 설정
app = FastAPI()
- FastAPI 애플리케이션 인스턴스를 생성한다. 이는 전체 API의 진입점이 된다.
Elasticsearch 클라이언트 설정
es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200, 'scheme': 'http'}])
- Docker로 띄운 Elasticsearch 클라이언트를 초기화한다. 여러 개의 노드를 지원할 수 있도록 리스트 형태로 설정한다.
Kafka Consumer 설정
kafka_consumer = KafkaConsumer(
'chatroom_created',
'playlist_created',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='search_indexer',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
- Kafka 컨슈머를 설정한다. 여러 토픽을 한 번에 구독하여 코드 중복을 줄인다.
auto_offset_reset='earliest'
로 설정하여 모든 메시지를 처리한다.enable_auto_commit=True
로 설정하여 오프셋을 자동으로 커밋한다.value_deserializer
를 사용하여 JSON 메시지를 자동으로 파싱한다.
검색 및 인덱싱 API 구현
목표는 Kafka에서 발행된 검색 대상(document)을 Consumer를 통해 처리(indexing)하고, 검색 요청이 들어오면 ElasticSearch SearchAPI를 통해 저장된 데이터를 반환하는 것이다.
search_router = APIRouter(prefix="/v1/api/search", tags=["search API"])
index_router = APIRouter(tags=["index API"])
- 각각 검색, 인덱싱 역할을 하는 라우터를 설정한다.
class IndexRequest(BaseModel):
id: str
name: str
description: str
- 인덱싱 요청에 사용하는 DTO를 정의한다. FastAPI에서는 Pydantic 모델을 사용하여 인덱스 요청의 구조를 정의하고 유효성을 검사한다.
def search_index(index: str, query: str, size: int, sort: str, start: int):
body = {
"query": {"match": {"name": query}},
"size": size,
"from": start,
"sort": [{"created_at" if sort == "latest" else "popularity": {"order": "desc"}}]
}
return es.search(index=index, body=body)
sort
파라미터를 통해 정렬 기준을 동적으로 선택할 수 있다.
@search_router.get("/{index}")
async def search(
index: str,
query: str = Query(..., description="검색 쿼리"),
size: int = Query(10, description="결과 수"),
sort: str = Query("latest", description="정렬 순서 (latest 또는 popularity)"),
start: int = Query(0, description="시작 위치")
):
try:
results = search_index(index, query, size, sort, start)
return {"results": results["hits"]["hits"]}
except Exception as e:
raise HTTPException(status_code=500, detail=f"검색 실패: {str(e)}")
- 하나의 엔드포인트로 여러 인덱스의 검색을 처리할 수 있다.
- Query 파라미터를 사용하여 검색 옵션을 처리하고, HTTPException을 통해 예외를 관리한다.
def index_document(index: str, doc: IndexRequest):
try:
es.index(index=index, id=doc.id, body=doc.dict())
return {"message": f"{index} 인덱싱 성공"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"인덱싱 실패: {str(e)}")
- 인덱싱 실패 시 적절한 예외 처리를 통해 신뢰성을 높인다.
@index_router.post("/index/{index}")
async def index(index: str, doc: IndexRequest):
return index_document(index, doc)
- 단일 엔드포인트로 다양한 문서 유형의 인덱싱을 처리한다.
Kafka 메시지 소비
def consume_kafka_messages():
for message in kafka_consumer:
topic = message.topic
value = message.value
if topic == 'chatroom_created':
index_document('chatroom', IndexRequest(**value))
elif topic == 'playlist_created':
index_document('playlist', IndexRequest(**value))
- Kafka 메시지 소비 로직을 별도의 함수로 분리하여 관심사를 분리한다.
- 토픽에 따라 다른 인덱스에 문서를 저장할 수 있다.
threading.Thread(target=consume_kafka_messages, daemon=True).start()
- Kafka 컨슈머를 별도의 데몬 스레드에서 실행하여 메인 애플리케이션의 성능에 영향을 주지 않도록 한다. 이로 인해 메인 스레드는 다른 요청을 처리하고, Kafka 메시지는 백그라운드에서 비동기적으로 처리되므로 전체 시스템의 효율성을 높일 수 있다.
비동기 처리와 async
의 필요성
FastAPI는 비동기 처리를 지원하는 웹 프레임워크로, 비동기 함수는 async
키워드로 정의된다. 비동기 처리는 I/O 작업, 특히 네트워크 요청이나 데이터베이스 쿼리와 같은 작업에서 중요한 역할을 한다.
비동기 처리의 필요성: I/O 작업은 시간이 소요될 수 있으며, 비동기 처리를 통해 이러한 작업이 완료되기를 기다리는 동안 다른 요청을 동시에 처리할 수 있다. 이를 통해 서버의 응답성과 처리량을 높일 수 있다.
성능 개선: 비동기 함수는
await
를 사용하여 비동기 작업의 완료를 기다리면서도 다른 작업을 처리할 수 있다. 이로 인해 동시에 여러 요청을 처리할 수 있어, 서버의 전체 처리량이 증가하고 자원 사용을 최적화할 수 있다.코드 구조: 비동기 함수는
async def
로 정의되며, 비동기 작업을 자연스럽게 코드에 표현할 수 있도록 도와준다.await
로 비동기 작업의 완료를 기다림으로써 비동기 프로그래밍을 더 직관적이고 관리하기 쉽게 만들어준다.
개발 환경 설정
app.include_router(search_router)
app.include_router(index_router)
- 라우터를 애플리케이션에 포함시켜 API 구조를 모듈화한다.
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
- 스크립트가 직접 실행될 때만 서버를 시작하도록 하여 모듈로 임포트될 때는 서버가 자동으로 시작되지 않도록 한다. 목적은 다음과 같다:
- 의도된 실행:
if __name__ == "__main__":
블록은 코드가 스크립트로 실행될 때만 해당 블록의 코드가 실행되도록 보장한다. 이를 통해 모듈이 import될 때는 서버 시작 코드가 실행되지 않으며, 스크립트로 직접 실행할 때만 서버가 시작되도록 보장하려고 했다. - 개발 환경과 배포 환경의 분리:
uvicorn.run()
호출은 개발 환경에서 서버를 직접 실행하는 데 유용하지만, LALALA는 Docker-Compose를 통해 실행된다. 개발 환경에서는 로컬 개발을 위해 서버를 직접 실행할 필요가 있어 이 때만uvicorn
이 호출되도록 보장한다.
- 의도된 실행:
Python의 특장점이 async라고는 하는데, 솔직히 이해가 부족하고 Pythonic하게 작성했다는 생각이 들지 않는다. 이번엔 다른 언어, 프레임워크로 간단한 API를 구현했다는 것에 의의를 두고 다음에 제대로 공부해보고 싶다.