본문 바로가기

Reactive-Programming

[Reactor] Flux란? Flux란, Reactive Streams에서 정의한 Publisher의 구현체로 0..N 개의 데이터를 발행할 수 있다. 하나의 데이터를 전달할 때마다 onNext 이벤트 발생 Flux 내의 모든 데이터의 전달 처리가 완료되면 onComplete 이벤트 발생 Flux 내의 데이터 처리 중 에러가 발생하면 onError 이벤트 코드 @Slf4j public class FluxEx { @RestController public static class FluxExampleController { @GetMapping("/flux/ex1") public Flux fluxEx1() { return Flux.just( new Event(1, "event1"), new Event(2, "event2") ); } } @.. 더보기
[Reactor] Mono란? Mono란, Reactive Streams의 Publisher 인터페이스를 구현하는 구현체이며, 0..1 개의 데이터를 처리한다. 코드 @Slf4j public class MonoEx { @RestController @RequestMapping("/mono") public static class MonoExampleController { @GetMapping("/1") public Mono ex1() { log.info("first"); Mono mono = Mono.just("Hello WebFlux").log(); log.info("second"); return mono; } } } 위 코드는 동기적으로 실행되어, 실행 결과로 first 출력 -> Mono 로그 -> second 출력을 기대했는데 실.. 더보기
[Spring] WebFlux란? Spring5에서 새롭게 추가된 Reactive 스타일의 어플리케이션 개발을 도와주는 Reactive-Stack Web Framework 특징 클라이언트와 서버에서 Reactive 애플리케이션 개발을 위한 non-blocking Reactive Stream(Reactor)을 지원한다. 적은 양의 스레드로 동시성을 처리한다. Functional Programming AsyncRestTemplate -> WebClient DeferredResult -> Mono, Flux(Publisher) SpringMVC와 WebFlux 비교 SpringMVC 하나의 요청에 대해 하나의 스레드가 사용된다.(thread-per-request) -> 다수의 요청을 대비하여 미리 스레드 풀을 생성해 놓으며, 각 요청마다 스레.. 더보기
[Spring] Reactive Web (4) CompletableFuture Java 8에서 부터 사용 가능하다. 비동기 작업 결과 Future -> 비동기 작업의 결과를 담고 있는 Object ListenableFuture -> Callback 구조로 결과 처리 CompletableFuture -> 다양한 비동기 작업을 간단하게 수행할 수 있다. Future, CompletionStage를 구현하고 있다. CompletionStage -> 완료한 결과에 의존적으로 다른 작업을 실행할 수 있는 기능을 제공한다. 별도의 스레드 풀을 가질 수 있다. 리스트의 모든 값이 완료될 때까지 기다리거나 하나의 값만 완료되길 기다릴지 선택할 수 있다. 람다 표현식, 파이프 라인닝을 활용해 Callback Hell 개선 제공 메서드 CompletableFuture.. 더보기
[Spring] Reactive Web (3) Callback Hell 개선 | 코드 future.addCallback(data -> { ListenableFuture future2 = asyncRestTemplate.getForEntity( Service2Uri, String.class, data.getBody() ); future2.addCallback(data2 -> { ListenableFuture future3 = myServiceV2.work(data.getBody()); future3.addCallback( data3 -> dr.setResult(data3), ex -> dr.setErrorResult(ex) ); }, ex -> { dr.setErrorResult(ex); }); }, ex -> { dr.setErrorResult(ex.. 더보기
[Spring] Reactive Web (2) Spring 비동기 기술 Client 요청의 응답이 될 떄까지 커넥션을 물고 있는다. 클라이언트로부터 요청을 받은 후 실제 작업은 Work Thread Poll에 위임하고 Servlet Thread는 Pool에 반환 한 후, 다음 요청이 들어올 경우 해당 Servlet Thread를 바로 사용할 수 있도록 한다. 한계점 하나의 요청에 대한 작업을 수행하면서, 외부의 서비스들을 호출하는 작업이 많이 있는 경우, Servlet Thread는 바로 사용이 가능하지만, Work Thread는 I/O 작업으로 인한 Block 상태이기 때문에 결국 대기한다. * Thread Pool Hell Pool안에 있는 Thread에 대한 사용 요청이 급격하게 증가해 추가적인 요청이 들어올 때, 사용 가능한 스레드가 없어 사.. 더보기
[Spring] Reactive Web (1) Servlet 비동기 기술 @Async 어노테이션을 사용하여 비동기 로직을 실행한다. @EnableAsync 어노테이션을 사용하여 활성화한다. @Async : ThreadPoolTaskExecutor / ExcutorService가 존재하지 않으면 기본 설정에 따른다. * ThreadPoolTaskExecutor 설정하기 | 코드 @Bean ThreadPoolTaskExecutor tp() { ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor(); // 처음 스레드 요청이 들어오면 스레드 10개 생성 -> 11번 째 요청 시 대기 큐에 저장 -> 대기 큐가 200개로 꽉 참 -> maxPoolSize까지 스레드 풀을 늘린다. te.setCorePoolSiz.. 더보기
[Java] 비동기 기술 비동기 현재 작업하던 스레드가 아닌 새로운 스레드를 만들어 해당 스레드에서 작업을 수행하는 것 -> 병렬적으로 테스크를 수행한다. 요청을 보낸 후 결과가 도달하지 않아도 다음 작업을 수행한다. 스레드를 새로 만들고 폐기하는 것은 많은 비용이 소모된다. -> 그만큼 CPU를 더 사용하므로 -> 스레드 풀을 사용해서 스레드를 미리 만들어두고 사용한다. -> 스레드 사용 후에 스레드 풀로 반환 | 코드 // 1. 비동기 확인 ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> { try { System.out.println(Thread.currentThread().getName()); Thread.sleep(2000); System.o.. 더보기