본문 바로가기

Backend/Spring

[Spring] 알림 기능에 SSE 적용하기

* 주기적으로 Client에서 데이터를 가져와야 하는 경우 사용할 수 있는 방법

  • Polling 
    • 일정 주기를 가지고 서버의 API를 호출하는 방법
    • 실시간으로 데이터가 업데이트 되지 않는다는 단점 존재
    • 불필요한 요청이 발생하기 때문에 서버 부하 발생
    • 호환성이 좋다는 장점을 가짐

 

  • Long-Polling
    • 서버로 요청이 들어올 경우, 일정 시간 동안 대기 했다가 요청한 데이터가 업데이트 된 경우 서버에서 웹 브라우저로 응답을 보냄
    • 연결이 된 경우 실시간으로 데이터가 들어올 수 있다는 장점이 존재
    • Polling보다 개선된 형태이지만, 데이터의 업데이트가 빈번한 경우 Polling과 유사하다는 단점이 존재

 

  • SSE (Server-Sent Event)
    • 서버에서 웹 브라우저로 데이터를 보냄.
    • 웹 브라우저에서 서버 쪽으로 특정 이벤트를 구독함을 알려줌
    • 서버에서는 해당 이벤트가 발생하면, 웹 브라우저로 이벤트를 보냄
    • 다만 서버에서 웹 브라우저로의 데이터만 전송이 가능하고 그 반대는 불가능하다는 단점이 존재
    • 최대 동시 접속 횟수의 제한

 

  • WebSocket 
    • 서버에서 웹 브라우저 사이 양방향 통신이 가능한 방법

 

프로젝트 적용

환경: Spring  Boot 2.7.5

* 알람 기능에 SSE를 적용하여 클라이언트로 알림이 왔음을 알려주도록 코드를 작성한다.

 

다음은 SSE를 사용하기 위한 React Client 코드이다.

useEffect(() => {
    eventSource = new EventSource("http://localhost:8080/api/v1/alarms/subscribe?token=" + localStorage.getItem('token'));

    eventSource.addEventListener("open", function (event) {
      console.log("connection opened");
    });

    eventSource.addEventListener("alarm", function (event) {
       console.log(event.data);
    });

    eventSource.addEventListener("error", function (event) {
      console.log(event.target.readyState);
      if (event.target.readyState === EventSource.CLOSED) {
        console.log("eventsource closed");
      }
      eventSource.close();
    });

}, []);
  • EventSource를 사용하여 Back Server에 구독하고, 이벤트가 발생했을 시 처리, 에러가 발생했을 경우 처리를 각각 구현한다.
  • EventSource는 헤더를 지원하지 않기 때문에, 쿼리 파라미터로 token을 세팅해서 요청을 보낸다.

 

다음은 SSE를 사용하기 위한 Back Server 코드이다.

@RestController
@RequiredArgsConstructor
public class AlarmController {

    private final AlarmService alarmService;

    @GetMapping("/subscribe")
    public SseEmitter subscribe(Authentication authentication) {
        return alarmService.connectAlarm(authentication.getName());
    }
}

@Slf4j
@Service
@RequiredArgsConstructor
public class AlarmService {

    private final EmitterRepository emitterRepository;

    private final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
    private final String ALARM_NAME = "alarm";

    public void send(final long alarmId, final String username, final String msg) {
        emitterRepository.get(username).ifPresentOrElse(sseEmitter -> {
            try {
                sseEmitter.send(SseEmitter.event().id(createAlarmId(username, alarmId)).name(ALARM_NAME).data(msg));
            } catch (IOException e) {
                emitterRepository.delete(username);
                throw new SnsApplicationException(ResponseCode.INTERNAL_SERVER_ERROR);
            }
        }, () -> log.info("[SseEmitter] {} SseEmitter Not Founded", username));
    }

    public SseEmitter connectAlarm(String username) {
        Member findMember = memberCacheRepository.getMember(username);

        SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
        emitterRepository.save(username, sseEmitter);

        // 종료 되었을 때 처리
        sseEmitter.onCompletion(() -> {
            emitterRepository.delete(username);
        });

        // timeOut 시 처리
        sseEmitter.onTimeout(() -> {
            emitterRepository.delete(username);
        });

        try {
            sseEmitter.send(SseEmitter.event().id(createAlarmId(username, null)).name(ALARM_NAME).data("connect completed!!"));
        } catch (IOException e) {
            throw new SnsApplicationException(ResponseCode.ALARM_CONNECT_ERROR);
        }

        return sseEmitter;
    }

    private String createAlarmId(String username, Long alarmId) {
        if (alarmId == null) {
            return username + "_" + System.currentTimeMillis();
        }

        return username + "_" + alarmId;
    }
}

@Slf4j
@Repository
public class EmitterRepository {

    private Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();

    public SseEmitter save(String username, SseEmitter sseEmitter) {
        if (emitterMap.containsKey(username)) {
            emitterMap.remove(username);
        }
        emitterMap.put(username, sseEmitter);

        log.info("[SseEmitter] Set {}", username);
        return sseEmitter;
    }

    public Optional<SseEmitter> get(String username) {
        SseEmitter sseEmitter = emitterMap.get(username);

        log.info("[SseEmitter] Get {}", username);
        return Optional.ofNullable(sseEmitter);
    }

    public void delete(String username) {
        emitterMap.remove(username);
    }
}
  • Java에서 SSE를 사용하기 위해서는 SseEmitter를 사용한다.
  • SseEmitter는 비동기 프로세스를 제공한다.
  • 여러 스레드에서 동시에 사용할 수 있기 때문에 SseEmitter 저장 Map은 동시성을 보장하는 ConcurrentHashMap을 사용한다.
  • SseEmitter에 Timeout을 설정할 수 있고 Timeout 시 처리할 메서드를 등록할 수 있다.
  • SseEmitter.send()를 통해 구독 중인 브라우저에 알림을 보낼 수 있다.

 

/api/v1/alarms/subscribe?token={token}을 호출하면 다음과 같은 결과를 확인할 수 있다.

=> 연결이 성공된 것을 확인할 수 있고, 응답을 반환하여 요청 후 연결이 끊어진 것이 아닌 지속적으로 연결 중인 상태임을 확인할 수 있다.

'Backend > Spring' 카테고리의 다른 글

[Spring] checkstyle 적용하기  (0) 2023.05.03
DDD(Domain Driven Design)란?  (1) 2023.04.03
[Spring] Rest-Docs 연결  (0) 2023.01.13
[Spring] Logback 설정  (0) 2022.11.16
[Slf4j] Slf4j란?  (0) 2022.09.19