Spring WebFlux
Mono, Flux, non-blocking IO, reactive streams, backpressure
Spring WebFlux is Spring's reactive web framework built on non-blocking I/O, implementing the Reactive Streams specification via Project Reactor.
1. Definition
Spring WebFlux is the reactive web module introduced in Spring Framework 5, built on the Reactive Streams specification (Publisher, Subscriber, Subscription, Processor). The implementation is provided by Project Reactor with two core types: Mono (0 or 1 element) and Flux (0-N elements).
WebFlux uses a non-blocking I/O model: a small number of event loop threads handle all requests, and no thread is blocked while waiting. This enables handling more concurrent connections with fewer resources.
Client → Netty Event Loop → Handler → Reactive Pipeline
→ Non-blocking I/O → Response (back-pressure aware)
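The thread economics described above can be approximated with the JDK alone. The sketch below is not WebFlux or Netty code: a two-thread ScheduledExecutorService stands in for the event loop, and a timer stands in for an I/O wait. The class and method names are hypothetical. It shows that many in-flight waits can be served by very few threads when waiting is a registration rather than a sleeping thread.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class NonBlockingWaitSketch {

    /**
     * Simulates `requests` in-flight requests that each wait `waitMillis` for I/O,
     * served by a pool of `threads` threads.
     * Returns how many distinct threads actually completed the requests.
     */
    static int distinctThreadsUsed(int requests, int threads, long waitMillis) {
        ScheduledExecutorService loop = Executors.newScheduledThreadPool(threads);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        try {
            CompletableFuture<?>[] responses = new CompletableFuture<?>[requests];
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Integer> response = new CompletableFuture<>();
                int requestId = i;
                // The wait is a timer registration, not a sleeping thread.
                loop.schedule(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    response.complete(requestId);
                }, waitMillis, TimeUnit.MILLISECONDS);
                responses[i] = response;
            }
            // Only the caller waits here; the pool threads stay free between callbacks.
            CompletableFuture.allOf(responses).join();
            return threadNames.size();
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int threads = distinctThreadsUsed(1_000, 2, 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("1000 waits served by " + threads
                + " thread(s) in ~" + elapsedMs + " ms");
    }
}
```

With a thread-per-request design, the same 1,000 concurrent waits would need on the order of 1,000 threads to finish in a comparable time.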
WebFlux supports two programming models:
- Annotation-based: @RestController + Mono/Flux (familiar Spring MVC style)
- Functional: RouterFunction + HandlerFunction (lambda-based routing)
2. Core Concepts
Mono and Flux
| Type | Element count | Analogy |
|---|---|---|
| Mono<T> | 0 or 1 | Optional<T> / CompletableFuture<T> |
| Flux<T> | 0-N | Stream<T> / List<T> |
// Mono: single element or empty
Mono<User> user = userRepository.findById(id);
// Flux: multiple elements as a stream
Flux<User> users = userRepository.findAll();
// Mono creation
Mono.just("Hello"); // One element
Mono.empty(); // Empty
Mono.error(new Exception()); // Error
// Flux creation
Flux.just("A", "B", "C"); // Fixed elements
Flux.fromIterable(list); // From collection
Flux.interval(Duration.ofSeconds(1)); // Infinite timed
Backpressure
Backpressure is the core mechanism of Reactive Streams: the Subscriber signals the Publisher how many elements it can process. This prevents memory overflow when a fast producer overwhelms a slow consumer.
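// Choose ONE overflow strategy per pipeline. Each of these operators requests unbounded
// demand from its upstream, so when stacked only the one nearest the subscriber sees real pressure.
Flux.range(1, 1_000_000)
.onBackpressureBuffer(256) // Buffer up to 256 elements, then signal an overflow error
// .onBackpressureDrop() // Alternative: drop elements the subscriber has not requested
// .onBackpressureLatest() // Alternative: keep only the most recent element
.subscribe(item -> process(item));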
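The demand signal itself, the request(n) call that backpressure is built on, can be seen without Reactor by using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams types. The following is a minimal single-threaded sketch: RangePublisher and BatchingSubscriber are hypothetical names, and the publisher skips parts of the specification (for example, validation of non-positive requests and thread safety).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {

    /** Cold publisher emitting 1..count, never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Subscriber that requests `batch` elements at a time and logs every signal. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private final int batch;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;

        BatchingSubscriber(int batch) { this.batch = batch; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request(" + batch + ")");
            s.request(batch);
        }
        @Override public void onNext(Integer item) {
            log.add("onNext(" + item + ")");
            if (++receivedInBatch == batch) { // batch consumed: signal fresh demand
                receivedInBatch = 0;
                log.add("request(" + batch + ")");
                subscription.request(batch);
            }
        }
        @Override public void onError(Throwable t) { log.add("onError"); }
        @Override public void onComplete() { log.add("onComplete"); }
    }

    static List<String> run(int count, int batch) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batch);
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

For run(5, 2) the log alternates between a request(2) entry and at most two onNext entries, ending with onComplete: the publisher never gets ahead of the demand the subscriber has signaled.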
Non-blocking I/O
Unlike Spring MVC's thread-per-request model, WebFlux uses an event loop model:
- Netty: default server (not servlet-based)
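- Event loop threads: by default one per CPU core on Reactor Netty (with a minimum of 4)
- Never block: Thread.sleep(), JDBC calls, and contended synchronized sections must not run on an event loop thread
- Blocking calls: wrap them and run them on Schedulers.boundedElastic()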
3. Practical Usage
Annotation-based controller
@RestController
@RequestMapping("/api/users")
public class UserController {
private final UserRepository userRepository;
public UserController(UserRepository userRepository) {
this.userRepository = userRepository;
}
@GetMapping
public Flux<UserDto> findAll() {
return userRepository.findAll()
.map(UserDto::from);
}
@GetMapping("/{id}")
public Mono<ResponseEntity<UserDto>> findById(@PathVariable String id) {
return userRepository.findById(id)
.map(user -> ResponseEntity.ok(UserDto.from(user)))
.defaultIfEmpty(ResponseEntity.notFound().build());
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserDto> create(@Valid @RequestBody Mono<CreateUserRequest> request) {
return request
.map(req -> new User(req.name(), req.email()))
.flatMap(userRepository::save)
.map(UserDto::from);
}
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> delete(@PathVariable String id) {
return userRepository.deleteById(id);
}
}
Functional routing (RouterFunction)
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions.route()
.GET("/api/users", handler::findAll)
.GET("/api/users/{id}", handler::findById)
.POST("/api/users", handler::create)
.DELETE("/api/users/{id}", handler::delete)
.build();
}
}
@Component
public class UserHandler {
private final UserRepository userRepository;
public UserHandler(UserRepository userRepository) {
this.userRepository = userRepository;
}
public Mono<ServerResponse> findAll(ServerRequest request) {
return ServerResponse.ok()
.body(userRepository.findAll(), User.class);
}
public Mono<ServerResponse> findById(ServerRequest request) {
String id = request.pathVariable("id");
return userRepository.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> create(ServerRequest request) {
return request.bodyToMono(CreateUserRequest.class)
.map(req -> new User(req.name(), req.email()))
.flatMap(userRepository::save)
.flatMap(user -> ServerResponse
.created(URI.create("/api/users/" + user.getId()))
.bodyValue(user));
}
public Mono<ServerResponse> delete(ServerRequest request) {
String id = request.pathVariable("id");
return userRepository.deleteById(id)
.then(ServerResponse.noContent().build());
}
}
WebClient (reactive HTTP client)
@Service
public class ExternalApiService {
private final WebClient webClient;
public ExternalApiService(WebClient.Builder builder) {
this.webClient = builder
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_JSON_VALUE)
.build();
}
public Mono<ProductDto> getProduct(String id) {
return webClient.get()
.uri("/products/{id}", id)
.retrieve()
.onStatus(HttpStatusCode::is4xxClientError,
resp -> Mono.error(new NotFoundException("Not found")))
.bodyToMono(ProductDto.class)
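.timeout(Duration.ofSeconds(5)) // Applies per attempt, because it sits before retryWhen
.retryWhen(Retry.backoff(3, Duration.ofMillis(500))
.filter(ex -> !(ex instanceof NotFoundException))); // Do not retry 4xx responses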
}
public Flux<ProductDto> getAllProducts() {
return webClient.get()
.uri("/products")
.retrieve()
.bodyToFlux(ProductDto.class);
}
}
4. Code Examples
Operator chaining
public Mono<OrderSummary> processOrder(String orderId) {
return orderRepository.findById(orderId) // Mono<Order>
.switchIfEmpty(Mono.error(
new NotFoundException("Order not found")))
.flatMap(order -> {
Mono<User> user = userService.findById(order.getUserId());
Mono<List<Product>> products =
productService.findByIds(order.getProductIds())
.collectList();
return Mono.zip(Mono.just(order), user, products);
})
.map(tuple -> new OrderSummary(
tuple.getT1(), tuple.getT2(), tuple.getT3()))
.doOnSuccess(s -> log.info("Order processed: {}", s.id()))
.doOnError(e -> log.error("Order failed", e));
}
Server-Sent Events (SSE)
@GetMapping(value = "/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> ServerSentEvent.<StockPrice>builder()
.id(String.valueOf(seq))
.event("price-update")
.data(stockService.getCurrentPrice())
.build());
}
Wrapping blocking calls
// BAD: blocking JDBC call on event loop thread
public Mono<User> findUser(Long id) {
User user = jdbcTemplate.queryForObject(...); // BLOCKING!
return Mono.just(user);
}
// GOOD: delegate to Schedulers.boundedElastic()
public Mono<User> findUser(Long id) {
return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
.subscribeOn(Schedulers.boundedElastic());
}
Error handling with operators
public Mono<UserDto> findUser(String id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(
new NotFoundException("User not found: " + id)))
.onErrorResume(TimeoutException.class,
e -> Mono.just(UserDto.fallback()))
.onErrorMap(DatabaseException.class,
e -> new ServiceException("DB error", e))
.doOnError(e -> log.error("Error finding user", e));
}
5. Trade-offs
WebFlux vs Spring MVC
| Aspect | WebFlux | Spring MVC |
|---|---|---|
| I/O model | Non-blocking (event loop) | Blocking (thread-per-request) |
| Server | Netty (default) / Undertow / Tomcat / Jetty | Tomcat (default) / Jetty |
| Database | R2DBC, MongoDB Reactive | JDBC, JPA/Hibernate |
| HTTP client | WebClient | RestTemplate / RestClient |
| Threads | Few (about one per CPU core) | Many (~200 default) |
| Scaling | Concurrency limited by CPU and memory, not by thread count | Concurrency limited by thread pool size |
| Debugging | Hard (reactive chain) | Simple (stack trace) |
| Learning curve | High | Low |
| Testing | StepVerifier | MockMvc |
When to use WebFlux
- High concurrency (10,000+ simultaneous connections)
- Streaming (SSE, WebSocket, Kafka)
- Microservice gateway / API gateway
- Non-blocking database (MongoDB, R2DBC)
When NOT to use WebFlux
- CRUD application with traditional JDBC/JPA
- Team unfamiliar with reactive programming
- Dominated by blocking third-party libraries
- Simple REST API with few concurrent requests
6. Common Mistakes
❌ Blocking call on event loop thread
// BAD: JDBC blocks the event loop
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
User user = jdbcTemplate.queryForObject(...); // FORBIDDEN!
return Mono.just(user);
}
// GOOD: delegate to boundedElastic
public Mono<User> getUser(Long id) {
return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
.subscribeOn(Schedulers.boundedElastic());
}
❌ Calling subscribe() in the controller
// BAD: manual subscribe → no backpressure, no lifecycle
@PostMapping
public void create(@RequestBody CreateUserRequest req) {
userService.create(req).subscribe(); // DON'T!
}
// GOOD: return the Mono to the framework
@PostMapping
public Mono<UserDto> create(@RequestBody CreateUserRequest req) {
return userService.create(req);
}
❌ Using Mono.block()
// BAD: block() parks the calling thread; on an event loop thread Reactor rejects it with IllegalStateException
User user = userRepository.findById(id).block(); // FORBIDDEN in reactive!
// GOOD: chain with flatMap
return userRepository.findById(id)
.flatMap(user -> processUser(user));
❌ Incorrect error handling (try-catch)
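// BAD: try-catch only sees exceptions thrown while the chain is assembled;
// errors signaled later, when the Mono is subscribed, bypass it
try {
return userService.findById(id);
} catch (Exception e) {
return Mono.empty(); // Not reached for errors signaled through the Mono
}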
// GOOD: onErrorResume operator
return userService.findById(id)
.onErrorResume(e -> Mono.empty());
7. Deep Dive
Event loop model (Netty)
The Netty event loop cycle:
- Accept: accept new connection
- Read: read data (non-blocking)
- Decode: process incoming data
- Process: invoke handler (pipeline)
- Encode: serialize outgoing data
- Write: send data (non-blocking)
- Flush: flush buffer
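With Reactor Netty, the default event loop thread count is Math.max(Runtime.getRuntime().availableProcessors(), 4); it can be overridden with the reactor.netty.ioWorkerCount system property. (Plain Netty's own default for an event loop group is availableProcessors() × 2.)
Critical rule: never block an event loop thread, because every connection assigned to that thread stalls until the thread is released.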
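The effect of breaking this rule can be reproduced with plain JDK executors. In the sketch below, which is an analogy and not Netty code, a single-thread executor plays the event loop and a cached thread pool plays the role that Schedulers.boundedElastic() has in Reactor. All names are hypothetical. The first "connection" makes a blocking call that can only finish early if the second "connection" gets a turn on the loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class EventLoopBlockingSketch {

    /** Blocking stand-in for JDBC or Thread.sleep(): waits up to 1 s for the other task. */
    private static boolean blockingWait(CountDownLatch latch) {
        try {
            return latch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Two "connections" share one event loop thread. The first handler makes a
     * blocking call; the second is a trivial task. Returns true if the second task
     * managed to run while the blocking call was still waiting.
     */
    static boolean otherConnectionProgressed(boolean offloadBlockingCall) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newCachedThreadPool();
        CountDownLatch secondTaskRan = new CountDownLatch(1);
        CompletableFuture<Boolean> firstResult = new CompletableFuture<>();
        try {
            eventLoop.execute(() -> {
                if (offloadBlockingCall) {
                    // Hand the blocking call to a worker; the loop thread returns at once.
                    workers.execute(() -> firstResult.complete(blockingWait(secondTaskRan)));
                } else {
                    // Blocks the loop thread itself; the second task is stuck in the queue.
                    firstResult.complete(blockingWait(secondTaskRan));
                }
            });
            eventLoop.execute(secondTaskRan::countDown);
            return firstResult.join();
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking on the loop: " + otherConnectionProgressed(false));
        System.out.println("offloaded to workers: " + otherConnectionProgressed(true));
    }
}
```

When the blocking call runs on the loop thread, the second task cannot start until the one-second wait times out, so the method returns false. When the call is offloaded, the loop stays free and the method is expected to return true.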
Scheduler types
| Scheduler | Use case |
|---|---|
| Schedulers.immediate() | Current thread (default) |
| Schedulers.single() | Single reused thread |
| Schedulers.parallel() | CPU-intensive work (CPU core count threads) |
| Schedulers.boundedElastic() | Blocking I/O wrapping (bounded pool) |
| Schedulers.fromExecutor() | Custom ExecutorService |
Hot vs Cold publisher
// Cold: restarts for every subscriber
Flux<Integer> cold = Flux.range(1, 3);
cold.subscribe(i -> log.info("Sub1: {}", i)); // 1, 2, 3
cold.subscribe(i -> log.info("Sub2: {}", i)); // 1, 2, 3
// Hot: real-time, shared
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hot = sink.asFlux();
hot.subscribe(s -> log.info("Sub1: {}", s));
sink.tryEmitNext("A"); // Sub1: A
hot.subscribe(s -> log.info("Sub2: {}", s));
sink.tryEmitNext("B"); // Sub1: B, Sub2: B
Testing with StepVerifier
@Test
void shouldReturnAllUsers() {
Flux<User> users = userRepository.findAll();
StepVerifier.create(users)
.expectNextCount(3)
.verifyComplete();
}
@Test
void shouldHandleError() {
Mono<User> error = userService.findById("invalid");
StepVerifier.create(error)
.expectError(NotFoundException.class)
.verify();
}
8. Interview Questions
- What is the difference between Mono and Flux? Mono: 0 or 1 element (single value or empty). Flux: 0-N elements (stream). Both are Publisher implementations (Reactive Streams).
- What is backpressure and why is it important? The Subscriber signals the Publisher how many elements it can process. It prevents memory overflow when a fast producer overwhelms a slow consumer.
- Why is it forbidden to make blocking calls on an event loop thread? The event loop thread handles all connections assigned to it. If it blocks, all those connections stall. Blocking calls → Schedulers.boundedElastic().
- What is the difference between map() and flatMap()? map(): synchronous transformation (T → R). flatMap(): asynchronous transformation (T → Mono<R>/Flux<R>). Use flatMap when the transformation itself is reactive.
- What is the difference between hot and cold Publishers? Cold: restarts for each subscriber (e.g., DB query). Hot: real-time, shared (e.g., WebSocket, SSE).
- How do you test reactive code? StepVerifier.create(publisher).expectNext(...).verifyComplete(). StepVerifier validates elements, errors, and completion.
- When should you use WebFlux instead of Spring MVC? High concurrency (10k+ connections), streaming, non-blocking DB (R2DBC), microservice gateway. NOT: traditional JDBC/JPA, blocking libraries.
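The map()/flatMap() distinction from the questions above has a close JDK analogy, in line with the CompletableFuture comparison in the Mono and Flux table: thenApply behaves like map, and thenCompose behaves like flatMap. The sketch below uses CompletableFuture only, not Reactor, and findNameAsync is a hypothetical asynchronous lookup. It shows why a function that returns an asynchronous type needs the flattening variant.

```java
import java.util.concurrent.CompletableFuture;

final class MapVsFlatMapSketch {

    /** Hypothetical asynchronous lookup, standing in for a reactive repository call. */
    static CompletableFuture<String> findNameAsync(int userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    /** map-like: String::length is a plain synchronous function (T -> R). */
    static CompletableFuture<Integer> nameLength(int userId) {
        return findNameAsync(userId).thenApply(String::length);
    }

    /** flatMap-like: the function returns a future, and thenCompose flattens it. */
    static CompletableFuture<String> nameFlattened(int userId) {
        return CompletableFuture.completedFuture(userId)
                .thenCompose(MapVsFlatMapSketch::findNameAsync);
    }

    /** Using the map-like operator with an async function leaves a nested future. */
    static CompletableFuture<CompletableFuture<String>> nameNested(int userId) {
        return CompletableFuture.completedFuture(userId)
                .thenApply(MapVsFlatMapSketch::findNameAsync);
    }

    public static void main(String[] args) {
        System.out.println(nameLength(42).join());          // length of "user-42"
        System.out.println(nameFlattened(7).join());        // one join is enough
        System.out.println(nameNested(7).join().join());    // two joins needed to unwrap
    }
}
```

In Reactor the same mistake shows up as a Mono<Mono<T>> return type, which is usually the signal to replace map with flatMap.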
9. Glossary
| Term | Meaning |
|---|---|
| Mono<T> | Reactive publisher containing 0 or 1 element |
| Flux<T> | Reactive publisher containing 0-N elements |
| Backpressure | Subscriber signals processing capacity to Publisher |
| Event loop | Non-blocking I/O thread, multiplexing connections |
| Reactive Streams | Publisher/Subscriber specification (org.reactivestreams; mirrored in the JDK as java.util.concurrent.Flow) |
| Project Reactor | Reactive Streams implementation (Mono, Flux) |
| WebClient | Reactive, non-blocking HTTP client (alternative to RestTemplate) |
| R2DBC | Reactive Relational Database Connectivity |
| StepVerifier | Reactive stream testing tool |
| RouterFunction | Functional routing (lambda-based) |
| Schedulers | Thread pool management in reactive context |
| SSE | Server-Sent Events (unidirectional server → client stream) |
10. Cheatsheet
TYPES:
Mono<T> 0 or 1 element
Flux<T> 0-N elements
Mono.just(x) One element
Mono.empty() Empty
Flux.fromIterable(list) From collection
OPERATORS:
.map(T -> R) Synchronous transformation
.flatMap(T -> Mono<R>) Asynchronous transformation
.filter(predicate) Filtering
.switchIfEmpty(alt) Fallback if empty
Mono.zip(mono1, mono2) Subscribe to both, combine results when all complete
.collectList() Flux → Mono<List>
.doOnNext/Error/Success Side effect (logging)
ERROR HANDLING:
.onErrorResume(e -> alt) Fallback publisher
.onErrorMap(e -> newE) Exception transformation
.retry(3) Retry
.timeout(Duration) Timeout
SCHEDULER:
Schedulers.parallel() CPU-intensive
Schedulers.boundedElastic() Blocking I/O wrapping
.subscribeOn(scheduler) Subscription thread
TESTING:
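StepVerifier.create(pub)
.expectNext(x) Assert the next element
.verifyComplete() Assert completion (triggers verification)
.expectError(E.class).verify() Assert an error instead of completion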