본문 바로가기

Reactive-Programming

[Spring] Reactive Web (2)

Spring 비동기 기술

  • Client 요청의 응답이 될 떄까지 커넥션을 물고 있는다.
  • 클라이언트로부터 요청을 받은 후 실제 작업은 Work Thread Poll에 위임하고 Servlet Thread는 Pool에 반환 한 후, 다음 요청이 들어올 경우 해당 Servlet Thread를 바로 사용할 수 있도록 한다.
  •  한계점
    • 하나의 요청에 대한 작업을 수행하면서, 외부의 서비스들을 호출하는 작업이 많이 있는 경우, Servlet Thread는 바로 사용이 가능하지만, Work Thread는 I/O 작업으로 인한 Block 상태이기 때문에 결국 대기한다.

* Thread Pool Hell

Pool안에 있는 Thread에 대한 사용 요청이 급격하게 증가해 추가적인 요청이 들어올 때, 사용 가능한 스레드가 없어 사용자 요청에 대한 응답이 느려지게 되는 상태를 의미한다.

최근 서비스들은 하나의 클라이언트 요청을 처리함에 있어 다른 서버로의 요청(Network I/O)이 많아 졌다. 이전의 비동기 서블릿을 사용하더라도 하나의 요청을 처리하는 동안 Work Thread는 Block 상태로 대기하기 때문에 Thread Pool의 가용성이 떨어지게 된다.

 

| 코드

* server1 (Servlet Thread 1개) -> server2 (Thread 1000개)

* 클라이언트 동시 100개 요청 -> server1은 server2로 API 호출 -> server2 작업이 2초 걸린다. -> server2의 결과를 받아 server1 응답

----------------Sever 1----------------
@Slf4j
@RestController
@RequestMapping("/v2")
public class SpringAsyncControllerV2 {

    private final RestTemplate restTemplate = new RestTemplate();

    static AtomicInteger counter = new AtomicInteger(0);

    @GetMapping("/rest")
    public String rest(int idx) {
        String res = restTemplate.getForObject(
                "http://localhost:9090/remote-service?req={req}", String.class, "hello " + idx
        );
        return res;
    }
}

----------------Sever 2----------------
@SpringBootApplication
public class RemoteService {

    @RestController
    public static class RemoteController {
        @GetMapping("/remote-service")
        public String rest(String req) throws InterruptedException {
            Thread.sleep(2000);
            return req + "/remote-service";
        }
    }

    public static void main(String[] args) {
        System.setProperty("server.port", "9090");
        System.setProperty("server.tomcat.threads.max", "1000");
        SpringApplication.run(RemoteService.class, args);
    }
}
@Slf4j
public class LoadTest {

    static AtomicInteger counter = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        ExecutorService es = Executors.newFixedThreadPool(100);

        RestTemplate restTemplate= new RestTemplate();
        String url = "http://localhost:8080/v2/rest?idx={idx}";

        CyclicBarrier barrier = new CyclicBarrier(101); // 스레드 동기화 기능 제공

        StopWatch main = new StopWatch();
        main.start();

        for (int i = 0; i < 100; i++) {
            es.submit(() -> {
                int idx = counter.addAndGet(1);
                log.info("Thread {}", idx);

                barrier.await(); // Blocking -> .await()을 만난 스레드의 수가 위의 지정한 숫자(101)과 같아질 때까지 Blocking

                StopWatch sw = new StopWatch();
                sw.start();

                String res = restTemplate.getForObject(url, String.class, idx);

                sw.stop();
                log.info("Elapsed {} {} -> {}", idx, sw.getTotalTimeSeconds(), res);

                return null;
            });
        }

        barrier.await();

        es.shutdown();
        es.awaitTermination(100, TimeUnit.SECONDS);

        main.stop();
        log.info("Total time = {}", main.getTotalTimeSeconds());
    }

}

실행 결과

=> 총 소요 시간 : 2초 * 100 = 200초 -> Server2 요청 시 Block 되므로 Servlet Thread는 2초마다 사용자의 요청을 받을 수 있다.

 

Service1Uri

해결 방법 : 비동기 API 호출을 사용한다. -> Server2의 작업이 끝나기 전에 Thread를 리턴한다. -> 리턴된 Thread로 다음 요청을 받는다. -> Server2는 총 1000개의 WorkThread를 가지고 있기 때문에 총 2초의 시간이 걸릴 것이다.

* AsyncRestTemplate 사용 -> 현재는 Deprecated 되었다.

@Slf4j
@RestController
@RequestMapping("/v2")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV2 {

    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    static AtomicInteger counter = new AtomicInteger(0);

    @GetMapping("/rest")
    public ListenableFuture<ResponseEntity<String>> rest(int idx) {
        ListenableFuture<ResponseEntity<String>> res = restTemplate.getForEntity(
                "http://localhost:9090/remote-service?req={req}", String.class, "hello " + idx
        );
        return res;
    }
}
  • ListenableFuture를 반환하기 때문에 성공 시 Callback, 실패 시 Callback을 등록할 수 있다.
  • Spring에서 ListenableFuture Return 시, Servlet Thread를 반환하고 RestTemplate 호출을 비동기로 실행 -> 결과 값이 오면 응답 Callback을 Spring에서 지원한다.

실행 결과

=> 사용자 요청 100개의 응답까지 2초 가량 걸린다.

만일, Server2의 Servlet Thread 개수 역시 1이면 2 * 100 => 200초가 걸린다.

 

AsyncRestTemplate 

  • 요청을 보낼 때, 비동기 작업을 처리하기 위해 백 그라운드에서 Thread를 생성한다.
  • 100개의 비동기 작업을 처리할 때, 100개의 Thread를 생성한다.
  • 서버 자원을 작업 수 만큼 더 사용한다. -> 바람직하지 않다.

API 호출을 비동기로 처리를 위해 최소한의 Thread 자원만 사용하고 싶다.

* non-blocking I/O 방식을 사용해서 내부의 API를 호출하는 라이브러리를 사용해야 한다. (Netty...)

HTTP Client Libaray netty 사용

@Slf4j
@RestController
@RequestMapping("/v2")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV2 {
	
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );
    
    @GetMapping("/rest2")
    public ListenableFuture<ResponseEntity<String>> rest2(int idx) {
        ListenableFuture<ResponseEntity<String>> future = asyncRestTemplate2.getForEntity(
                "http://localhost:9090/remote-service?req={req}", String.class, "hello " + idx
        );
        return future;
    }
}

Netty가 사용할 Thread를 1개로 설정하여 내부 API 호출 시 사용할 Work Thread의 수를 제한한다.

* 결과는 위 방식과 동일하다.


데이터 가공

DeferredResult를 사용하여, 비동기로 호출한 API 응답 값을 가공해서 응답한다.

 

| 코드

@Slf4j
@RestController
@RequestMapping("/v2")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV2 {
	
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );
    
    @GetMapping("/rest3")
    public ListenableFuture<ResponseEntity<String>> rest2(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        ListenableFuture<ResponseEntity<String>> future = asyncRestTemplate2.getForEntity(
                "http://localhost:9090/remote-service?req={req}", String.class, "hello " + idx
        );

//        future.get(); // 호출하는 순간 Block으로 호출한 의미가 사라진다.
        future.addCallback(data -> {
            dr.setResult(data.getBody() + " something add");
        }, ex -> {
        //            throw new ex; Stack Trace를 모르기 때문에 전파하면 안 된다.
            dr.setErrorResult(ex);
        });

        return dr;
    }
}

** Callback에서 예외를 전파할 시, 어떤 Stack Trace를 타고 실행되는 지 모르기 때문에 전파가 아닌 다른 방법을 사용해야 한다. -> 여기서는 dr.setErrorResult()를 사용한다.


의존성이 존재할 경우

Client Request -> Server1에 요청 -> Server1에서 Server2로 API 호출로 데이터를 가져온다. -> Server2에서 응답 받은 결과를 바탕으로 Server1에서 Server3으로 API 호출을 진행한다. -> Server3으로부터 받은 결과를 Client에 응답한다.

 

| 코드 : Block

@Slf4j
@RestController
@RequestMapping("/v2")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV2 {
	
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );
    
    @GetMapping("/rest4")
    public DeferredResult rest4(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        ListenableFuture<ResponseEntity<String>> future = asyncRestTemplate2.getForEntity(
                "http://localhost:9090/remote-service?req={req}", String.class, "service1 " + idx
        );

        future.addCallback(data -> {
            ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
                    "http://localhost:9090/remote-service2?req={req}", String.class, data.getBody()
            );

            try {
                ResponseEntity<String> result = future2.get();
                dr.setResult(result.getBody());

            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, ex -> {
            dr.setErrorResult(ex);
        });

        return dr;
    }
}

=> 응답에 get()을 사용하는 순간 Thread는 반환되지 않고 Block 상태 -> 응답 결과 4, 6, 8, 10, 12... 로, 총 202초가 소요된다.

 

| 코드 : Callback

@Slf4j
@RestController
@RequestMapping("/v2")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV2 {
	
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );
    
    @GetMapping("/rest4")
    public DeferredResult rest4(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        ListenableFuture<ResponseEntity<String>> future = asyncRestTemplate2.getForEntity(
                "http://localhost:9090/remote-service?req={req}", String.class, "service1 " + idx
        );

        future.addCallback(data -> {
            ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
                    "http://localhost:9090/remote-service2?req={req}", String.class, data.getBody()
            );

            future2.addCallback(data2 -> {
                dr.setResult(data.getBody());
            }, ex -> {
                dr.setErrorResult(ex);
            });
        }, ex -> {
            dr.setErrorResult(ex);
        });

        return dr;
    }
}

실행 결과

=> Client Request -> Server1 -> Server2 -> Server3 -> Response => 총 4초의 시간이 소요된다.


내부적 비동기 작업 수행

| 코드 

@Slf4j
@RestController
@RequestMapping("/v2")
@SuppressWarnings("deprecation")
public class SpringAsyncControllerV2 {
	
    private final AsyncRestTemplate asyncRestTemplate2 = new AsyncRestTemplate(
            new Netty4ClientHttpRequestFactory(new NioEventLoopGroup(1))
    );
    
    private final MyServiceV2 myServiceV2;

    public SpringAsyncControllerV2(MyServiceV2 myServiceV2) {
        this.myServiceV2 = myServiceV2;
    }
    
    @GetMapping("/rest4")
    public DeferredResult rest4(int idx) {
        DeferredResult<String> dr = new DeferredResult<>();

        ListenableFuture<ResponseEntity<String>> future = asyncRestTemplate2.getForEntity(
                "http://localhost:9090/remote-service?req={req}", String.class, "service1 " + idx
        );

        future.addCallback(data -> {
            ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
                    "http://localhost:9090/remote-service2?req={req}", String.class, data.getBody()
            );

            future2.addCallback(data2 -> {
                ListenableFuture<String> future3 = myServiceV2.work(data.getBody());
                future3.addCallback(
                        data3 -> dr.setResult(data3),
                        ex -> dr.setErrorResult(ex)
                );
            }, ex -> {
                dr.setErrorResult(ex);
            }); 
        }, ex -> {
            dr.setErrorResult(ex);
        });

        return dr;
    }
}

@Service
@EnableAsync
public class MyServiceV2 {

    @Async(value = "myThreadPool")
    public ListenableFuture<String> work(String req) {
        return new AsyncResult<>(req + "/asyncWork");
    }

    @Bean
    ThreadPoolTaskExecutor myThreadPool() {
        ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
        te.setCorePoolSize(1);;
        te.setMaxPoolSize(1);
        te.initialize();

        return te;
    }
}

=> Service를 비동기적으로 수행하도록 작성한다.

 

** 총 Thread 3개 사용 : Servlet Thread 1개, API 호출 Thread 1개, Service 로직 처리 Thread 1개


 

문제점 : Callback Hell 발생

future.addCallback(data -> {
    ListenableFuture<ResponseEntity<String>> future2 = asyncRestTemplate.getForEntity(
            Service2Uri, String.class, data.getBody()
    );
    future2.addCallback(data2 -> {
        ListenableFuture<String> future3 = myServiceV2.work(data.getBody());
        future3.addCallback(
                data3 -> dr.setResult(data3),
                ex -> dr.setErrorResult(ex)
        );
    }, ex -> {
        dr.setErrorResult(ex);
    });
}, ex -> {
    dr.setErrorResult(ex);
});

** 읽기 불편한, 지저분한 코드가 만들어 진다.

 

* REF

https://www.youtube.com/watch?v=aSTuQiPB4Ns&list=PLOLeoJ50I1kkqC4FuEztT__3xKSfR2fpw&index=4

https://jongmin92.github.io/2019/04/13/Java/java-async-2/

'Reactive-Programming' 카테고리의 다른 글

[Spring] Reactive Web (4)  (0) 2022.10.01
[Spring] Reactive Web (3)  (0) 2022.09.30
[Spring] Reactive Web (1)  (0) 2022.09.25
[Java] 비동기 기술  (0) 2022.09.24
[Reactive Streams] Schedulers  (0) 2022.09.21