현대 웹 애플리케이션은 점점 더 많은 동시 사용자와 대용량 트래픽을 처리해야 합니다.
전통적인 서블릿 기반의 Spring MVC로는 한계가 있을 때, Spring WebFlux가 해답이 될 수 있습니다.
이 글에서는 Spring WebFlux의 핵심 개념부터 실무 적용까지 완벽하게 다뤄보겠습니다.
Spring WebFlux란? 리액티브 프로그래밍의 핵심
Spring WebFlux는 Spring Framework 5.0에서 도입된 완전히 새로운 웹 프레임워크입니다.
기존의 Spring MVC가 서블릿 API 기반의 블로킹 I/O 모델을 사용한다면, WebFlux는 논블로킹 I/O와 리액티브 스트림을 기반으로 합니다.
리액티브 프로그래밍이란 데이터의 흐름과 변화의 전파에 중점을 둔 프로그래밍 패러다임입니다.
비동기 데이터 스트림을 다루며, 백프레셔(backpressure) 처리를 통해 시스템의 안정성을 보장합니다.
WebFlux의 핵심 특징
논블로킹 I/O 처리: 스레드가 I/O 작업을 기다리지 않고 다른 작업을 수행할 수 있습니다.
이벤트 루프 모델: 적은 수의 스레드로 많은 요청을 처리할 수 있습니다.
백프레셔 지원: 데이터 생산자와 소비자 간의 속도 차이를 조절합니다.
함수형 프로그래밍 지원: 함수형 엔드포인트를 통해 더 간결한 코드 작성이 가능합니다.
Mono와 Flux: WebFlux의 핵심 타입 이해하기
Spring WebFlux는 Project Reactor의 Mono
와 Flux
타입을 기반으로 합니다.
이 두 타입은 리액티브 스트림의 Publisher 인터페이스를 구현한 것입니다.
Mono: 0개 또는 1개의 데이터
import reactor.core.publisher.Mono;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class UserController {
@GetMapping("/user/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userService.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException("사용자를 찾을 수 없습니다")))
.doOnSuccess(user -> log.info("사용자 조회 성공: {}", user.getName()))
.doOnError(error -> log.error("사용자 조회 실패", error));
}
}
Mono
는 최대 1개의 데이터를 비동기적으로 처리하는 타입입니다.
HTTP 응답, 데이터베이스 단일 조회 결과 등에 주로 사용됩니다.
Flux: 0개 이상의 데이터 스트림
import reactor.core.publisher.Flux;
import org.springframework.http.MediaType;
@RestController
public class ProductController {
@GetMapping(value = "/products/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Product> streamProducts() {
return productService.getAllProducts()
.delayElements(Duration.ofSeconds(1))
.doOnNext(product -> log.info("상품 스트리밍: {}", product.getName()))
.onErrorResume(error -> {
log.error("상품 스트리밍 오류", error);
return Flux.empty();
});
}
@GetMapping("/products")
public Flux<Product> getProducts() {
return Flux.fromIterable(Arrays.asList(
new Product("상품1", 10000),
new Product("상품2", 20000),
new Product("상품3", 30000)
))
.filter(product -> product.getPrice() > 15000)
.map(product -> product.withDiscount(0.1));
}
}
Flux
는 여러 개의 데이터를 스트림으로 처리하는 타입입니다.
실시간 데이터, 목록 조회, 스트리밍 응답 등에 활용됩니다.
WebFlux 프로젝트 설정 및 기본 구성
Spring Boot 프로젝트에서 WebFlux를 사용하려면 적절한 의존성 설정이 필요합니다.
Maven 의존성 설정
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-h2</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
Gradle 의존성 설정
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-data-r2dbc'
runtimeOnly 'io.r2dbc:r2dbc-h2'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
}
주의사항: spring-boot-starter-web
과 spring-boot-starter-webflux
를 함께 사용하면 안 됩니다.
Spring Boot는 기본적으로 서블릿 기반을 우선하므로 WebFlux가 제대로 동작하지 않습니다.
기본 애플리케이션 구성
@SpringBootApplication
@EnableR2dbcRepositories
public class WebFluxApplication {
public static void main(String[] args) {
SpringApplication.run(WebFluxApplication.class, args);
}
@Bean
public ConnectionFactory connectionFactory() {
return ConnectionFactories.get("r2dbc:h2:mem:///testdb");
}
@Bean
public WebFilter corsFilter() {
return (exchange, chain) -> {
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = response.getHeaders();
headers.add("Access-Control-Allow-Origin", "*");
headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE");
headers.add("Access-Control-Allow-Headers", "Content-Type");
return chain.filter(exchange);
};
}
}
함수형 엔드포인트: 더 간결한 라우팅
WebFlux는 애노테이션 기반 컨트롤러 외에도 함수형 엔드포인트를 지원합니다.
함수형 엔드포인트는 더 명시적이고 테스트하기 쉬운 코드를 작성할 수 있게 해줍니다.
HandlerFunction과 RouterFunction
@Component
public class ProductHandler {
private final ProductService productService;
public ProductHandler(ProductService productService) {
this.productService = productService;
}
public Mono<ServerResponse> getAllProducts(ServerRequest request) {
String category = request.queryParam("category").orElse("all");
return productService.findByCategory(category)
.collectList()
.flatMap(products -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(products))
.onErrorResume(error -> ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(Map.of("error", "상품 조회 중 오류가 발생했습니다")));
}
public Mono<ServerResponse> getProduct(ServerRequest request) {
String id = request.pathVariable("id");
return productService.findById(id)
.flatMap(product -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(product))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> createProduct(ServerRequest request) {
return request.bodyToMono(Product.class)
.flatMap(product -> productService.save(product))
.flatMap(savedProduct -> ServerResponse.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(savedProduct))
.onErrorResume(error -> ServerResponse.badRequest()
.bodyValue(Map.of("error", error.getMessage())));
}
}
라우터 설정
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> productRoutes(ProductHandler handler) {
return RouterFunctions
.route(GET("/api/products").and(accept(MediaType.APPLICATION_JSON)), handler::getAllProducts)
.andRoute(GET("/api/products/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::getProduct)
.andRoute(POST("/api/products").and(accept(MediaType.APPLICATION_JSON)), handler::createProduct)
.andRoute(PUT("/api/products/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::updateProduct)
.andRoute(DELETE("/api/products/{id}"), handler::deleteProduct);
}
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions
.route(RequestPredicates.path("/api/users/**"),
nested(handler::authenticate, RouterFunctions
.route(GET("/api/users"), handler::getAllUsers)
.andRoute(GET("/api/users/{id}"), handler::getUser)
.andRoute(POST("/api/users"), handler::createUser)));
}
}
함수형 엔드포인트의 장점은 라우팅 로직이 명확하고, 컴파일 타임에 타입 안정성을 보장한다는 것입니다.
또한 조건부 라우팅이나 미들웨어 체이닝이 더 직관적입니다.
R2DBC를 활용한 리액티브 데이터베이스 연동
기존의 JPA는 블로킹 I/O 기반이므로 WebFlux와 함께 사용할 수 없습니다.
대신 R2DBC(Reactive Relational Database Connectivity)를 사용해야 합니다.
Entity 및 Repository 정의
@Table("products")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product {
@Id
private Long id;
@Column("name")
private String name;
@Column("price")
private BigDecimal price;
@Column("category")
private String category;
@Column("created_at")
private LocalDateTime createdAt;
@Column("updated_at")
private LocalDateTime updatedAt;
public Product withDiscount(double discountRate) {
BigDecimal discountedPrice = price.multiply(BigDecimal.valueOf(1 - discountRate));
return new Product(id, name, discountedPrice, category, createdAt, updatedAt);
}
}
public interface ProductRepository extends ReactiveCrudRepository<Product, Long> {
@Query("SELECT * FROM products WHERE category = :category")
Flux<Product> findByCategory(String category);
@Query("SELECT * FROM products WHERE price BETWEEN :minPrice AND :maxPrice")
Flux<Product> findByPriceRange(BigDecimal minPrice, BigDecimal maxPrice);
@Query("SELECT * FROM products WHERE name LIKE :name")
Flux<Product> findByNameContaining(String name);
@Modifying
@Query("UPDATE products SET price = price * :multiplier WHERE category = :category")
Mono<Integer> updatePriceByCategory(String category, BigDecimal multiplier);
}
서비스 레이어 구현
@Service
@Transactional
public class ProductService {
private final ProductRepository productRepository;
private final RedisTemplate<String, Object> redisTemplate;
public ProductService(ProductRepository productRepository,
RedisTemplate<String, Object> redisTemplate) {
this.productRepository = productRepository;
this.redisTemplate = redisTemplate;
}
public Flux<Product> findByCategory(String category) {
String cacheKey = "products:category:" + category;
return Mono.fromCallable(() -> redisTemplate.opsForValue().get(cacheKey))
.cast(List.class)
.flatMapMany(Flux::fromIterable)
.cast(Product.class)
.switchIfEmpty(productRepository.findByCategory(category)
.collectList()
.doOnNext(products -> redisTemplate.opsForValue()
.set(cacheKey, products, Duration.ofMinutes(10)))
.flatMapMany(Flux::fromIterable));
}
public Mono<Product> save(Product product) {
return Mono.just(product)
.map(p -> {
if (p.getCreatedAt() == null) {
p.setCreatedAt(LocalDateTime.now());
}
p.setUpdatedAt(LocalDateTime.now());
return p;
})
.flatMap(productRepository::save)
.doOnSuccess(savedProduct ->
log.info("상품 저장 완료: ID={}, 이름={}", savedProduct.getId(), savedProduct.getName()));
}
public Flux<Product> searchProducts(String keyword, String category,
BigDecimal minPrice, BigDecimal maxPrice) {
Flux<Product> baseQuery = productRepository.findAll();
if (StringUtils.hasText(keyword)) {
baseQuery = productRepository.findByNameContaining("%" + keyword + "%");
}
return baseQuery
.filter(product -> category == null || category.equals(product.getCategory()))
.filter(product -> minPrice == null || product.getPrice().compareTo(minPrice) >= 0)
.filter(product -> maxPrice == null || product.getPrice().compareTo(maxPrice) <= 0)
.sort(Comparator.comparing(Product::getName));
}
}
R2DBC는 완전한 논블로킹 데이터베이스 액세스를 제공하여 WebFlux의 성능 이점을 극대화합니다.
트랜잭션 처리도 리액티브하게 이루어지므로 기존 JPA와는 다른 접근이 필요합니다.
에러 처리 및 예외 관리 전략
리액티브 프로그래밍에서는 전통적인 try-catch 블록을 사용할 수 없습니다.
대신 리액티브 스트림의 에러 처리 연산자를 활용해야 합니다.
글로벌 에러 핸들러
@Component
@Order(-2)
public class GlobalErrorHandler implements ErrorWebExceptionHandler {
private final ObjectMapper objectMapper;
public GlobalErrorHandler(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().add("Content-Type", "application/json");
ErrorResponse errorResponse;
HttpStatus status;
if (ex instanceof ValidationException) {
status = HttpStatus.BAD_REQUEST;
errorResponse = new ErrorResponse("VALIDATION_ERROR", ex.getMessage());
} else if (ex instanceof ResourceNotFoundException) {
status = HttpStatus.NOT_FOUND;
errorResponse = new ErrorResponse("RESOURCE_NOT_FOUND", ex.getMessage());
} else if (ex instanceof BusinessException) {
status = HttpStatus.UNPROCESSABLE_ENTITY;
errorResponse = new ErrorResponse("BUSINESS_ERROR", ex.getMessage());
} else {
status = HttpStatus.INTERNAL_SERVER_ERROR;
errorResponse = new ErrorResponse("INTERNAL_ERROR", "서버 내부 오류가 발생했습니다");
log.error("예상치 못한 오류 발생", ex);
}
response.setStatusCode(status);
try {
DataBuffer buffer = response.bufferFactory()
.wrap(objectMapper.writeValueAsBytes(errorResponse));
return response.writeWith(Mono.just(buffer));
} catch (Exception e) {
log.error("에러 응답 생성 실패", e);
return response.setComplete();
}
}
}
리액티브 에러 처리 패턴
@Service
public class OrderService {
private final ProductService productService;
private final PaymentService paymentService;
private final InventoryService inventoryService;
public Mono<Order> processOrder(OrderRequest request) {
return validateOrder(request)
.flatMap(this::checkInventory)
.flatMap(this::processPayment)
.flatMap(this::createOrder)
.onErrorResume(PaymentException.class, error -> {
log.warn("결제 실패: {}", error.getMessage());
return Mono.error(new OrderProcessingException("결제 처리 중 오류가 발생했습니다"));
})
.onErrorResume(InventoryException.class, error -> {
log.warn("재고 부족: {}", error.getMessage());
return Mono.error(new OrderProcessingException("상품 재고가 부족합니다"));
})
.doOnError(error -> log.error("주문 처리 실패", error))
.retry(3)
.timeout(Duration.ofSeconds(30));
}
private Mono<OrderRequest> validateOrder(OrderRequest request) {
return Mono.just(request)
.filter(req -> req.getProductId() != null)
.switchIfEmpty(Mono.error(new ValidationException("상품 ID는 필수입니다")))
.filter(req -> req.getQuantity() > 0)
.switchIfEmpty(Mono.error(new ValidationException("수량은 0보다 커야 합니다")));
}
public Flux<Order> getOrdersByUser(String userId) {
return orderRepository.findByUserId(userId)
.onErrorResume(error -> {
log.error("사용자 주문 조회 실패: userId={}", userId, error);
return Flux.empty(); // 에러 시 빈 결과 반환
})
.switchIfEmpty(Flux.defer(() -> {
log.info("사용자의 주문 내역이 없음: userId={}", userId);
return Flux.empty();
}));
}
}
서킷 브레이커 패턴
@Component
public class ExternalApiClient {
private final WebClient webClient;
private final CircuitBreaker circuitBreaker;
public ExternalApiClient(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder
.baseUrl("https://api.external-service.com")
.build();
this.circuitBreaker = CircuitBreaker.ofDefaults("externalApi");
}
public Mono<ExternalApiResponse> callExternalApi(String request) {
return Mono.fromCallable(() -> circuitBreaker.executeSupplier(() ->
webClient.post()
.uri("/api/data")
.bodyValue(request)
.retrieve()
.bodyToMono(ExternalApiResponse.class)
.block(Duration.ofSeconds(5))))
.flatMap(Mono::fromCallable)
.onErrorResume(CallNotPermittedException.class, error -> {
log.warn("서킷 브레이커 열림: 외부 API 호출 차단");
return getDefaultResponse();
})
.onErrorResume(error -> {
log.error("외부 API 호출 실패", error);
return getDefaultResponse();
});
}
private Mono<ExternalApiResponse> getDefaultResponse() {
return Mono.just(new ExternalApiResponse("기본 응답", "서비스 일시 불가"));
}
}
WebClient를 이용한 리액티브 HTTP 클라이언트
WebFlux 환경에서는 기존의 RestTemplate 대신 WebClient를 사용해야 합니다.
WebClient는 논블로킹 I/O를 지원하며 리액티브 타입과 완벽하게 통합됩니다.
WebClient 기본 설정
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.defaultHeader(HttpHeaders.USER_AGENT, "MyApp/1.0")
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
.filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
log.info("Request: {} {}", clientRequest.method(), clientRequest.url());
return Mono.just(clientRequest);
}))
.filter(ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
log.info("Response Status: {}", clientResponse.statusCode());
return Mono.just(clientResponse);
}))
.build();
}
@Bean
public WebClient secureWebClient() {
SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
HttpClient httpClient = HttpClient.create()
.secure(sslSpec -> sslSpec.sslContext(sslContext))
.responseTimeout(Duration.ofSeconds(10))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
}
다양한 HTTP 요청 패턴
@Service
public class ApiService {
private final WebClient webClient;
public ApiService(WebClient webClient) {
this.webClient = webClient;
}
// GET 요청
public Mono<UserResponse> getUser(String userId) {
return webClient.get()
.uri("/users/{id}", userId)
.header("Authorization", "Bearer " + getAccessToken())
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {
if (response.statusCode() == HttpStatus.NOT_FOUND) {
return Mono.error(new UserNotFoundException("사용자를 찾을 수 없습니다"));
}
return response.bodyToMono(String.class)
.flatMap(body -> Mono.error(new ApiException("API 호출 실패: " + body)));
})
.bodyToMono(UserResponse.class)
.timeout(Duration.ofSeconds(5))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));
}
// POST 요청
public Mono<CreateUserResponse> createUser(CreateUserRequest request) {
return webClient.post()
.uri("/users")
.bodyValue(request)
.retrieve()
.bodyToMono(CreateUserResponse.class)
.doOnSuccess(response -> log.info("사용자 생성 성공: {}", response.getId()))
.doOnError(error -> log.error("사용자 생성 실패", error));
}
// 스트리밍 응답 처리
public Flux<EventData> streamEvents() {
return webClient.get()
.uri("/events/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(EventData.class)
.delayElements(Duration.ofMillis(100))
.doOnNext(event -> log.debug("이벤트 수신: {}", event))
.onErrorResume(error -> {
log.error("이벤트 스트림 오류", error);
return Flux.empty();
});
}
// 파일 업로드
public Mono<UploadResponse> uploadFile(String fileName, byte[] fileData) {
MultiValueMap<String, HttpEntity<?>> parts = new LinkedMultiValueMap<>();
parts.add("file", new ByteArrayResource(fileData) {
@Override
public String getFilename() {
return fileName;
}
});
return webClient.post()
.uri("/upload")
.contentType(MediaType.MULTIPART_FORM_DATA)
.bodyValue(parts)
.retrieve()
.bodyToMono(UploadResponse.class);
}
}
성능 최적화 및 메모리 관리
WebFlux는 높은 동시성을 제공하지만, 올바른 사용법을 모르면 오히려 성능이 저하될 수 있습니다.
백프레셔 처리
@Service
public class DataProcessingService {
public Flux<ProcessedData> processLargeDataSet(Flux<RawData> dataStream) {
return dataStream
.onBackpressureBuffer(1000, BufferOverflowStrategy.DROP_LATEST)
.publishOn(Schedulers.parallel(), 32)
.map(this::processData)
.filter(Objects::nonNull)
.onBackpressureDrop(dropped -> log.warn("데이터 드롭됨: {}", dropped))
.doOnError(error -> log.error("데이터 처리 오류", error));
}
public Flux<String> processWithRateLimit(Flux<String> input) {
return input
.delayElements(Duration.ofMillis(100)) // 초당 10개로 제한
.onBackpressureLatest() // 최신 데이터만 유지
.doOnNext(item -> log.debug("처리 중: {}", item));
}
}
스케줄러 최적화
@Configuration
public class SchedulerConfig {
@Bean
public Scheduler ioScheduler() {
return Schedulers.newBoundedElastic(
100, // 최대 스레드 수
10000, // 큐 크기
"io-scheduler"
);
}
@Bean
public Scheduler cpuScheduler() {
return Schedulers.newParallel(
"cpu-scheduler",
Runtime.getRuntime().availableProcessors()
);
}
}
@Service
public class OptimizedService {
private final Scheduler ioScheduler;
private final Scheduler cpuScheduler;
public Mono<ProcessResult> processData(InputData data) {
return Mono.fromCallable(() -> validateInput(data))
.subscribeOn(cpuScheduler) // CPU 집약적 작업
.flatMap(validData -> saveToDatabase(validData))
.subscribeOn(ioScheduler) // I/O 작업
.flatMap(savedData -> sendNotification(savedData))
.subscribeOn(ioScheduler); // 또 다른 I/O 작업
}
}
메모리 사용량 모니터링
@Component
public class MemoryMonitor {
@EventListener
@Async
public void handleMemoryWarning(MemoryWarningEvent event) {
Runtime runtime = Runtime.getRuntime();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
double usagePercentage = (double) usedMemory / totalMemory * 100;
if (usagePercentage > 80) {
log.warn("메모리 사용률 높음: {:.2f}%, 사용량: {}MB, 전체: {}MB",
usagePercentage, usedMemory / 1024 / 1024, totalMemory / 1024 / 1024);
// 가비지 컬렉션 강제 실행
System.gc();
// 알림 발송
sendMemoryAlert(usagePercentage);
}
}
@Scheduled(fixedRate = 60000) // 1분마다 체크
public void checkMemoryUsage() {
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
long used = heapUsage.getUsed();
long max = heapUsage.getMax();
double percentage = (double) used / max * 100;
if (percentage > 75) {
ApplicationEventPublisher publisher = ApplicationContextProvider.getApplicationContext()
.getBean(ApplicationEventPublisher.class);
publisher.publishEvent(new MemoryWarningEvent(this, percentage));
}
}
}
실제 운영 환경에서의 WebFlux 적용 사례
실무에서 WebFlux를 도입할 때 고려해야 할 점들과 성공적인 적용 사례를 살펴보겠습니다.
대용량 트래픽 처리 시나리오
@Service
public class HighTrafficService {
private final ReactiveRedisTemplate<String, Object> redisTemplate;
private final ProductRepository productRepository;
// 초당 10만 건의 상품 조회 요청 처리
public Mono<Product> getProductWithCache(String productId) {
String cacheKey = "product:" + productId;
return redisTemplate.opsForValue()
.get(cacheKey)
.cast(Product.class)
.switchIfEmpty(
productRepository.findById(productId)
.flatMap(product ->
redisTemplate.opsForValue()
.set(cacheKey, product, Duration.ofMinutes(30))
.thenReturn(product)
)
)
.timeout(Duration.ofMillis(500))
.onErrorResume(error -> {
log.error("상품 조회 실패: productId={}", productId, error);
return getProductFromSecondaryCache(productId);
});
}
// 실시간 재고 업데이트
public Flux<StockUpdate> streamStockUpdates() {
return Flux.interval(Duration.ofSeconds(1))
.flatMap(tick -> productRepository.findProductsWithLowStock(10))
.map(product -> new StockUpdate(product.getId(), product.getStock()))
.distinctUntilChanged(StockUpdate::getProductId)
.share(); // 멀티캐스트로 여러 구독자에게 전송
}
}
마이크로서비스 간 통신 최적화
@Service
public class MicroserviceClient {
private final WebClient userServiceClient;
private final WebClient orderServiceClient;
private final WebClient paymentServiceClient;
// 여러 마이크로서비스 병렬 호출
public Mono<UserDashboard> getUserDashboard(String userId) {
Mono<User> userInfo = userServiceClient.get()
.uri("/users/{id}", userId)
.retrieve()
.bodyToMono(User.class);
Mono<List<Order>> recentOrders = orderServiceClient.get()
.uri("/orders/user/{userId}/recent", userId)
.retrieve()
.bodyToFlux(Order.class)
.collectList();
Mono<PaymentSummary> paymentSummary = paymentServiceClient.get()
.uri("/payments/user/{userId}/summary", userId)
.retrieve()
.bodyToMono(PaymentSummary.class);
return Mono.zip(userInfo, recentOrders, paymentSummary)
.map(tuple -> new UserDashboard(
tuple.getT1(), // 사용자 정보
tuple.getT2(), // 최근 주문
tuple.getT3() // 결제 요약
))
.timeout(Duration.ofSeconds(3));
}
// 서킷 브레이커와 함께 사용
public Mono<RecommendationResponse> getRecommendations(String userId) {
return recommendationServiceClient.get()
.uri("/recommendations/{userId}", userId)
.retrieve()
.bodyToMono(RecommendationResponse.class)
.transform(CircuitBreakerOperator.of(circuitBreaker))
.onErrorResume(Exception.class, error -> {
log.warn("추천 서비스 호출 실패, 기본 추천 반환: userId={}", userId);
return getDefaultRecommendations(userId);
});
}
}
스트리밍 API 구현
@RestController
public class StreamingController {
private final EventService eventService;
// Server-Sent Events (SSE)
@GetMapping(value = "/api/events/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<EventData>> streamServerSentEvents() {
return eventService.getEventStream()
.map(event -> ServerSentEvent.<EventData>builder()
.id(String.valueOf(event.getId()))
.event(event.getType())
.data(event)
.retry(Duration.ofSeconds(1))
.build())
.doOnCancel(() -> log.info("클라이언트가 스트림 연결을 종료했습니다"))
.doOnError(error -> log.error("스트림 오류 발생", error));
}
// WebSocket 연결
@MessageMapping("/websocket/notifications")
public Flux<NotificationMessage> handleWebSocketNotifications(Flux<String> userIds) {
return userIds
.flatMap(userId -> notificationService.getNotificationsForUser(userId))
.delayElements(Duration.ofMillis(100));
}
// 청크 스트리밍 다운로드
@GetMapping("/api/files/{fileId}/download")
public Mono<ResponseEntity<Flux<DataBuffer>>> downloadFile(@PathVariable String fileId) {
return fileService.getFileInfo(fileId)
.map(fileInfo -> {
Flux<DataBuffer> fileStream = fileService.readFileAsStream(fileId)
.onErrorResume(error -> {
log.error("파일 읽기 오류: fileId={}", fileId, error);
return Flux.empty();
});
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"" + fileInfo.getFileName() + "\"")
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(fileStream);
});
}
}
모니터링 및 디버깅 전략
WebFlux 애플리케이션의 성능과 안정성을 보장하기 위한 모니터링 및 디버깅 방법입니다.
액추에터와 메트릭스 설정
@Configuration
public class MonitoringConfig {
@Bean
public MeterRegistry meterRegistry() {
return new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
}
@Bean
public TimedAspect timedAspect(MeterRegistry registry) {
return new TimedAspect(registry);
}
@Component
public static class WebFluxMetrics {
private final Counter requestCounter;
private final Timer responseTimer;
private final Gauge activeConnections;
public WebFluxMetrics(MeterRegistry meterRegistry) {
this.requestCounter = Counter.builder("webflux.requests.total")
.description("WebFlux 요청 총 개수")
.register(meterRegistry);
this.responseTimer = Timer.builder("webflux.response.time")
.description("WebFlux 응답 시간")
.register(meterRegistry);
this.activeConnections = Gauge.builder("webflux.connections.active")
.description("활성 연결 수")
.register(meterRegistry, this, WebFluxMetrics::getActiveConnections);
}
public void incrementRequestCount() {
requestCounter.increment();
}
public Timer.Sample startTimer() {
return Timer.start();
}
public void recordResponseTime(Timer.Sample sample) {
sample.stop(responseTimer);
}
private double getActiveConnections() {
// 실제 활성 연결 수 계산 로직
return Runtime.getRuntime().availableProcessors();
}
}
}
리액티브 디버깅 기법
@Service
public class DebuggingService {
public Mono<ProcessResult> processWithLogging(InputData input) {
return Mono.just(input)
.doOnSubscribe(subscription -> log.debug("구독 시작: {}", input.getId()))
.map(this::validateInput)
.doOnNext(validated -> log.debug("검증 완료: {}", validated))
.flatMap(this::enrichData)
.doOnNext(enriched -> log.debug("데이터 강화 완료: {}", enriched))
.flatMap(this::processData)
.doOnSuccess(result -> log.info("처리 성공: {}", result))
.doOnError(error -> log.error("처리 실패: input={}", input, error))
.doFinally(signalType -> log.debug("처리 완료: signal={}", signalType))
.checkpoint("데이터 처리 체크포인트"); // 스택 트레이스에 표시
}
public Flux<String> debugComplexFlow(Flux<String> input) {
return input
.log("input") // 모든 신호를 로그로 출력
.filter(item -> item.length() > 5)
.log("filtered")
.map(String::toUpperCase)
.log("mapped")
.onErrorContinue((error, item) -> {
log.warn("항목 처리 중 오류 발생: item={}", item, error);
});
}
// 성능 프로파일링
@Timed(value = "service.process.time", description = "처리 시간 측정")
public Mono<String> profiledProcess(String input) {
return Mono.fromCallable(() -> {
// CPU 집약적 작업 시뮬레이션
Thread.sleep(100);
return input.toUpperCase();
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(result -> {
// 메모리 사용량 체크
Runtime runtime = Runtime.getRuntime();
long usedMemory = runtime.totalMemory() - runtime.freeMemory();
log.debug("메모리 사용량: {}MB", usedMemory / 1024 / 1024);
});
}
}
헬스 체크 및 리액티브 액추에이터
@Component
public class ReactiveHealthIndicator implements ReactiveHealthIndicator {
private final ProductRepository productRepository;
private final RedisTemplate<String, Object> redisTemplate;
@Override
public Mono<Health> health() {
return Mono.zip(
checkDatabase(),
checkRedis(),
checkExternalApi()
)
.map(tuple -> {
boolean allHealthy = tuple.getT1() && tuple.getT2() && tuple.getT3();
return allHealthy
? Health.up()
.withDetail("database", "UP")
.withDetail("redis", "UP")
.withDetail("external-api", "UP")
.build()
: Health.down()
.withDetail("database", tuple.getT1() ? "UP" : "DOWN")
.withDetail("redis", tuple.getT2() ? "UP" : "DOWN")
.withDetail("external-api", tuple.getT3() ? "UP" : "DOWN")
.build();
})
.onErrorReturn(Health.down().withDetail("error", "헬스 체크 실패").build());
}
private Mono<Boolean> checkDatabase() {
return productRepository.count()
.map(count -> count >= 0)
.onErrorReturn(false)
.timeout(Duration.ofSeconds(5));
}
private Mono<Boolean> checkRedis() {
return Mono.fromCallable(() -> {
redisTemplate.opsForValue().set("health:check", "ok", Duration.ofSeconds(10));
return true;
})
.onErrorReturn(false)
.timeout(Duration.ofSeconds(3));
}
private Mono<Boolean> checkExternalApi() {
return webClient.get()
.uri("/health")
.retrieve()
.toBodilessEntity()
.map(response -> response.getStatusCode().is2xxSuccessful())
.onErrorReturn(false)
.timeout(Duration.ofSeconds(5));
}
}
Spring MVC vs WebFlux 비교 및 선택 가이드
언제 WebFlux를 선택해야 하는지, 그리고 Spring MVC와의 차이점을 명확히 이해해보겠습니다.
성능 비교
특성 | Spring MVC | Spring WebFlux |
---|---|---|
동시성 모델 | 스레드 풀 기반 | 이벤트 루프 기반 |
메모리 사용량 | 높음 (스레드당 1-2MB) | 낮음 |
응답성 | 블로킹 I/O로 인한 지연 | 논블로킹으로 빠른 응답 |
처리량 | 스레드 수에 제한 | 높은 동시 처리 가능 |
WebFlux 선택 기준
WebFlux를 선택해야 하는 경우:
- 높은 동시성이 필요한 애플리케이션
- I/O 집약적인 작업이 많은 경우
- 실시간 스트리밍이나 SSE가 필요한 경우
- 마이크로서비스 아키텍처에서 서비스 간 비동기 통신
- 외부 API 호출이 빈번한 경우
Spring MVC를 선택해야 하는 경우:
- 기존 서블릿 기반 라이브러리 의존성이 높은 경우
- 팀의 리액티브 프로그래밍 경험이 부족한 경우
- CRUD 위주의 단순한 애플리케이션
- 동기적 처리가 더 적합한 비즈니스 로직
마이그레이션 전략
// 점진적 마이그레이션 예시
@RestController
public class HybridController {
// 기존 Spring MVC 스타일 (유지)
@GetMapping("/legacy/users/{id}")
public ResponseEntity<User> getLegacyUser(@PathVariable String id) {
User user = userService.findById(id);
return ResponseEntity.ok(user);
}
// 새로운 WebFlux 스타일 (신규 개발)
@GetMapping("/reactive/users/{id}")
public Mono<ResponseEntity<User>> getReactiveUser(@PathVariable String id) {
return userService.findByIdReactive(id)
.map(ResponseEntity::ok)
.defaultIfEmpty(ResponseEntity.notFound().build());
}
// 어댑터 패턴을 통한 기존 서비스 활용
@GetMapping("/users/{id}/orders")
public Flux<Order> getUserOrders(@PathVariable String id) {
// 기존 블로킹 서비스를 리액티브로 래핑
return Mono.fromCallable(() -> orderService.findByUserId(id))
.subscribeOn(Schedulers.boundedElastic())
.flatMapMany(Flux::fromIterable);
}
}
실무 팁 및 베스트 프랙티스
WebFlux를 성공적으로 도입하기 위한 실무 노하우를 공유합니다.
코드 품질 향상
// 좋은 예: 명확한 에러 처리와 로깅
public class BestPracticeService {
public Mono<OrderResult> processOrder(OrderRequest request) {
return validateOrderRequest(request)
.flatMap(this::calculateOrderAmount)
.flatMap(this::processPayment)
.flatMap(this::updateInventory)
.flatMap(this::createOrderRecord)
.doOnSuccess(result -> {
log.info("주문 처리 성공: orderId={}, amount={}",
result.getOrderId(), result.getAmount());
publishOrderEvent(result);
})
.onErrorResume(ValidationException.class, error -> {
log.warn("주문 검증 실패: {}", error.getMessage());
return Mono.just(OrderResult.failed("검증 오류: " + error.getMessage()));
})
.onErrorResume(PaymentException.class, error -> {
log.error("결제 처리 실패", error);
return Mono.just(OrderResult.failed("결제 오류"));
})
.onErrorResume(error -> {
log.error("예상치 못한 오류 발생", error);
return Mono.just(OrderResult.failed("시스템 오류"));
});
}
// 나쁜 예: 에러 처리 없는 체이닝
public Mono<String> badExample(String input) {
return Mono.just(input)
.map(String::toUpperCase)
.flatMap(this::someExternalCall)
.map(result -> result.getValue()); // NPE 가능성
}
}
테스트 작성 가이드
@ExtendWith(SpringExtension.class)
@WebFluxTest(ProductController.class)
class ProductControllerTest {
@Autowired
private WebTestClient webTestClient;
@MockBean
private ProductService productService;
@Test
void shouldReturnProductWhenExists() {
// Given
Product product = new Product("1", "테스트 상품", BigDecimal.valueOf(10000));
when(productService.findById("1")).thenReturn(Mono.just(product));
// When & Then
webTestClient.get()
.uri("/api/products/1")
.exchange()
.expectStatus().isOk()
.expectBody(Product.class)
.value(p -> {
assertThat(p.getName()).isEqualTo("테스트 상품");
assertThat(p.getPrice()).isEqualTo(BigDecimal.valueOf(10000));
});
}
@Test
void shouldReturnNotFoundWhenProductNotExists() {
// Given
when(productService.findById("999")).thenReturn(Mono.empty());
// When & Then
webTestClient.get()
.uri("/api/products/999")
.exchange()
.expectStatus().isNotFound();
}
@Test
void shouldHandleStreamingResponse() {
// Given
Flux<Product> productStream = Flux.just(
new Product("1", "상품1", BigDecimal.valueOf(1000)),
new Product("2", "상품2", BigDecimal.valueOf(2000))
);
when(productService.getAllProducts()).thenReturn(productStream);
// When & Then
webTestClient.get()
.uri("/api/products/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.TEXT_EVENT_STREAM)
.expectBodyList(Product.class)
.hasSize(2);
}
}
// StepVerifier를 이용한 리액티브 스트림 테스트
@Test
void shouldProcessDataStream() {
Flux<String> dataStream = Flux.just("a", "b", "c")
.map(String::toUpperCase)
.delayElements(Duration.ofMillis(100));
StepVerifier.create(dataStream)
.expectNext("A")
.expectNext("B")
.expectNext("C")
.verifyComplete();
}
결론: WebFlux로 미래를 준비하세요
Spring WebFlux는 현대적인 웹 애플리케이션 개발의 새로운 패러다임을 제시합니다.
리액티브 프로그래밍을 통해 더 효율적이고 확장 가능한 시스템을 구축할 수 있습니다.
핵심 포인트 요약
성능 이점: 논블로킹 I/O를 통한 높은 동시성과 효율적인 리소스 사용
확장성: 적은 스레드로 더 많은 요청 처리 가능
현대적 접근: 함수형 프로그래밍과 리액티브 패턴의 조화
실시간 처리: 스트리밍과 실시간 데이터 처리에 최적화
시작하기 위한 단계
- 학습: 리액티브 프로그래밍 개념과 Project Reactor 이해
- 실습: 간단한 CRUD API부터 시작하여 점진적 복잡성 증가
- 적용: 기존 프로젝트의 일부분을 WebFlux로 리팩터링
- 최적화: 성능 모니터링과 튜닝을 통한 지속적 개선
WebFlux는 단순한 기술 선택이 아닌, 미래 지향적 개발자로 성장하기 위한 필수 역량입니다.
대용량 트래픽과 실시간 처리가 일상이 된 현재, WebFlux 마스터리는 여러분의 개발 커리어에 새로운 차원을 열어줄 것입니다.
관련 참조 링크:
'Spring & Spring Boot 실무 가이드' 카테고리의 다른 글
Spring Modulith로 모놀리식을 모듈화하기: 스프링 모듈리스 아키텍처 완벽 가이드 (0) | 2025.06.20 |
---|---|
Spring Boot 테스트 컨테이너 실전 가이드 - Docker 없이 통합 테스트 자동화 (0) | 2025.06.18 |
Event Sourcing과 CQRS 패턴 심화 구현 - Spring Boot로 고급 이벤트 드리븐 아키텍처 구축 (0) | 2025.06.07 |
Event Sourcing과 CQRS 패턴 입문 - Axon Framework로 시작하는 이벤트 드리븐 개발 (0) | 2025.06.06 |
Spring Boot 3.0 Native Image 완벽 가이드 - GraalVM으로 초고속 애플리케이션 만들기 (0) | 2025.06.04 |