[LALALA] 실시간 채팅 서버 구현기

Java / SpringBoot / MongoDB / Kafka

1. 프로젝트 개요

목표는 MongoDB와 Spring Boot를 기반으로 한 고성능 실시간 채팅 서버를 구현하는 것이다. 주요 기능으로는 채팅방 생성 및 관리, 실시간 메시지 송수신, 사용자 참여/퇴장 관리, 그리고 음악 재생 (스트리밍 서버가 있지만 같이 음악을 듣기 위해 곡 단위 동기화) 기능이다.

2. 기술 스택 선정 이유

이번엔 이유를 가지고 스택을 선정하려고 했다. 크게 와닿지 않아도 사용하는 기술의 특장점들을 알아보기 위해서라도 요구사항에 걸맞는 이유를 붙여봤다.

2.1 MongoDB 선택

MongoDB를 선택한 이유는 다음과 같다:

  1. 유연한 스키마: 채팅 애플리케이션의 요구사항 변경이 빈번할 것으로 예상되어 스키마 변경이 용이한 MongoDB가 적합했다.
  2. 높은 쓰기 성능: 실시간 채팅의 특성상 빈번한 쓰기 작업이 발생하므로, MongoDB의 높은 쓰기 성능이 이에 적합했다.
  3. 확장성: 사용자 증가에 대비한 수평적 확장이 용이하다.

2.2 Spring Boot 활용

Spring Boot를 선택한 이유는 다음과 같다:

  1. 빠른 개발: 자동 설정과 내장 서버로 개발 속도를 높일 수 있다.
  2. 강력한 생태계: 다양한 라이브러리와의 통합이 쉽다.
  3. 의존성 관리: 필요한 라이브러리를 쉽게 추가하고 관리할 수 있다.

Kafka는 이번 채팅서버의 핵심으로 아래 따로 작성했다!

3. 주요 컴포넌트 구현

3.1 RoomController

RoomController는 채팅방 관련 API 제공한다. 주요 메서드는 다음과 같다:

@PostMapping("/create")
public ResponseEntity<BaseResponse<RoomResponseDto>> createRoom(@Valid @RequestBody RoomCreateRequestDto request) {
    RoomResponseDto response = roomService.create(request);
    producers.sendRoomMessage(createIndexingRequestMessage(response));
    return ResponseEntity.ok(BaseResponse.from(HttpStatus.CREATED.value(), "채팅방 생성 성공", response));
}

이 메서드는 새로운 채팅방을 생성한다. @Valid 어노테이션으로 요청 DTO의 유효성을 검증하고, 생성된 채팅방 정보를 Kafka를 통해 비동기적으로 즉각 처리한다.

@GetMapping("/joined")
public ResponseEntity<BaseResponse<List<JoinedRoomResponseDto>>> getJoinedRooms(@RequestParam Long uid) {
    List<JoinedRoomResponseDto> joinedRooms = roomService.findJoinedRoomsByUid(uid);
    return ResponseEntity.ok(BaseResponse.from(HttpStatus.OK.value(), "참여중인 채팅방 리스트 조회 성공", joinedRooms));
}

이 메서드는 특정 사용자가 참여 중인 채팅방 목록을 조회한다. URL 파라미터로 사용자 ID를 받아 처리한다.

3.2 RoomService

RoomService는 채팅방 관련 비즈니스 로직을 처리한다. 주요 메서드는 다음과 같다:

public RoomResponseDto create(RoomCreateRequestDto dto) {
    User owner = createOwner(dto);
    Room room = createRoom(dto, owner);
    Room savedRoom = roomRepository.save(room);
    return converter.convertRoom(savedRoom);
}

이 메서드는 새로운 채팅방을 생성하는 핵심 로직이다. 채팅방 소유자 객체를 생성하고, 채팅방 객체를 생성한 후 MongoDB에 저장한다.

public List<JoinedRoomResponseDto> findJoinedRoomsByUid(Long uid) {
    return roomRepository.findJoinedRoomsByUid(uid).stream()
            .map(this::createJoinedRoomResponseDto)
            .collect(Collectors.toList());
}

이 메서드는 사용자가 참여 중인 채팅방 목록을 조회한다. MongoDB 레포지토리의 커스텀 쿼리 메서드를 호출하고, 스트림 API를 사용하여 각 Room 객체를 JoinedRoomResponseDto로 변환한다. 스트림을 잘 몰랐는데 이번 기회로 아주 톡톡히 써먹고 있다. 연쇄적으로 데이터 처리할 때 너무 유용한 것 같다.

3.3 ChatMessageService

ChatMessageService는 채팅 메시지 처리 로직을 담당한다. 주요 메서드는 다음과 같다:

public ChatMessageDto saveChatMessage(ChatMessageDto dto) {
    ChatMessage message = ChatMessage.from(dto);
    ChatMessage savedMessage = chatMessageRepository.save(message);
    return converter.convertMessage(savedMessage);
}

이 메서드는 새로운 채팅 메시지를 저장한다. DTO를 엔티티로 변환하고, MongoDB에 저장한 후 다시 DTO로 변환하여 반환한다.

public Long getCurrentMusicId(String roomId) {
    Room room = getRoomOrThrow(roomId);
    Duration elapsedTime = Duration.between(room.getCreatedAt(), LocalDateTime.now());
    Duration totalPlaylistTime = room.getPlaylistDuration();

    long currentPlaylistTimeInSeconds = elapsedTime.abs().getSeconds() % totalPlaylistTime.getSeconds();

    return room.getPlaylist().getMusics().stream()
            .reduce(new MusicAccumulator(Duration.ZERO, null),
                    (acc, music) -> {
                        acc.playlistTime = acc.playlistTime.plus(music.getPlayTimeDuration());
                        if (acc.playlistTime.getSeconds() >= currentPlaylistTimeInSeconds && acc.musicId == null) {
                            acc.musicId = music.getId();
                        }
                        return acc;
                    },
                    (acc1, acc2) -> acc1)
            .musicId;
}

이 메서드는 현재 재생 중인 음악의 ID를 계산한다. 채팅방 생성 시간부터 현재까지의 경과 시간을 계산하고, 전체 재생 목록 시간으로 모듈러 연산을 수행하여 현재 재생 시점을 구한다. 즉, 스트림 API와 reduce 연산을 사용하여 현재 재생 중인 음악을 찾는다.

4. 데이터베이스 설계 및 쿼리 최적화

4.1 MongoDB 스키마 설계

채팅 애플리케이션의 특성을 고려한 MongoDB 스키마를 다음과 같이 설계했다:

@Document(collection = "rooms")
public class Room {
    @Id
    private String id;
    private String roomName;
    private List<User> users;
    private Playlist playlist;
    private User playlistOwner;
    private Duration playlistDuration;
    private String thumbnailImage;
    private LocalDateTime createdAt;
}

@Document 어노테이션으로 MongoDB 컬렉션을 지정하고, @Id로 문서의 고유 식별자를 지정했다. 중첩 문서 구조(users, playlist)를 활용하여 관련 데이터를 저장했다.

4.2 인덱싱 및 쿼리 최적화

효율적인 쿼리 수행을 위해 다음과 같은 인덱싱 전략을 사용했다:

@Indexed(name = "room_users_idx")
private List<User> users;

@CompoundIndex(name = "room_created_at_idx", def = "{'createdAt': -1, 'roomName': 1}")

사용자별 참여 채팅방 조회 성능 향상과 채팅방 생성 시간 기반의 정렬 및 검색 성능 개선을 위해 복합 인덱스를 생성했다.

쿼리 최적화를 위해 다음과 같은 커스텀 쿼리를 사용했다:

@Query(value = "{'users.uid': ?0}", fields = "{'id': 1, 'roomName': 1, 'thumbnailImage': 1, 'createdAt': 1}")
List<Room> findJoinedRoomsByUid(Long uid, Sort sort, Pageable pageable);

@Query 어노테이션으로 커스텀 MongoDB 쿼리를 정의하고, 필요한 필드만 선택적으로 조회하여 네트워크 부하를 줄였다.

5. 실시간 통신 구현

5.1 WebSocket 설정

WebSocket을 사용하여 실시간 양방향 통신을 구현했다:

@MessageMapping("/send")
public void sendSocketMessage(@Valid @RequestBody ChatMessageDto chatMessageDto) {
    validateRoomExists(chatMessageDto.getRoomId());
    ChatMessageDto savedMessage = chatMessageService.saveChatMessage(chatMessageDto);
    producers.sendMessage(savedMessage);
}

@MessageMapping 어노테이션을 사용하여 WebSocket 엔드포인트를 정의하고, 메시지 저장 후 Kafka 프로듀서를 통해 메시지를 브로드캐스트한다.

5.2 Kafka 통합

Kafka를 사용하여 이벤트 기반 아키텍처를 구현했다:

private final Producers producers;

// 채팅방 생성 시
producers.sendRoomMessage(createIndexingRequestMessage(response));

// 메시지 전송 시
producers.sendMessage(savedMessage);

Kafka 프로듀서를 사용하여 채팅방 생성, 메시지 전송 등의 이벤트를 비동기적으로 처리한다.

5.3 카프카 vs. 다른 메시지 큐 시스템

카프카를 선정한 이유이다.

1. 카프카의 장점

  • 높은 처리량: 카프카는 대량의 데이터 스트림을 처리할 수 있으며, 높은 TPS(Transactions Per Second)를 지원한다. 대규모 이벤트 데이터 처리에 적합하다.
  • 내구성 및 복제: 카프카는 데이터의 내구성을 보장하며, 메시지의 복제를 통해 데이터 손실을 방지한다. 이를 통해 높은 신뢰성을 제공할 수 있다.
  • 비동기 처리: 카프카는 비동기적으로 메시지를 처리하므로 시스템의 응답성을 개선하고, 높은 처리량을 유지할 수 있다.
  • 확장성: 클러스터링을 통해 수평적으로 확장할 수 있어, 증가하는 데이터 처리 요구에 유연하게 대응할 수 있다.
  • 내장된 스트리밍 처리: 카프카 Streams와 KSQL 같은 내장 스트리밍 처리 기능을 통해 실시간 데이터 처리 및 분석을 지원한다.

2. 다른 메시지 큐 시스템과의 비교

  • RabbitMQ: RabbitMQ는 복잡한 라우팅과 우선순위 기반 큐잉을 지원하지만, 카프카보다 처리량이 낮고, 데이터 내구성과 복제 측면에서 카프카에 비해 상대적으로 약할 수 있다. RabbitMQ는 일반적으로 트랜잭션 처리와 메시지 큐잉이 필요한 애플리케이션에 적합하다.

  • ActiveMQ: ActiveMQ는 다양한 메시징 패턴을 지원하고, JMS(Java Message Service) 표준을 준수한다. 하지만 카프카에 비해 성능과 확장성 측면에서 부족할 수 있다. ActiveMQ는 주로 중소규모 애플리케이션에서 된다고 한다.

  • Amazon SQS: Amazon SQS는 AWS 클라우드 서비스와 통합된 메시징 큐 서비스로, 카프카에 비해 기본적인 기능은 제공하지만, 데이터 내구성과 처리량, 실시간 스트리밍 처리 측면에서 카프카보다 제한적일 수 있다.

3. 결론

카프카를 사용한 이벤트 드리븐 아키텍처는 대량의 이벤트를 신뢰성 있게 처리하고, 높은 성능과 확장성을 제공하여, 복잡한 분산 시스템에서도 안정적인 이벤트 처리를 지원한다. 카프카의 높은 처리량, 내구성, 비동기 처리, 확장성, 스트리밍 처리 기능은 대규모 채팅 시스템과 같은 고성능, 고신뢰성의 요구사항을 충족시키는 데 적합하다고 생각해 선정했다.

6. Kafka를 활용한 메시지 브로커 시스템 구현

실시간 메시지 처리와 시스템 간 비동기 통신을 위해 Kafka를 도입했다. Kafka는 높은 처리량과 확장성을 제공하여 대규모 실시간 데이터 스트리밍에 적합하다.

6.1 Producers 클래스 구현

Producers 클래스는 Kafka로 메시지를 전송하는 역할을 담당한다. 주요 기능은 다음과 같다:

  1. 채팅 메시지 전송
  2. 채팅방 정보 전송 (인덱싱 요청)
@Component
@RequiredArgsConstructor
public class Producers {
    @Value("${kafka.topic.chat-name}")
    private String topicChatName;
    @Value("${kafka.topic.room-name}")
    private String topicRoomName;
    private final KafkaTemplate<String, ChatMessageDto> chatKafkaTemplate;
    private final KafkaTemplate<String, IndexingRequestMessageDto> roomKafkaTemplate;
    private final ChatMessageService chatMessageService;
    private final RoomService roomService;

    public void sendMessage(ChatMessageDto chatMessageDto) {
        if (chatMessageDto.getMessageType() == MessageType.CREATION) {
            // 채팅방 생성 메시지 처리
            RoomResponseDto roomResponseDto = roomService.getRoomInfo(chatMessageDto.getRoomId());
            sendRoomMessage(createIndexingRequestMessage(roomResponseDto));
        } else {
            // 일반 채팅 메시지 처리
            sendChatMessage(chatMessageDto);
        }
    }

    // ... 기타 메서드
}

6.2 비동기 메시지 전송 및 오류 처리

Kafka로의 메시지 전송은 비동기적으로 이루어지며, CompletableFuture를 사용하여 전송 결과를 처리한다:

public void sendRoomMessage(IndexingRequestMessageDto roomMessageDto) {
    CompletableFuture<SendResult<String, IndexingRequestMessageDto>> completableFuture = 
        roomKafkaTemplate.send(topicRoomName, roomMessageDto);
    completableFuture.whenComplete((result, ex) -> {
        if (ex == null) {
            log.info("메시지 전송 성공=[" + roomMessageDto.getRoomId() + "] with offset=[" + 
                      result.getRecordMetadata().offset() + "]");
        } else {
            log.info("메시지 전송 불가=[" + roomMessageDto.getRoomId() + "] 원인 : " + ex.getMessage());
        }
    });
}

고려한 점들:

  1. 비동기 처리: 메시지 전송이 애플리케이션의 주 스레드를 차단하지 않는다.
  2. 오류 처리: 전송 실패 시 로그를 남기고 적절한 후속 조치를 취할 수 있다.
  3. 확장성: 대량의 메시지를 효율적으로 처리할 수 있다.

6.3 메시지 타입에 따른 처리

채팅 메시지의 타입에 따라 다른 처리 로직을 적용했다:

public void sendMessage(ChatMessageDto chatMessageDto) {
    if (chatMessageDto.getMessageType() == MessageType.CREATION) {
        RoomResponseDto roomResponseDto = roomService.getRoomInfo(chatMessageDto.getRoomId());
        List<Long> receivers = roomResponseDto.getUsers().stream()
            .map(UserListResponseDto::getUid)
            .collect(Collectors.toList());
        receivers.remove(chatMessageDto.getSenderId());
        sendRoomMessage(createIndexingRequestMessage(roomResponseDto));
    } else {
        // 일반 채팅 메시지 전송 로직
    }
}

고려한 점들:

  1. 유연성: 메시지 타입에 따라 다른 처리 로직을 쉽게 적용할 수 있다.
  2. 확장성: 새로운 메시지 타입을 추가하기 쉽다.
  3. 관심사의 분리: 채팅방 생성과 일반 메시지 전송 로직을 분리하여 코드의 가독성과 유지보수성을 높였다.

6.4 예외 처리 및 메시지 복구

메시지 전송 실패 시 적절한 예외 처리와 복구 메커니즘을 구현했다:

completableFuture.whenComplete((result, ex) -> {
    if (ex != null) {
        log.error("메시지 전송 불가=[" + chatMessageDto.getContent() + "] 원인 : " + ex.getMessage());
        chatMessageService.deleteChat(chatMessageDto.getId());
        log.info("삭제된 메시지={}", chatMessageDto.getId());
    }
});

고려한 점들:

  1. 데이터 정합성: 전송 실패한 메시지를 데이터베이스에서 삭제하여 일관성을 유지한다.
  2. 오류 추적: 실패한 메시지에 대한 로그를 남겨 문제 해결에 도움을 준다.
  3. 시스템 안정성: 실패한 메시지로 인한 시스템 부하를 방지한다.

7. 성능 최적화

사실 최적화까지는 많이 멀었다. 구현 당시 최적화를 고민했던 지점들에 대해 정리했다.

7.1 페이지네이션

대량의 채팅 메시지를 효율적으로 조회하기 위해 페이지네이션을 구현했다:

public Page<ChatMessageResponseDto> chatMessagePagination(String roomId, int page) {
    return chatMessageRepository.findByRoomIdWithPagingAndFiltering(roomId, page, PAGE_SIZE)
            .map(converter::convertToResponseMessage);
}

이를 통해 클라이언트의 메모리 사용량을 줄이고, 초기 로딩 시간을 단축시켰으며, MongoDB의 skip과 limit 연산을 활용하여 서버 부하도 줄였다.

7.2 캐싱 전략

자주 조회되는 데이터에 대해 캐싱을 적용했다:

@Cacheable(value = "roomInfo", key = "#roomId")
public RoomResponseDto getRoomInfo(String roomId) {
    return roomRepository.findById(roomId)
            .map(converter::convertRoom)
            .orElse(null);
}

@Cacheable 어노테이션을 사용하여 자주 조회되는 채팅방 정보를 캐싱함으로써 데이터베이스 부하를 줄이고 응답 속도를 향상시켰다.


© 2022. All rights reserved.

Powered by Hydejack v9.1.6