[LALALA] 검색 서버 구현기

Python / FastAPI / ElasticSearch / Kafka

개요

주 사용 기술이 Java와 SpringBoot인 상황에서도 새로운 도전을 해보고 싶어, 처음 개발 공부를 시작했던 언어인 Python을 사용하여 검색 서버를 구축하기로 결정했다. 이 포스트에서는 FastAPI를 이용해 고성능 비동기 API를 구현하고, ElasticSearch와 Kafka를 활용한 검색 및 인덱싱 시스템을 구축하는 과정을 기록한다.

구현 초기에 직접 작성했던 요구사항 리스트는 다음과 같다:

  • FastAPI를 사용하여 고성능 비동기 API를 구현한다.
  • Pydantic을 이용해 데이터 모델을 정의하고 유효성을 검사한다.
  • ElasticSearch 클라이언트를 사용하여 검색 기능을 구현한다.
  • Kafka 컨슈머를 통해 실시간 이벤트 처리를 구현한다.
  • JSON 모듈로 Kafka 메시지를 파싱한다.
  • threading 모듈을 사용하여 Kafka 컨슈머를 별도의 스레드에서 실행한다.

과정

FastAPI 애플리케이션 설정

app = FastAPI()
  • FastAPI 애플리케이션 인스턴스를 생성한다. 이는 전체 API의 진입점이 된다.

Elasticsearch 클라이언트 설정

es = Elasticsearch([{'host': 'elasticsearch', 'port': 9200, 'scheme': 'http'}])
  • Docker로 띄운 Elasticsearch 클라이언트를 초기화한다. 여러 개의 노드를 지원할 수 있도록 리스트 형태로 설정한다.

Kafka Consumer 설정

kafka_consumer = KafkaConsumer(
    'chatroom_created',
    'playlist_created',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='search_indexer',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
  • Kafka 컨슈머를 설정한다. 여러 토픽을 한 번에 구독하여 코드 중복을 줄인다.
  • auto_offset_reset='earliest'로 설정하여 모든 메시지를 처리한다.
  • enable_auto_commit=True로 설정하여 오프셋을 자동으로 커밋한다.
  • value_deserializer를 사용하여 JSON 메시지를 자동으로 파싱한다.

검색 및 인덱싱 API 구현

목표는 Kafka에서 발행된 검색 대상(document)을 Consumer를 통해 처리(indexing)하고, 검색 요청이 들어오면 ElasticSearch SearchAPI를 통해 저장된 데이터를 반환하는 것이다.

search_router = APIRouter(prefix="/v1/api/search", tags=["search API"])
index_router = APIRouter(tags=["index API"])
  • 각각 검색, 인덱싱 역할을 하는 라우터를 설정한다.
class IndexRequest(BaseModel):
    id: str
    name: str
    description: str
  • 인덱싱 요청에 사용하는 DTO를 정의한다. FastAPI에서는 Pydantic 모델을 사용하여 인덱스 요청의 구조를 정의하고 유효성을 검사한다.
def search_index(index: str, query: str, size: int, sort: str, start: int):
    body = {
        "query": {"match": {"name": query}},
        "size": size,
        "from": start,
        "sort": [{"created_at" if sort == "latest" else "popularity": {"order": "desc"}}]
    }
    return es.search(index=index, body=body)
  • sort 파라미터를 통해 정렬 기준을 동적으로 선택할 수 있다.
@search_router.get("/{index}")
async def search(
    index: str,
    query: str = Query(..., description="검색 쿼리"),
    size: int = Query(10, description="결과 수"),
    sort: str = Query("latest", description="정렬 순서 (latest 또는 popularity)"),
    start: int = Query(0, description="시작 위치")
):
    try:
        results = search_index(index, query, size, sort, start)
        return {"results": results["hits"]["hits"]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"검색 실패: {str(e)}")
  • 하나의 엔드포인트로 여러 인덱스의 검색을 처리할 수 있다.
  • Query 파라미터를 사용하여 검색 옵션을 처리하고, HTTPException을 통해 예외를 관리한다.
def index_document(index: str, doc: IndexRequest):
    try:
        es.index(index=index, id=doc.id, body=doc.dict())
        return {"message": f"{index} 인덱싱 성공"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"인덱싱 실패: {str(e)}")
  • 인덱싱 실패 시 적절한 예외 처리를 통해 신뢰성을 높인다.
@index_router.post("/index/{index}")
async def index(index: str, doc: IndexRequest):
    return index_document(index, doc)
  • 단일 엔드포인트로 다양한 문서 유형의 인덱싱을 처리한다.

Kafka 메시지 소비

def consume_kafka_messages():
    for message in kafka_consumer:
        topic = message.topic
        value = message.value
        
        if topic == 'chatroom_created':
            index_document('chatroom', IndexRequest(**value))
        elif topic == 'playlist_created':
            index_document('playlist', IndexRequest(**value))
  • Kafka 메시지 소비 로직을 별도의 함수로 분리하여 관심사를 분리한다.
  • 토픽에 따라 다른 인덱스에 문서를 저장할 수 있다.
threading.Thread(target=consume_kafka_messages, daemon=True).start()
  • Kafka 컨슈머를 별도의 데몬 스레드에서 실행하여 메인 애플리케이션의 성능에 영향을 주지 않도록 한다. 이로 인해 메인 스레드는 다른 요청을 처리하고, Kafka 메시지는 백그라운드에서 비동기적으로 처리되므로 전체 시스템의 효율성을 높일 수 있다.

비동기 처리와 async의 필요성

FastAPI는 비동기 처리를 지원하는 웹 프레임워크로, 비동기 함수는 async 키워드로 정의된다. 비동기 처리는 I/O 작업, 특히 네트워크 요청이나 데이터베이스 쿼리와 같은 작업에서 중요한 역할을 한다.

  • 비동기 처리의 필요성: I/O 작업은 시간이 소요될 수 있으며, 비동기 처리를 통해 이러한 작업이 완료되기를 기다리는 동안 다른 요청을 동시에 처리할 수 있다. 이를 통해 서버의 응답성과 처리량을 높일 수 있다.

  • 성능 개선: 비동기 함수는 await를 사용하여 비동기 작업의 완료를 기다리면서도 다른 작업을 처리할 수 있다. 이로 인해 동시에 여러 요청을 처리할 수 있어, 서버의 전체 처리량이 증가하고 자원 사용을 최적화할 수 있다.

  • 코드 구조: 비동기 함수는 async def로 정의되며, 비동기 작업을 자연스럽게 코드에 표현할 수 있도록 도와준다. await로 비동기 작업의 완료를 기다림으로써 비동기 프로그래밍을 더 직관적이고 관리하기 쉽게 만들어준다.

개발 환경 설정

app.include_router(search_router)
app.include_router(index_router)
  • 라우터를 애플리케이션에 포함시켜 API 구조를 모듈화한다.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
  • 스크립트가 직접 실행될 때만 서버를 시작하도록 하여 모듈로 임포트될 때는 서버가 자동으로 시작되지 않도록 한다. 목적은 다음과 같다:
    • 의도된 실행: if __name__ == "__main__": 블록은 코드가 스크립트로 실행될 때만 해당 블록의 코드가 실행되도록 보장한다. 이를 통해 모듈이 import될 때는 서버 시작 코드가 실행되지 않으며, 스크립트로 직접 실행할 때만 서버가 시작되도록 보장하려고 했다.
    • 개발 환경과 배포 환경의 분리: uvicorn.run() 호출은 개발 환경에서 서버를 직접 실행하는 데 유용하지만, LALALA는 Docker-Compose를 통해 실행된다. 개발 환경에서는 로컬 개발을 위해 서버를 직접 실행할 필요가 있어 이 때만 uvicorn이 호출되도록 보장한다.

Python의 특장점이 async라고는 하는데, 솔직히 이해가 부족하고 Pythonic하게 작성했다는 생각이 들지 않는다. 이번엔 다른 언어, 프레임워크로 간단한 API를 구현했다는 것에 의의를 두고 다음에 제대로 공부해보고 싶다.


© 2022. All rights reserved.

Powered by Hydejack v9.2.1