본문 바로가기

Reactive-Programming

[Spring] Reactive Web (1)

Servlet 비동기 기술

  • @Async 어노테이션을 사용하여 비동기 로직을 실행한다.
  • @EnableAsync 어노테이션을 사용하여 활성화한다.
  • @Async : ThreadPoolTaskExecutor / ExcutorService가 존재하지 않으면 기본 설정에 따른다.

* ThreadPoolTaskExecutor 설정하기

| 코드

@Bean
ThreadPoolTaskExecutor tp() {
    ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();

    // 처음 스레드 요청이 들어오면 스레드 10개 생성 -> 11번 째 요청 시 대기 큐에 저장 -> 대기 큐가 200개로 꽉 참 -> maxPoolSize까지 스레드 풀을 늘린다.
    te.setCorePoolSize(10); // 스레드 10개 생성 -> 바로 만드는 것이 아닌, 첫 스레드 요청이오면 생성한다. 기본은 8
    te.setMaxPoolSize(100); // 대기 큐가 꽉 차면 그때 100개 까지 생성한다.
    te.setQueueCapacity(200); // 대기 큐 사이즈 : 200개 요청 대기 가능

    te.setKeepAliveSeconds(30); // 반환 받은 스레드가 30초 동안 사용되지 않으면 제거
//        te.setTaskDecorator(); // 스레드를 만들거나 반환하기 시작 전, 시작 후로 작업을 적용할 수 있다.
    te.setThreadNamePrefix("test"); // 스레드 이름 설정
    te.initialize(); // 초기화 반환 전 받드시 실행
    return te;
}
  • @Async(value = "빈 이름")으로 사용한다.

 

* @Async 간단한 구현 예제

| 코드

@Slf4j
@Component
@EnableAsync
public class SpringAsyncEx {

    @Async
    @Component
    public static class MyService {
        public String hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(1000);
            return "Hello";
        }
    }

    @Autowired MyService myService;

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            String res = myService.hello();
            log.info("res = {}", res);
            log.info("exit");
        };
    }
}

실행결과

  • res 결과 값을 받아올 수 없어 null로 출력된다.
비동기 실행 결과를 가져오는 방법에는 Future, Callback 두 가지 방법이 존재한다.


| 코드

@Slf4j
@Component
@EnableAsync
public class SpringAsyncEx {

    @Async
    @Component
    public static class MyService {
        public Future<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(1000);
            return new AsyncResult<>("Hello");
        }
    }

    @Autowired MyService myService;

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            Future<String> res = myService.hello();
            log.info("res = {}", res.get()); // 결과 값을 받기 전까지 Block
            log.info("exit");
        };
    }
}

실행 결과

* 비동기를 사용하는 이유는 긴 시간이 걸리는 작업을 별도의 스레드에서 처리하게 하고 main 스레드는 그 동안 Blocking 되지 않고 다른 작업을 처리하기 위해서이다.

  • get을 사용하게 되면 main 스레드도 해당 작업이 끝날 때까지 Blocking이 되어 효율적이지 않다.
  • Callback을 사용해서 Blocking 되지 않고 작업이 종료되었을 때 처리 하도록 한다. -> 효율성 증가


| 코드

@Slf4j
@Component
@EnableAsync
public class SpringAsyncEx {
    @Async
    @Component
    public static class MyService {
        public ListenableFuture<String> hello() throws InterruptedException {
            log.info("hello()");
            Thread.sleep(1000);
            return new AsyncResult<>("Hello");
        }
    }

    @Autowired MyService myService;

    @Bean
    ApplicationRunner run() {
        return args -> {
            log.info("run()");
            ListenableFuture<String> res = myService.hello();
            res.addCallback(data -> log.info("res = {}", data), err -> log.error("err =", err));
            log.info("exit");
        };
    }
}

실행 결과

  • ListenableFuture를 사용하여 Callback을 등록 후 다른 작업 처리

* WebApplication 적용

  • 서블릿 스레드는 기본적으로 blocking이다.
    • HttpServletRequest, HttpServletResponse는 내부적으로 InputStream, OutputStream사용한다.
    • InputStream, OutputStream 기본적으로 blocking으로 동작한다. -> 값이 없으면 대기하고 있는다
  • Blocking 방식은 CPU와 메모리 자원을 많이 사용한다. -> Context Switching 발생으로 인한 낭비
  • Req(Servlet Thread) -> Blocking I/O (DB, API) (Work Thread) -> Res(Servlet Thread) : 스레드는 blocking 상태 -> 낭비
  • Tomcat은 기본적으로 200개 스레드 사용 -> 별도의 설정이 없을 시
  • 스레드를 무한정 많이 만들 경우
    • 스레드 하나 당 가지고 있는 별도의 데이터가 존재한다. -> OutOfMemory
    • 무수히 많은 Context Switching 시간으로 인한 CPU 부하
  • DB, API 작업 시 Servlet Thread는 Pool에 반납하고, Work Thread만 사용하자! -> 비동기 서블릿 기술

비동기 서블릿
  • HTTP connection은 이미 non-blocking I/O
  • 서블릿 요청 읽기, 응답, 쓰기는 blocking -> Callback 방식 처리 가능
  • 비동기 작업 시작 즉시 서블릿 쓰레드 반납
  • 비동기 작업이 완료되면 서블릿 스레드 재할당
  • 비동기 서블릿 컨텍스트 이용(Async Context)
  • non-blocking 서블릿 요청, 응답 처리

* 작업 스레드에서 비동기 작업 후 서블릿 응답 처리

  • 클라이언트 요청이 갔으면 응답이 와야한다.
  • 응답을 처리하는 스레드 -> 서블릿 스레드 -> 아주 빠르게 응답을 처리하고 다시 서블릿 스레드 풀로 반납
  • 서블릿 스레드는 적은 양을 가지고 있다.

| 코드

@Slf4j
@RestController
public class SpringAsyncController {

    @GetMapping("/callable")
    public Callable<String> callable() {
        log.info("callable");
        return () -> {
            log.info("async");
            Thread.sleep(2000);
            return "hello";
        };
    }

}
  • Callable을 반환하여 실행 로직과 응답 스레드를 분리
  • MVC 결과처럼 응답
  • 실행 결과 -> 2초 후 hello 반환

* 동기, 비동기 처리 실행 속도 실험

  • application.yml -> spring.task.execution.pool.core-size: 100
    • spring-task-excution-pool : Work Thread
    • 현재는 스레드 8개 생성 : 이를 100개로 변경
  • application.yml -> server.tomcat.threads.max = 20
    • Servelt Thread 20개 생성


| 코드 1 -> blocking

@Slf4j
@RestController
public class SpringAsyncController {

    static AtomicInteger counter = new AtomicInteger(0);

    @GetMapping("/basic")
    public String async() throws InterruptedException {
        Thread.sleep(2000);
        return "basic";
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate restTemplate= new RestTemplate();
        String url = "http://localhost:8080/basic";

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            es.execute(() -> {
                int idx = counter.addAndGet(1);
                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                restTemplate.getForObject(url, String.class);

                sw.stop();
                log.info("Elapsed {} {}", idx, sw.getTotalTimeSeconds());

            });
        }

        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total time = {}", main.getTotalTimeSeconds());
    }

}

실행 결과

=> Servlet Thread 20개로 총 100개의 요청 처리 = 2 * (100 / 20) = 10초 소요

 

| 코드 2 -> non-blocking

@Slf4j
@RestController
public class SpringAsyncController {

    static AtomicInteger counter = new AtomicInteger(0);

    @GetMapping("/callable")
    public Callable<String> callable() throws InterruptedException {
        log.info("callable");
        return () -> {
            log.info("async");
            Thread.sleep(2000);
            return "hello";
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate restTemplate= new RestTemplate();
        String url = "http://localhost:8080/callable";

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            es.execute(() -> {
                int idx = counter.addAndGet(1);
                log.info("Thread {}", idx);

                StopWatch sw = new StopWatch();
                sw.start();

                restTemplate.getForObject(url, String.class);

                sw.stop();
                log.info("Elapsed {} {}", idx, sw.getTotalTimeSeconds());

            });
        }

        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total time = {}", main.getTotalTimeSeconds());
    }

}

실행 결과

=> Servlet Tread 1개, Work Thread 100개, non-blocking 시 2초 소요

 

위 방법은 클라이언트 요청 당 작업을 할 Work Thread를 한 개씩 사용하기 때문에 200개의 요청이 동시에 들어오면 2배의 시간 걸린다.

* Work Thread를 늘리지 않고 동시에 많은 작업을 처리할 수 있는 방법

  • 10개의 요청이 왔을 때, 각각의 요청 결과를 지연된 결과 큐에 담아 놨다가 나중에 응답 한다.
  • 복잡한 작업 없이 OS에 따라 수천, 수백만 개의 커넥션을 물고 있을 수 있다.

| 코드

@Slf4j
@RestController
public class SpringAsyncController {
	
    Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
   
    @GetMapping("/dr")
    public DeferredResult<String> dr() throws InterruptedException {
        log.info("dr");
        DeferredResult<String> dr = new DeferredResult<>();
        results.add(dr);
        return dr;
    }

    @GetMapping("/dr/count")
    public String drCount() {
        return String.valueOf(results.size());
    }

    @GetMapping("/dr/event")
    public String drEvent(String msg) {
        for (DeferredResult<String> dr : results) {
            dr.setResult("DrResult " + msg);
            results.remove(dr);
        }

        return "OK";
    }
}
  1. /dr 요청 시 -> DeferredResult 생성 -> 클라이언트는 응답 대기
  2. /dr/count 요청 시 -> DeferredResult 개수 확인
  3. /dr/event 요청 시 -> 1번의 요청으로 인해 생성된 DeferredResult에 msg 값을 더해 Setting 후 제거
  4. /dr 요청한 클라이언트는 3번에서 Setting된 결과 값을 응답으로 받는다.

* Servlet Thread 1개로 다수의 응답을 처리할 수 있다. : 요청 100개가 오면 Servlet Thread는 Queue에 저장해 놓는다. (이때, 클라이언트는 응답 대기 상태) -> 요청에 대한 작업이 완료되면 DeferredResult에 값을 세팅하고 Queue에서 제거한다. -> 클라이언트는 응답을 받는다.

 

* Emitter

데이터를 하나만 보내는 것이 아닌 여러 번에 나눠서 보내는 기능을 제공한다. -> 한 번 요청에 여러 번의 응답을 보낸다.

 

| 코드

@Slf4j
@RestController
public class SpringAsyncController {

	@GetMapping("/emitter")
    public ResponseBodyEmitter emitter(String msg) {
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();

        Executors.newSingleThreadExecutor().submit(() -> {
            for (int i = 0; i < 51; i++) {
                try {
                    emitter.send("<p> Stream " + i + "<p/>");
                    Thread.sleep(500);
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        return emitter;
    }
}

=> 클라이언트 요청을 받으면 0.5초 씩 응답을 50번 보낸다.

 

* REF

https://www.youtube.com/watch?v=aSTuQiPB4Ns&list=PLOLeoJ50I1kkqC4FuEztT3xKSfR2fpw&index=4

'Reactive-Programming' 카테고리의 다른 글

[Spring] Reactive Web (3)  (0) 2022.09.30
[Spring] Reactive Web (2)  (0) 2022.09.27
[Java] 비동기 기술  (0) 2022.09.24
[Reactive Streams] Schedulers  (0) 2022.09.21
[Reactive Streams] Operators  (0) 2022.09.14