Servlet 비동기 기술
- @Async 어노테이션을 사용하여 비동기 로직을 실행한다.
- @EnableAsync 어노테이션을 사용하여 활성화한다.
- @Async : ThreadPoolTaskExecutor / ExcutorService가 존재하지 않으면 기본 설정에 따른다.
* ThreadPoolTaskExecutor 설정하기
| 코드
@Bean
ThreadPoolTaskExecutor tp() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
// 처음 스레드 요청이 들어오면 스레드 10개 생성 -> 11번 째 요청 시 대기 큐에 저장 -> 대기 큐가 200개로 꽉 참 -> maxPoolSize까지 스레드 풀을 늘린다.
te.setCorePoolSize(10); // 스레드 10개 생성 -> 바로 만드는 것이 아닌, 첫 스레드 요청이오면 생성한다. 기본은 8
te.setMaxPoolSize(100); // 대기 큐가 꽉 차면 그때 100개 까지 생성한다.
te.setQueueCapacity(200); // 대기 큐 사이즈 : 200개 요청 대기 가능
te.setKeepAliveSeconds(30); // 반환 받은 스레드가 30초 동안 사용되지 않으면 제거
// te.setTaskDecorator(); // 스레드를 만들거나 반환하기 시작 전, 시작 후로 작업을 적용할 수 있다.
te.setThreadNamePrefix("test"); // 스레드 이름 설정
te.initialize(); // 초기화 반환 전 받드시 실행
return te;
}
- @Async(value = "빈 이름")으로 사용한다.
* @Async 간단한 구현 예제
| 코드
@Slf4j
@Component
@EnableAsync
public class SpringAsyncEx {
@Async
@Component
public static class MyService {
public String hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return "Hello";
}
}
@Autowired MyService myService;
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
String res = myService.hello();
log.info("res = {}", res);
log.info("exit");
};
}
}
- res 결과 값을 받아올 수 없어 null로 출력된다.
비동기 실행 결과를 가져오는 방법에는 Future, Callback 두 가지 방법이 존재한다.
| 코드
@Slf4j
@Component
@EnableAsync
public class SpringAsyncEx {
@Async
@Component
public static class MyService {
public Future<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}
@Autowired MyService myService;
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
Future<String> res = myService.hello();
log.info("res = {}", res.get()); // 결과 값을 받기 전까지 Block
log.info("exit");
};
}
}
* 비동기를 사용하는 이유는 긴 시간이 걸리는 작업을 별도의 스레드에서 처리하게 하고 main 스레드는 그 동안 Blocking 되지 않고 다른 작업을 처리하기 위해서이다.
- get을 사용하게 되면 main 스레드도 해당 작업이 끝날 때까지 Blocking이 되어 효율적이지 않다.
- Callback을 사용해서 Blocking 되지 않고 작업이 종료되었을 때 처리 하도록 한다. -> 효율성 증가
| 코드
@Slf4j
@Component
@EnableAsync
public class SpringAsyncEx {
@Async
@Component
public static class MyService {
public ListenableFuture<String> hello() throws InterruptedException {
log.info("hello()");
Thread.sleep(1000);
return new AsyncResult<>("Hello");
}
}
@Autowired MyService myService;
@Bean
ApplicationRunner run() {
return args -> {
log.info("run()");
ListenableFuture<String> res = myService.hello();
res.addCallback(data -> log.info("res = {}", data), err -> log.error("err =", err));
log.info("exit");
};
}
}
- ListenableFuture를 사용하여 Callback을 등록 후 다른 작업 처리
* WebApplication 적용
- 서블릿 스레드는 기본적으로 blocking이다.
- HttpServletRequest, HttpServletResponse는 내부적으로 InputStream, OutputStream을 사용한다.
- InputStream, OutputStream 기본적으로 blocking으로 동작한다. -> 값이 없으면 대기하고 있는다
- Blocking 방식은 CPU와 메모리 자원을 많이 사용한다. -> Context Switching 발생으로 인한 낭비
- Req(Servlet Thread) -> Blocking I/O (DB, API) (Work Thread) -> Res(Servlet Thread) : 스레드는 blocking 상태 -> 낭비
- Tomcat은 기본적으로 200개 스레드 사용 -> 별도의 설정이 없을 시
- 스레드를 무한정 많이 만들 경우
- 스레드 하나 당 가지고 있는 별도의 데이터가 존재한다. -> OutOfMemory
- 무수히 많은 Context Switching 시간으로 인한 CPU 부하
- DB, API 작업 시 Servlet Thread는 Pool에 반납하고, Work Thread만 사용하자! -> 비동기 서블릿 기술
비동기 서블릿
- HTTP connection은 이미 non-blocking I/O
- 서블릿 요청 읽기, 응답, 쓰기는 blocking -> Callback 방식 처리 가능
- 비동기 작업 시작 즉시 서블릿 쓰레드 반납
- 비동기 작업이 완료되면 서블릿 스레드 재할당
- 비동기 서블릿 컨텍스트 이용(Async Context)
- non-blocking 서블릿 요청, 응답 처리
* 작업 스레드에서 비동기 작업 후 서블릿 응답 처리
- 클라이언트 요청이 갔으면 응답이 와야한다.
- 응답을 처리하는 스레드 -> 서블릿 스레드 -> 아주 빠르게 응답을 처리하고 다시 서블릿 스레드 풀로 반납
- 서블릿 스레드는 적은 양을 가지고 있다.
| 코드
@Slf4j
@RestController
public class SpringAsyncController {
@GetMapping("/callable")
public Callable<String> callable() {
log.info("callable");
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
}
- Callable을 반환하여 실행 로직과 응답 스레드를 분리
- MVC 결과처럼 응답
- 실행 결과 -> 2초 후 hello 반환
* 동기, 비동기 처리 실행 속도 실험
- application.yml -> spring.task.execution.pool.core-size: 100
- spring-task-excution-pool : Work Thread
- 현재는 스레드 8개 생성 : 이를 100개로 변경
- application.yml -> server.tomcat.threads.max = 20
- Servelt Thread 20개 생성
- Servelt Thread 20개 생성
| 코드 1 -> blocking
@Slf4j
@RestController
public class SpringAsyncController {
static AtomicInteger counter = new AtomicInteger(0);
@GetMapping("/basic")
public String async() throws InterruptedException {
Thread.sleep(2000);
return "basic";
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100);
RestTemplate restTemplate= new RestTemplate();
String url = "http://localhost:8080/basic";
StopWatch main = new StopWatch();
main.start();
for (int i = 0; i < 100; i++) {
es.execute(() -> {
int idx = counter.addAndGet(1);
log.info("Thread {}", idx);
StopWatch sw = new StopWatch();
sw.start();
restTemplate.getForObject(url, String.class);
sw.stop();
log.info("Elapsed {} {}", idx, sw.getTotalTimeSeconds());
});
}
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);
main.stop();
log.info("Total time = {}", main.getTotalTimeSeconds());
}
}
=> Servlet Thread 20개로 총 100개의 요청 처리 = 2 * (100 / 20) = 10초 소요
| 코드 2 -> non-blocking
@Slf4j
@RestController
public class SpringAsyncController {
static AtomicInteger counter = new AtomicInteger(0);
@GetMapping("/callable")
public Callable<String> callable() throws InterruptedException {
log.info("callable");
return () -> {
log.info("async");
Thread.sleep(2000);
return "hello";
};
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(100);
RestTemplate restTemplate= new RestTemplate();
String url = "http://localhost:8080/callable";
StopWatch main = new StopWatch();
main.start();
for (int i = 0; i < 100; i++) {
es.execute(() -> {
int idx = counter.addAndGet(1);
log.info("Thread {}", idx);
StopWatch sw = new StopWatch();
sw.start();
restTemplate.getForObject(url, String.class);
sw.stop();
log.info("Elapsed {} {}", idx, sw.getTotalTimeSeconds());
});
}
es.shutdown();
es.awaitTermination(100, TimeUnit.SECONDS);
main.stop();
log.info("Total time = {}", main.getTotalTimeSeconds());
}
}
=> Servlet Tread 1개, Work Thread 100개, non-blocking 시 2초 소요
위 방법은 클라이언트 요청 당 작업을 할 Work Thread를 한 개씩 사용하기 때문에 200개의 요청이 동시에 들어오면 2배의 시간 걸린다.
* Work Thread를 늘리지 않고 동시에 많은 작업을 처리할 수 있는 방법
- 10개의 요청이 왔을 때, 각각의 요청 결과를 지연된 결과 큐에 담아 놨다가 나중에 응답 한다.
- 복잡한 작업 없이 OS에 따라 수천, 수백만 개의 커넥션을 물고 있을 수 있다.
| 코드
@Slf4j
@RestController
public class SpringAsyncController {
Queue<DeferredResult<String>> results = new ConcurrentLinkedDeque<>();
@GetMapping("/dr")
public DeferredResult<String> dr() throws InterruptedException {
log.info("dr");
DeferredResult<String> dr = new DeferredResult<>();
results.add(dr);
return dr;
}
@GetMapping("/dr/count")
public String drCount() {
return String.valueOf(results.size());
}
@GetMapping("/dr/event")
public String drEvent(String msg) {
for (DeferredResult<String> dr : results) {
dr.setResult("DrResult " + msg);
results.remove(dr);
}
return "OK";
}
}
- /dr 요청 시 -> DeferredResult 생성 -> 클라이언트는 응답 대기
- /dr/count 요청 시 -> DeferredResult 개수 확인
- /dr/event 요청 시 -> 1번의 요청으로 인해 생성된 DeferredResult에 msg 값을 더해 Setting 후 제거
- /dr 요청한 클라이언트는 3번에서 Setting된 결과 값을 응답으로 받는다.
* Servlet Thread 1개로 다수의 응답을 처리할 수 있다. : 요청 100개가 오면 Servlet Thread는 Queue에 저장해 놓는다. (이때, 클라이언트는 응답 대기 상태) -> 요청에 대한 작업이 완료되면 DeferredResult에 값을 세팅하고 Queue에서 제거한다. -> 클라이언트는 응답을 받는다.
* Emitter
데이터를 하나만 보내는 것이 아닌 여러 번에 나눠서 보내는 기능을 제공한다. -> 한 번 요청에 여러 번의 응답을 보낸다.
| 코드
@Slf4j
@RestController
public class SpringAsyncController {
@GetMapping("/emitter")
public ResponseBodyEmitter emitter(String msg) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
Executors.newSingleThreadExecutor().submit(() -> {
for (int i = 0; i < 51; i++) {
try {
emitter.send("<p> Stream " + i + "<p/>");
Thread.sleep(500);
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}
});
return emitter;
}
}
=> 클라이언트 요청을 받으면 0.5초 씩 응답을 50번 보낸다.
* REF
https://www.youtube.com/watch?v=aSTuQiPB4Ns&list=PLOLeoJ50I1kkqC4FuEztT3xKSfR2fpw&index=4
'Reactive-Programming' 카테고리의 다른 글
[Spring] Reactive Web (3) (0) | 2022.09.30 |
---|---|
[Spring] Reactive Web (2) (0) | 2022.09.27 |
[Java] 비동기 기술 (0) | 2022.09.24 |
[Reactive Streams] Schedulers (0) | 2022.09.21 |
[Reactive Streams] Operators (0) | 2022.09.14 |