Spring & Spring Boot 실무 가이드

Spring WebFlux 완벽 가이드: 리액티브 프로그래밍으로 대용량 트래픽 처리하기

devcomet 2025. 6. 10. 13:51
728x90
반응형

spring-webflux-tutorial-thumbnail
spring-webflux-tutorial-thumbnail

 

현대 웹 애플리케이션은 점점 더 많은 동시 사용자와 대용량 트래픽을 처리해야 합니다.

전통적인 서블릿 기반의 Spring MVC로는 한계가 있을 때, Spring WebFlux가 해답이 될 수 있습니다.

이 글에서는 Spring WebFlux의 핵심 개념부터 실무 적용까지 완벽하게 다뤄보겠습니다.


Spring WebFlux란? 리액티브 프로그래밍의 핵심

Spring WebFlux는 Spring Framework 5.0에서 도입된 완전히 새로운 웹 프레임워크입니다.

기존의 Spring MVC가 서블릿 API 기반의 블로킹 I/O 모델을 사용한다면, WebFlux는 논블로킹 I/O와 리액티브 스트림을 기반으로 합니다.

리액티브 프로그래밍이란 데이터의 흐름과 변화의 전파에 중점을 둔 프로그래밍 패러다임입니다.

비동기 데이터 스트림을 다루며, 백프레셔(backpressure) 처리를 통해 시스템의 안정성을 보장합니다.

WebFlux의 핵심 특징

논블로킹 I/O 처리: 스레드가 I/O 작업을 기다리지 않고 다른 작업을 수행할 수 있습니다.

이벤트 루프 모델: 적은 수의 스레드로 많은 요청을 처리할 수 있습니다.

백프레셔 지원: 데이터 생산자와 소비자 간의 속도 차이를 조절합니다.

함수형 프로그래밍 지원: 함수형 엔드포인트를 통해 더 간결한 코드 작성이 가능합니다.

webflux-vs-mvc-architecture
webflux-vs-mvc-architecture


Mono와 Flux: WebFlux의 핵심 타입 이해하기

Spring WebFlux는 Project Reactor의 MonoFlux 타입을 기반으로 합니다.

이 두 타입은 리액티브 스트림의 Publisher 인터페이스를 구현한 것입니다.

Mono: 0개 또는 1개의 데이터

import reactor.core.publisher.Mono;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {

    @GetMapping("/user/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userService.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException("사용자를 찾을 수 없습니다")))
            .doOnSuccess(user -> log.info("사용자 조회 성공: {}", user.getName()))
            .doOnError(error -> log.error("사용자 조회 실패", error));
    }
}

Mono는 최대 1개의 데이터를 비동기적으로 처리하는 타입입니다.

HTTP 응답, 데이터베이스 단일 조회 결과 등에 주로 사용됩니다.

Flux: 0개 이상의 데이터 스트림

import reactor.core.publisher.Flux;
import org.springframework.http.MediaType;

@RestController
public class ProductController {

    @GetMapping(value = "/products/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Product> streamProducts() {
        return productService.getAllProducts()
            .delayElements(Duration.ofSeconds(1))
            .doOnNext(product -> log.info("상품 스트리밍: {}", product.getName()))
            .onErrorResume(error -> {
                log.error("상품 스트리밍 오류", error);
                return Flux.empty();
            });
    }

    @GetMapping("/products")
    public Flux<Product> getProducts() {
        return Flux.fromIterable(Arrays.asList(
                new Product("상품1", 10000),
                new Product("상품2", 20000),
                new Product("상품3", 30000)
            ))
            .filter(product -> product.getPrice() > 15000)
            .map(product -> product.withDiscount(0.1));
    }
}

Flux는 여러 개의 데이터를 스트림으로 처리하는 타입입니다.

실시간 데이터, 목록 조회, 스트리밍 응답 등에 활용됩니다.


WebFlux 프로젝트 설정 및 기본 구성

Spring Boot 프로젝트에서 WebFlux를 사용하려면 적절한 의존성 설정이 필요합니다.

Maven 의존성 설정

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Gradle 의존성 설정

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
    runtimeOnly 'io.r2dbc:r2dbc-h2'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'io.projectreactor:reactor-test'
}

주의사항: spring-boot-starter-webspring-boot-starter-webflux를 함께 사용하면 안 됩니다.

Spring Boot는 기본적으로 서블릿 기반을 우선하므로 WebFlux가 제대로 동작하지 않습니다.

기본 애플리케이션 구성

@SpringBootApplication
@EnableR2dbcRepositories
public class WebFluxApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebFluxApplication.class, args);
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        return ConnectionFactories.get("r2dbc:h2:mem:///testdb");
    }

    @Bean
    public WebFilter corsFilter() {
        return (exchange, chain) -> {
            ServerHttpResponse response = exchange.getResponse();
            HttpHeaders headers = response.getHeaders();
            headers.add("Access-Control-Allow-Origin", "*");
            headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE");
            headers.add("Access-Control-Allow-Headers", "Content-Type");
            return chain.filter(exchange);
        };
    }
}

 

webflux-application-structure
webflux-application-structure


함수형 엔드포인트: 더 간결한 라우팅

WebFlux는 애노테이션 기반 컨트롤러 외에도 함수형 엔드포인트를 지원합니다.

함수형 엔드포인트는 더 명시적이고 테스트하기 쉬운 코드를 작성할 수 있게 해줍니다.

HandlerFunction과 RouterFunction

@Component
public class ProductHandler {

    private final ProductService productService;

    public ProductHandler(ProductService productService) {
        this.productService = productService;
    }

    public Mono<ServerResponse> getAllProducts(ServerRequest request) {
        String category = request.queryParam("category").orElse("all");

        return productService.findByCategory(category)
            .collectList()
            .flatMap(products -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(products))
            .onErrorResume(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .bodyValue(Map.of("error", "상품 조회 중 오류가 발생했습니다")));
    }

    public Mono<ServerResponse> getProduct(ServerRequest request) {
        String id = request.pathVariable("id");

        return productService.findById(id)
            .flatMap(product -> ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(product))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> createProduct(ServerRequest request) {
        return request.bodyToMono(Product.class)
            .flatMap(product -> productService.save(product))
            .flatMap(savedProduct -> ServerResponse.status(HttpStatus.CREATED)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(savedProduct))
            .onErrorResume(error -> ServerResponse.badRequest()
                .bodyValue(Map.of("error", error.getMessage())));
    }
}

라우터 설정

@Configuration
public class RouterConfig {

    @Bean
    public RouterFunction<ServerResponse> productRoutes(ProductHandler handler) {
        return RouterFunctions
            .route(GET("/api/products").and(accept(MediaType.APPLICATION_JSON)), handler::getAllProducts)
            .andRoute(GET("/api/products/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::getProduct)
            .andRoute(POST("/api/products").and(accept(MediaType.APPLICATION_JSON)), handler::createProduct)
            .andRoute(PUT("/api/products/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::updateProduct)
            .andRoute(DELETE("/api/products/{id}"), handler::deleteProduct);
    }

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions
            .route(RequestPredicates.path("/api/users/**"), 
                   nested(handler::authenticate, RouterFunctions
                       .route(GET("/api/users"), handler::getAllUsers)
                       .andRoute(GET("/api/users/{id}"), handler::getUser)
                       .andRoute(POST("/api/users"), handler::createUser)));
    }
}

함수형 엔드포인트의 장점은 라우팅 로직이 명확하고, 컴파일 타임에 타입 안정성을 보장한다는 것입니다.

또한 조건부 라우팅이나 미들웨어 체이닝이 더 직관적입니다.


R2DBC를 활용한 리액티브 데이터베이스 연동

기존의 JPA는 블로킹 I/O 기반이므로 WebFlux와 함께 사용할 수 없습니다.

대신 R2DBC(Reactive Relational Database Connectivity)를 사용해야 합니다.

Entity 및 Repository 정의

@Table("products")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
    @Id
    private Long id;

    @Column("name")
    private String name;

    @Column("price")
    private BigDecimal price;

    @Column("category")
    private String category;

    @Column("created_at")
    private LocalDateTime createdAt;

    @Column("updated_at")
    private LocalDateTime updatedAt;

    public Product withDiscount(double discountRate) {
        BigDecimal discountedPrice = price.multiply(BigDecimal.valueOf(1 - discountRate));
        return new Product(id, name, discountedPrice, category, createdAt, updatedAt);
    }
}

public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {

    @Query("SELECT * FROM products WHERE category = :category")
    Flux<Product> findByCategory(String category);

    @Query("SELECT * FROM products WHERE price BETWEEN :minPrice AND :maxPrice")
    Flux<Product> findByPriceRange(BigDecimal minPrice, BigDecimal maxPrice);

    @Query("SELECT * FROM products WHERE name LIKE :name")
    Flux<Product> findByNameContaining(String name);

    @Modifying
    @Query("UPDATE products SET price = price * :multiplier WHERE category = :category")
    Mono<Integer> updatePriceByCategory(String category, BigDecimal multiplier);
}

서비스 레이어 구현

@Service
@Transactional
public class ProductService {

    private final ProductRepository productRepository;
    private final RedisTemplate<String, Object> redisTemplate;

    public ProductService(ProductRepository productRepository, 
                         RedisTemplate<String, Object> redisTemplate) {
        this.productRepository = productRepository;
        this.redisTemplate = redisTemplate;
    }

    public Flux<Product> findByCategory(String category) {
        String cacheKey = "products:category:" + category;

        return Mono.fromCallable(() -> redisTemplate.opsForValue().get(cacheKey))
            .cast(List.class)
            .flatMapMany(Flux::fromIterable)
            .cast(Product.class)
            .switchIfEmpty(productRepository.findByCategory(category)
                .collectList()
                .doOnNext(products -> redisTemplate.opsForValue()
                    .set(cacheKey, products, Duration.ofMinutes(10)))
                .flatMapMany(Flux::fromIterable));
    }

    public Mono<Product> save(Product product) {
        return Mono.just(product)
            .map(p -> {
                if (p.getCreatedAt() == null) {
                    p.setCreatedAt(LocalDateTime.now());
                }
                p.setUpdatedAt(LocalDateTime.now());
                return p;
            })
            .flatMap(productRepository::save)
            .doOnSuccess(savedProduct -> 
                log.info("상품 저장 완료: ID={}, 이름={}", savedProduct.getId(), savedProduct.getName()));
    }

    public Flux<Product> searchProducts(String keyword, String category, 
                                       BigDecimal minPrice, BigDecimal maxPrice) {
        Flux<Product> baseQuery = productRepository.findAll();

        if (StringUtils.hasText(keyword)) {
            baseQuery = productRepository.findByNameContaining("%" + keyword + "%");
        }

        return baseQuery
            .filter(product -> category == null || category.equals(product.getCategory()))
            .filter(product -> minPrice == null || product.getPrice().compareTo(minPrice) >= 0)
            .filter(product -> maxPrice == null || product.getPrice().compareTo(maxPrice) <= 0)
            .sort(Comparator.comparing(Product::getName));
    }
}

R2DBC는 완전한 논블로킹 데이터베이스 액세스를 제공하여 WebFlux의 성능 이점을 극대화합니다.

트랜잭션 처리도 리액티브하게 이루어지므로 기존 JPA와는 다른 접근이 필요합니다.

 

webflux-r2dbc-dataflow
webflux-r2dbc-dataflow


에러 처리 및 예외 관리 전략

리액티브 프로그래밍에서는 전통적인 try-catch 블록을 사용할 수 없습니다.

대신 리액티브 스트림의 에러 처리 연산자를 활용해야 합니다.

글로벌 에러 핸들러

@Component
@Order(-2)
public class GlobalErrorHandler implements ErrorWebExceptionHandler {

    private final ObjectMapper objectMapper;

    public GlobalErrorHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.getHeaders().add("Content-Type", "application/json");

        ErrorResponse errorResponse;
        HttpStatus status;

        if (ex instanceof ValidationException) {
            status = HttpStatus.BAD_REQUEST;
            errorResponse = new ErrorResponse("VALIDATION_ERROR", ex.getMessage());
        } else if (ex instanceof ResourceNotFoundException) {
            status = HttpStatus.NOT_FOUND;
            errorResponse = new ErrorResponse("RESOURCE_NOT_FOUND", ex.getMessage());
        } else if (ex instanceof BusinessException) {
            status = HttpStatus.UNPROCESSABLE_ENTITY;
            errorResponse = new ErrorResponse("BUSINESS_ERROR", ex.getMessage());
        } else {
            status = HttpStatus.INTERNAL_SERVER_ERROR;
            errorResponse = new ErrorResponse("INTERNAL_ERROR", "서버 내부 오류가 발생했습니다");
            log.error("예상치 못한 오류 발생", ex);
        }

        response.setStatusCode(status);

        try {
            DataBuffer buffer = response.bufferFactory()
                .wrap(objectMapper.writeValueAsBytes(errorResponse));
            return response.writeWith(Mono.just(buffer));
        } catch (Exception e) {
            log.error("에러 응답 생성 실패", e);
            return response.setComplete();
        }
    }
}

리액티브 에러 처리 패턴

@Service
public class OrderService {

    private final ProductService productService;
    private final PaymentService paymentService;
    private final InventoryService inventoryService;

    public Mono<Order> processOrder(OrderRequest request) {
        return validateOrder(request)
            .flatMap(this::checkInventory)
            .flatMap(this::processPayment)
            .flatMap(this::createOrder)
            .onErrorResume(PaymentException.class, error -> {
                log.warn("결제 실패: {}", error.getMessage());
                return Mono.error(new OrderProcessingException("결제 처리 중 오류가 발생했습니다"));
            })
            .onErrorResume(InventoryException.class, error -> {
                log.warn("재고 부족: {}", error.getMessage());
                return Mono.error(new OrderProcessingException("상품 재고가 부족합니다"));
            })
            .doOnError(error -> log.error("주문 처리 실패", error))
            .retry(3)
            .timeout(Duration.ofSeconds(30));
    }

    private Mono<OrderRequest> validateOrder(OrderRequest request) {
        return Mono.just(request)
            .filter(req -> req.getProductId() != null)
            .switchIfEmpty(Mono.error(new ValidationException("상품 ID는 필수입니다")))
            .filter(req -> req.getQuantity() > 0)
            .switchIfEmpty(Mono.error(new ValidationException("수량은 0보다 커야 합니다")));
    }

    public Flux<Order> getOrdersByUser(String userId) {
        return orderRepository.findByUserId(userId)
            .onErrorResume(error -> {
                log.error("사용자 주문 조회 실패: userId={}", userId, error);
                return Flux.empty(); // 에러 시 빈 결과 반환
            })
            .switchIfEmpty(Flux.defer(() -> {
                log.info("사용자의 주문 내역이 없음: userId={}", userId);
                return Flux.empty();
            }));
    }
}

서킷 브레이커 패턴

@Component
public class ExternalApiClient {

    private final WebClient webClient;
    private final CircuitBreaker circuitBreaker;

    public ExternalApiClient(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder
            .baseUrl("https://api.external-service.com")
            .build();

        this.circuitBreaker = CircuitBreaker.ofDefaults("externalApi");
    }

    public Mono<ExternalApiResponse> callExternalApi(String request) {
        return Mono.fromCallable(() -> circuitBreaker.executeSupplier(() -> 
                webClient.post()
                    .uri("/api/data")
                    .bodyValue(request)
                    .retrieve()
                    .bodyToMono(ExternalApiResponse.class)
                    .block(Duration.ofSeconds(5))))
            .flatMap(Mono::fromCallable)
            .onErrorResume(CallNotPermittedException.class, error -> {
                log.warn("서킷 브레이커 열림: 외부 API 호출 차단");
                return getDefaultResponse();
            })
            .onErrorResume(error -> {
                log.error("외부 API 호출 실패", error);
                return getDefaultResponse();
            });
    }

    private Mono<ExternalApiResponse> getDefaultResponse() {
        return Mono.just(new ExternalApiResponse("기본 응답", "서비스 일시 불가"));
    }
}

WebClient를 이용한 리액티브 HTTP 클라이언트

WebFlux 환경에서는 기존의 RestTemplate 대신 WebClient를 사용해야 합니다.

WebClient는 논블로킹 I/O를 지원하며 리액티브 타입과 완벽하게 통합됩니다.

WebClient 기본 설정

@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        return WebClient.builder()
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
            .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
            .filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
                log.info("Request: {} {}", clientRequest.method(), clientRequest.url());
                return Mono.just(clientRequest);
            }))
            .filter(ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
                log.info("Response Status: {}", clientResponse.statusCode());
                return Mono.just(clientResponse);
            }))
            .build();
    }

    @Bean
    public WebClient secureWebClient() {
        SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();

        HttpClient httpClient = HttpClient.create()
            .secure(sslSpec -> sslSpec.sslContext(sslContext))
            .responseTimeout(Duration.ofSeconds(10))
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);

        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();
    }
}

다양한 HTTP 요청 패턴

@Service
public class ApiService {

    private final WebClient webClient;

    public ApiService(WebClient webClient) {
        this.webClient = webClient;
    }

    // GET 요청
    public Mono<UserResponse> getUser(String userId) {
        return webClient.get()
            .uri("/users/{id}", userId)
            .header("Authorization", "Bearer " + getAccessToken())
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, response -> {
                if (response.statusCode() == HttpStatus.NOT_FOUND) {
                    return Mono.error(new UserNotFoundException("사용자를 찾을 수 없습니다"));
                }
                return response.bodyToMono(String.class)
                    .flatMap(body -> Mono.error(new ApiException("API 호출 실패: " + body)));
            })
            .bodyToMono(UserResponse.class)
            .timeout(Duration.ofSeconds(5))
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
    }

    // POST 요청
    public Mono<CreateUserResponse> createUser(CreateUserRequest request) {
        return webClient.post()
            .uri("/users")
            .bodyValue(request)
            .retrieve()
            .bodyToMono(CreateUserResponse.class)
            .doOnSuccess(response -> log.info("사용자 생성 성공: {}", response.getId()))
            .doOnError(error -> log.error("사용자 생성 실패", error));
    }

    // 스트리밍 응답 처리
    public Flux<EventData> streamEvents() {
        return webClient.get()
            .uri("/events/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(EventData.class)
            .delayElements(Duration.ofMillis(100))
            .doOnNext(event -> log.debug("이벤트 수신: {}", event))
            .onErrorResume(error -> {
                log.error("이벤트 스트림 오류", error);
                return Flux.empty();
            });
    }

    // 파일 업로드
    public Mono<UploadResponse> uploadFile(String fileName, byte[] fileData) {
        MultiValueMap<String, HttpEntity<?>> parts = new LinkedMultiValueMap<>();
        parts.add("file", new ByteArrayResource(fileData) {
            @Override
            public String getFilename() {
                return fileName;
            }
        });

        return webClient.post()
            .uri("/upload")
            .contentType(MediaType.MULTIPART_FORM_DATA)
            .bodyValue(parts)
            .retrieve()
            .bodyToMono(UploadResponse.class);
    }
}

성능 최적화 및 메모리 관리

WebFlux는 높은 동시성을 제공하지만, 올바른 사용법을 모르면 오히려 성능이 저하될 수 있습니다.

백프레셔 처리

@Service
public class DataProcessingService {

    public Flux<ProcessedData> processLargeDataSet(Flux<RawData> dataStream) {
        return dataStream
            .onBackpressureBuffer(1000, BufferOverflowStrategy.DROP_LATEST)
            .publishOn(Schedulers.parallel(), 32)
            .map(this::processData)
            .filter(Objects::nonNull)
            .onBackpressureDrop(dropped -> log.warn("데이터 드롭됨: {}", dropped))
            .doOnError(error -> log.error("데이터 처리 오류", error));
    }

    public Flux<String> processWithRateLimit(Flux<String> input) {
        return input
            .delayElements(Duration.ofMillis(100)) // 초당 10개로 제한
            .onBackpressureLatest() // 최신 데이터만 유지
            .doOnNext(item -> log.debug("처리 중: {}", item));
    }
}

스케줄러 최적화

@Configuration
public class SchedulerConfig {

    @Bean
    public Scheduler ioScheduler() {
        return Schedulers.newBoundedElastic(
            100, // 최대 스레드 수
            10000, // 큐 크기
            "io-scheduler"
        );
    }

    @Bean
    public Scheduler cpuScheduler() {
        return Schedulers.newParallel(
            "cpu-scheduler",
            Runtime.getRuntime().availableProcessors()
        );
    }
}

@Service
public class OptimizedService {

    private final Scheduler ioScheduler;
    private final Scheduler cpuScheduler;

    public Mono<ProcessResult> processData(InputData data) {
        return Mono.fromCallable(() -> validateInput(data))
            .subscribeOn(cpuScheduler) // CPU 집약적 작업
            .flatMap(validData -> saveToDatabase(validData))
            .subscribeOn(ioScheduler) // I/O 작업
            .flatMap(savedData -> sendNotification(savedData))
            .subscribeOn(ioScheduler); // 또 다른 I/O 작업
    }
}

메모리 사용량 모니터링

@Component
public class MemoryMonitor {

    @EventListener
    @Async
    public void handleMemoryWarning(MemoryWarningEvent event) {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;

        double usagePercentage = (double) usedMemory / totalMemory * 100;

        if (usagePercentage > 80) {
            log.warn("메모리 사용률 높음: {:.2f}%, 사용량: {}MB, 전체: {}MB", 
                    usagePercentage, usedMemory / 1024 / 1024, totalMemory / 1024 / 1024);

            // 가비지 컬렉션 강제 실행
            System.gc();

            // 알림 발송
            sendMemoryAlert(usagePercentage);
        }
    }

    @Scheduled(fixedRate = 60000) // 1분마다 체크
    public void checkMemoryUsage() {
        MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();

        long used = heapUsage.getUsed();
        long max = heapUsage.getMax();
        double percentage = (double) used / max * 100;

        if (percentage > 75) {
            ApplicationEventPublisher publisher = ApplicationContextProvider.getApplicationContext()
                .getBean(ApplicationEventPublisher.class);
            publisher.publishEvent(new MemoryWarningEvent(this, percentage));
        }
    }
}

실제 운영 환경에서의 WebFlux 적용 사례

실무에서 WebFlux를 도입할 때 고려해야 할 점들과 성공적인 적용 사례를 살펴보겠습니다.

대용량 트래픽 처리 시나리오

@Service
public class HighTrafficService {

    private final ReactiveRedisTemplate<String, Object> redisTemplate;
    private final ProductRepository productRepository;

    // 초당 10만 건의 상품 조회 요청 처리
    public Mono<Product> getProductWithCache(String productId) {
        String cacheKey = "product:" + productId;

        return redisTemplate.opsForValue()
            .get(cacheKey)
            .cast(Product.class)
            .switchIfEmpty(
                productRepository.findById(productId)
                    .flatMap(product -> 
                        redisTemplate.opsForValue()
                            .set(cacheKey, product, Duration.ofMinutes(30))
                            .thenReturn(product)
                    )
            )
            .timeout(Duration.ofMillis(500))
            .onErrorResume(error -> {
                log.error("상품 조회 실패: productId={}", productId, error);
                return getProductFromSecondaryCache(productId);
            });
    }

    // 실시간 재고 업데이트
    public Flux<StockUpdate> streamStockUpdates() {
        return Flux.interval(Duration.ofSeconds(1))
            .flatMap(tick -> productRepository.findProductsWithLowStock(10))
            .map(product -> new StockUpdate(product.getId(), product.getStock()))
            .distinctUntilChanged(StockUpdate::getProductId)
            .share(); // 멀티캐스트로 여러 구독자에게 전송
    }
}

마이크로서비스 간 통신 최적화

@Service
public class MicroserviceClient {

    private final WebClient userServiceClient;
    private final WebClient orderServiceClient;
    private final WebClient paymentServiceClient;

    // 여러 마이크로서비스 병렬 호출
    public Mono<UserDashboard> getUserDashboard(String userId) {
        Mono<User> userInfo = userServiceClient.get()
            .uri("/users/{id}", userId)
            .retrieve()
            .bodyToMono(User.class);

        Mono<List<Order>> recentOrders = orderServiceClient.get()
            .uri("/orders/user/{userId}/recent", userId)
            .retrieve()
            .bodyToFlux(Order.class)
            .collectList();

        Mono<PaymentSummary> paymentSummary = paymentServiceClient.get()
            .uri("/payments/user/{userId}/summary", userId)
            .retrieve()
            .bodyToMono(PaymentSummary.class);

        return Mono.zip(userInfo, recentOrders, paymentSummary)
            .map(tuple -> new UserDashboard(
                tuple.getT1(), // 사용자 정보
                tuple.getT2(), // 최근 주문
                tuple.getT3()  // 결제 요약
            ))
            .timeout(Duration.ofSeconds(3));
    }

    // 서킷 브레이커와 함께 사용
    public Mono<RecommendationResponse> getRecommendations(String userId) {
        return recommendationServiceClient.get()
            .uri("/recommendations/{userId}", userId)
            .retrieve()
            .bodyToMono(RecommendationResponse.class)
            .transform(CircuitBreakerOperator.of(circuitBreaker))
            .onErrorResume(Exception.class, error -> {
                log.warn("추천 서비스 호출 실패, 기본 추천 반환: userId={}", userId);
                return getDefaultRecommendations(userId);
            });
    }
}

스트리밍 API 구현

@RestController
public class StreamingController {

    private final EventService eventService;

    // Server-Sent Events (SSE)
    @GetMapping(value = "/api/events/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<EventData>> streamServerSentEvents() {
        return eventService.getEventStream()
            .map(event -> ServerSentEvent.<EventData>builder()
                .id(String.valueOf(event.getId()))
                .event(event.getType())
                .data(event)
                .retry(Duration.ofSeconds(1))
                .build())
            .doOnCancel(() -> log.info("클라이언트가 스트림 연결을 종료했습니다"))
            .doOnError(error -> log.error("스트림 오류 발생", error));
    }

    // WebSocket 연결
    @MessageMapping("/websocket/notifications")
    public Flux<NotificationMessage> handleWebSocketNotifications(Flux<String> userIds) {
        return userIds
            .flatMap(userId -> notificationService.getNotificationsForUser(userId))
            .delayElements(Duration.ofMillis(100));
    }

    // 청크 스트리밍 다운로드
    @GetMapping("/api/files/{fileId}/download")
    public Mono<ResponseEntity<Flux<DataBuffer>>> downloadFile(@PathVariable String fileId) {
        return fileService.getFileInfo(fileId)
            .map(fileInfo -> {
                Flux<DataBuffer> fileStream = fileService.readFileAsStream(fileId)
                    .onErrorResume(error -> {
                        log.error("파일 읽기 오류: fileId={}", fileId, error);
                        return Flux.empty();
                    });

                return ResponseEntity.ok()
                    .header(HttpHeaders.CONTENT_DISPOSITION, 
                           "attachment; filename=\"" + fileInfo.getFileName() + "\"")
                    .contentType(MediaType.APPLICATION_OCTET_STREAM)
                    .body(fileStream);
            });
    }
}

모니터링 및 디버깅 전략

WebFlux 애플리케이션의 성능과 안정성을 보장하기 위한 모니터링 및 디버깅 방법입니다.

액추에터와 메트릭스 설정

@Configuration
public class MonitoringConfig {

    @Bean
    public MeterRegistry meterRegistry() {
        return new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    }

    @Bean
    public TimedAspect timedAspect(MeterRegistry registry) {
        return new TimedAspect(registry);
    }

    @Component
    public static class WebFluxMetrics {

        private final Counter requestCounter;
        private final Timer responseTimer;
        private final Gauge activeConnections;

        public WebFluxMetrics(MeterRegistry meterRegistry) {
            this.requestCounter = Counter.builder("webflux.requests.total")
                .description("WebFlux 요청 총 개수")
                .register(meterRegistry);

            this.responseTimer = Timer.builder("webflux.response.time")
                .description("WebFlux 응답 시간")
                .register(meterRegistry);

            this.activeConnections = Gauge.builder("webflux.connections.active")
                .description("활성 연결 수")
                .register(meterRegistry, this, WebFluxMetrics::getActiveConnections);
        }

        public void incrementRequestCount() {
            requestCounter.increment();
        }

        public Timer.Sample startTimer() {
            return Timer.start();
        }

        public void recordResponseTime(Timer.Sample sample) {
            sample.stop(responseTimer);
        }

        private double getActiveConnections() {
            // 실제 활성 연결 수 계산 로직
            return Runtime.getRuntime().availableProcessors();
        }
    }
}

리액티브 디버깅 기법

@Service
public class DebuggingService {

    public Mono<ProcessResult> processWithLogging(InputData input) {
        return Mono.just(input)
            .doOnSubscribe(subscription -> log.debug("구독 시작: {}", input.getId()))
            .map(this::validateInput)
            .doOnNext(validated -> log.debug("검증 완료: {}", validated))
            .flatMap(this::enrichData)
            .doOnNext(enriched -> log.debug("데이터 강화 완료: {}", enriched))
            .flatMap(this::processData)
            .doOnSuccess(result -> log.info("처리 성공: {}", result))
            .doOnError(error -> log.error("처리 실패: input={}", input, error))
            .doFinally(signalType -> log.debug("처리 완료: signal={}", signalType))
            .checkpoint("데이터 처리 체크포인트"); // 스택 트레이스에 표시
    }

    public Flux<String> debugComplexFlow(Flux<String> input) {
        return input
            .log("input") // 모든 신호를 로그로 출력
            .filter(item -> item.length() > 5)
            .log("filtered")
            .map(String::toUpperCase)
            .log("mapped")
            .onErrorContinue((error, item) -> {
                log.warn("항목 처리 중 오류 발생: item={}", item, error);
            });
    }

    // 성능 프로파일링
    @Timed(value = "service.process.time", description = "처리 시간 측정")
    public Mono<String> profiledProcess(String input) {
        return Mono.fromCallable(() -> {
            // CPU 집약적 작업 시뮬레이션
            Thread.sleep(100);
            return input.toUpperCase();
        })
        .subscribeOn(Schedulers.boundedElastic())
        .doOnNext(result -> {
            // 메모리 사용량 체크
            Runtime runtime = Runtime.getRuntime();
            long usedMemory = runtime.totalMemory() - runtime.freeMemory();
            log.debug("메모리 사용량: {}MB", usedMemory / 1024 / 1024);
        });
    }
}

헬스 체크 및 리액티브 액추에이터

@Component
public class ReactiveHealthIndicator implements ReactiveHealthIndicator {

    private final ProductRepository productRepository;
    private final RedisTemplate<String, Object> redisTemplate;

    @Override
    public Mono<Health> health() {
        return Mono.zip(
                checkDatabase(),
                checkRedis(),
                checkExternalApi()
            )
            .map(tuple -> {
                boolean allHealthy = tuple.getT1() && tuple.getT2() && tuple.getT3();

                return allHealthy 
                    ? Health.up()
                        .withDetail("database", "UP")
                        .withDetail("redis", "UP")
                        .withDetail("external-api", "UP")
                        .build()
                    : Health.down()
                        .withDetail("database", tuple.getT1() ? "UP" : "DOWN")
                        .withDetail("redis", tuple.getT2() ? "UP" : "DOWN")
                        .withDetail("external-api", tuple.getT3() ? "UP" : "DOWN")
                        .build();
            })
            .onErrorReturn(Health.down().withDetail("error", "헬스 체크 실패").build());
    }

    private Mono<Boolean> checkDatabase() {
        return productRepository.count()
            .map(count -> count >= 0)
            .onErrorReturn(false)
            .timeout(Duration.ofSeconds(5));
    }

    private Mono<Boolean> checkRedis() {
        return Mono.fromCallable(() -> {
            redisTemplate.opsForValue().set("health:check", "ok", Duration.ofSeconds(10));
            return true;
        })
        .onErrorReturn(false)
        .timeout(Duration.ofSeconds(3));
    }

    private Mono<Boolean> checkExternalApi() {
        return webClient.get()
            .uri("/health")
            .retrieve()
            .toBodilessEntity()
            .map(response -> response.getStatusCode().is2xxSuccessful())
            .onErrorReturn(false)
            .timeout(Duration.ofSeconds(5));
    }
}

Spring MVC vs WebFlux 비교 및 선택 가이드

언제 WebFlux를 선택해야 하는지, 그리고 Spring MVC와의 차이점을 명확히 이해해보겠습니다.

성능 비교

특성 Spring MVC Spring WebFlux
동시성 모델 스레드 풀 기반 이벤트 루프 기반
메모리 사용량 높음 (스레드당 1-2MB) 낮음
응답성 블로킹 I/O로 인한 지연 논블로킹으로 빠른 응답
처리량 스레드 수에 제한 높은 동시 처리 가능

WebFlux 선택 기준

WebFlux를 선택해야 하는 경우:

  • 높은 동시성이 필요한 애플리케이션
  • I/O 집약적인 작업이 많은 경우
  • 실시간 스트리밍이나 SSE가 필요한 경우
  • 마이크로서비스 아키텍처에서 서비스 간 비동기 통신
  • 외부 API 호출이 빈번한 경우

Spring MVC를 선택해야 하는 경우:

  • 기존 서블릿 기반 라이브러리 의존성이 높은 경우
  • 팀의 리액티브 프로그래밍 경험이 부족한 경우
  • CRUD 위주의 단순한 애플리케이션
  • 동기적 처리가 더 적합한 비즈니스 로직

마이그레이션 전략

// 점진적 마이그레이션 예시
@RestController
public class HybridController {

    // 기존 Spring MVC 스타일 (유지)
    @GetMapping("/legacy/users/{id}")
    public ResponseEntity<User> getLegacyUser(@PathVariable String id) {
        User user = userService.findById(id);
        return ResponseEntity.ok(user);
    }

    // 새로운 WebFlux 스타일 (신규 개발)
    @GetMapping("/reactive/users/{id}")
    public Mono<ResponseEntity<User>> getReactiveUser(@PathVariable String id) {
        return userService.findByIdReactive(id)
            .map(ResponseEntity::ok)
            .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    // 어댑터 패턴을 통한 기존 서비스 활용
    @GetMapping("/users/{id}/orders")
    public Flux<Order> getUserOrders(@PathVariable String id) {
        // 기존 블로킹 서비스를 리액티브로 래핑
        return Mono.fromCallable(() -> orderService.findByUserId(id))
            .subscribeOn(Schedulers.boundedElastic())
            .flatMapMany(Flux::fromIterable);
    }
}

실무 팁 및 베스트 프랙티스

WebFlux를 성공적으로 도입하기 위한 실무 노하우를 공유합니다.

코드 품질 향상

// 좋은 예: 명확한 에러 처리와 로깅
public class BestPracticeService {

    public Mono<OrderResult> processOrder(OrderRequest request) {
        return validateOrderRequest(request)
            .flatMap(this::calculateOrderAmount)
            .flatMap(this::processPayment)
            .flatMap(this::updateInventory)
            .flatMap(this::createOrderRecord)
            .doOnSuccess(result -> {
                log.info("주문 처리 성공: orderId={}, amount={}", 
                        result.getOrderId(), result.getAmount());
                publishOrderEvent(result);
            })
            .onErrorResume(ValidationException.class, error -> {
                log.warn("주문 검증 실패: {}", error.getMessage());
                return Mono.just(OrderResult.failed("검증 오류: " + error.getMessage()));
            })
            .onErrorResume(PaymentException.class, error -> {
                log.error("결제 처리 실패", error);
                return Mono.just(OrderResult.failed("결제 오류"));
            })
            .onErrorResume(error -> {
                log.error("예상치 못한 오류 발생", error);
                return Mono.just(OrderResult.failed("시스템 오류"));
            });
    }

    // 나쁜 예: 에러 처리 없는 체이닝
    public Mono<String> badExample(String input) {
        return Mono.just(input)
            .map(String::toUpperCase)
            .flatMap(this::someExternalCall)
            .map(result -> result.getValue()); // NPE 가능성
    }
}

테스트 작성 가이드

@ExtendWith(SpringExtension.class)
@WebFluxTest(ProductController.class)
class ProductControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @MockBean
    private ProductService productService;

    @Test
    void shouldReturnProductWhenExists() {
        // Given
        Product product = new Product("1", "테스트 상품", BigDecimal.valueOf(10000));
        when(productService.findById("1")).thenReturn(Mono.just(product));

        // When & Then
        webTestClient.get()
            .uri("/api/products/1")
            .exchange()
            .expectStatus().isOk()
            .expectBody(Product.class)
            .value(p -> {
                assertThat(p.getName()).isEqualTo("테스트 상품");
                assertThat(p.getPrice()).isEqualTo(BigDecimal.valueOf(10000));
            });
    }

    @Test
    void shouldReturnNotFoundWhenProductNotExists() {
        // Given
        when(productService.findById("999")).thenReturn(Mono.empty());

        // When & Then
        webTestClient.get()
            .uri("/api/products/999")
            .exchange()
            .expectStatus().isNotFound();
    }

    @Test
    void shouldHandleStreamingResponse() {
        // Given
        Flux<Product> productStream = Flux.just(
            new Product("1", "상품1", BigDecimal.valueOf(1000)),
            new Product("2", "상품2", BigDecimal.valueOf(2000))
        );
        when(productService.getAllProducts()).thenReturn(productStream);

        // When & Then
        webTestClient.get()
            .uri("/api/products/stream")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.TEXT_EVENT_STREAM)
            .expectBodyList(Product.class)
            .hasSize(2);
    }
}

// StepVerifier를 이용한 리액티브 스트림 테스트
@Test
void shouldProcessDataStream() {
    Flux<String> dataStream = Flux.just("a", "b", "c")
        .map(String::toUpperCase)
        .delayElements(Duration.ofMillis(100));

    StepVerifier.create(dataStream)
        .expectNext("A")
        .expectNext("B")
        .expectNext("C")
        .verifyComplete();
}

결론: WebFlux로 미래를 준비하세요

Spring WebFlux는 현대적인 웹 애플리케이션 개발의 새로운 패러다임을 제시합니다.

리액티브 프로그래밍을 통해 더 효율적이고 확장 가능한 시스템을 구축할 수 있습니다.

핵심 포인트 요약

성능 이점: 논블로킹 I/O를 통한 높은 동시성과 효율적인 리소스 사용

확장성: 적은 스레드로 더 많은 요청 처리 가능

현대적 접근: 함수형 프로그래밍과 리액티브 패턴의 조화

실시간 처리: 스트리밍과 실시간 데이터 처리에 최적화

시작하기 위한 단계

  1. 학습: 리액티브 프로그래밍 개념과 Project Reactor 이해
  2. 실습: 간단한 CRUD API부터 시작하여 점진적 복잡성 증가
  3. 적용: 기존 프로젝트의 일부분을 WebFlux로 리팩터링
  4. 최적화: 성능 모니터링과 튜닝을 통한 지속적 개선

WebFlux는 단순한 기술 선택이 아닌, 미래 지향적 개발자로 성장하기 위한 필수 역량입니다.

대용량 트래픽과 실시간 처리가 일상이 된 현재, WebFlux 마스터리는 여러분의 개발 커리어에 새로운 차원을 열어줄 것입니다.


관련 참조 링크:

728x90
반응형