Advanced Reading time: ~9 min

Spring WebFlux

Mono, Flux, non-blocking IO, reactive streams, backpressure

Spring WebFlux is Spring's reactive web framework built on non-blocking I/O, implementing the Reactive Streams specification via Project Reactor.


1. Definition

Spring WebFlux is the reactive web module introduced in Spring Framework 5, built on the Reactive Streams specification (Publisher, Subscriber, Subscription, Processor). The implementation is provided by Project Reactor with two core types: Mono (0 or 1 element) and Flux (0-N elements).

WebFlux uses a non-blocking I/O model: a small number of event loop threads handle all requests, and no thread is blocked while waiting. This enables handling more concurrent connections with fewer resources.

Client → Netty Event Loop → Handler → Reactive Pipeline
    → Non-blocking I/O → Response (back-pressure aware)

WebFlux supports two programming models:

  • Annotation-based: @RestController + Mono/Flux (familiar Spring MVC style)
  • Functional: RouterFunction + HandlerFunction (lambda-based routing)

2. Core Concepts

Mono and Flux

Type      Element count   Analogy
Mono<T>   0 or 1          Optional<T> / CompletableFuture<T>
Flux<T>   0-N             Stream<T> / List<T>

// Mono: single element or empty
Mono<User> user = userRepository.findById(id);

// Flux: multiple elements as a stream
Flux<User> users = userRepository.findAll();

// Mono creation
Mono.just("Hello");          // One element
Mono.empty();                // Empty
Mono.error(new Exception()); // Error

// Flux creation
Flux.just("A", "B", "C");           // Fixed elements
Flux.fromIterable(list);            // From collection
Flux.interval(Duration.ofSeconds(1)); // Infinite timed

Backpressure

Backpressure is the core mechanism of Reactive Streams: the Subscriber signals the Publisher how many elements it can process. This prevents memory overflow when a fast producer overwhelms a slow consumer.

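// Pick ONE overflow strategy per pipeline: these are alternatives, not a chain
Flux.range(1, 1_000_000)
    .onBackpressureBuffer(256)    // Buffer up to 256 elements, then signal an overflow error
    // .onBackpressureDrop()      // Alternative: drop elements the subscriber has not requested
    // .onBackpressureLatest()    // Alternative: keep only the latest element
    .subscribe(item -> process(item));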
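
The request(n) signalling described above can be sketched without Reactor, using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. This is a minimal, single-threaded illustration and not how Reactor implements it; RangePublisher, run and the batch size are names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    /** Publishes 1..count, but never more elements than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return; // simplified: the full spec signals onError for n <= 0
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the loop below picks up the new demand
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a consumer that asks for batchSize elements at a time and logs every signal. */
    static List<String> run(int count, int batchSize) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batchSize + ")");
                s.request(batchSize);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batchSize) { // batch consumed: signal capacity for the next one
                    receivedInBatch = 0;
                    log.add("request(" + batchSize + ")");
                    subscription.request(batchSize);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

The log shows that the publisher emits at most two elements between request(2) calls, so the consumer sets the pace.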

Non-blocking I/O

Unlike Spring MVC's thread-per-request model, WebFlux uses an event loop model:

  • Netty: default server (not servlet-based)
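  • Event loop threads: one per CPU core by default in Reactor Netty (minimum 4)
  • Never block an event loop thread: Thread.sleep(), JDBC calls, and long waits on synchronized locks are off limits
  • Unavoidable blocking calls → wrap them in Mono.fromCallable(...) and move them off the event loop with subscribeOn(Schedulers.boundedElastic())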

3. Practical Usage

Annotation-based controller

@RestController
@RequestMapping("/api/users")
public class UserController {

    private final UserRepository userRepository;

    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @GetMapping
    public Flux<UserDto> findAll() {
        return userRepository.findAll()
                .map(UserDto::from);
    }

    @GetMapping("/{id}")
    public Mono<ResponseEntity<UserDto>> findById(@PathVariable String id) {
        return userRepository.findById(id)
                .map(user -> ResponseEntity.ok(UserDto.from(user)))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<UserDto> create(@Valid @RequestBody Mono<CreateUserRequest> request) {
        return request
                .map(req -> new User(req.name(), req.email()))
                .flatMap(userRepository::save)
                .map(UserDto::from);
    }

    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> delete(@PathVariable String id) {
        return userRepository.deleteById(id);
    }
}

Functional routing (RouterFunction)

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions.route()
                .GET("/api/users", handler::findAll)
                .GET("/api/users/{id}", handler::findById)
                .POST("/api/users", handler::create)
                .DELETE("/api/users/{id}", handler::delete)
                .build();
    }
}

@Component
public class UserHandler {

    private final UserRepository userRepository;

    public UserHandler(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        return ServerResponse.ok()
                .body(userRepository.findAll(), User.class);
    }

    public Mono<ServerResponse> findById(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.findById(id)
                .flatMap(user -> ServerResponse.ok().bodyValue(user))
                .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        return request.bodyToMono(CreateUserRequest.class)
                .map(req -> new User(req.name(), req.email()))
                .flatMap(userRepository::save)
                .flatMap(user -> ServerResponse
                        .created(URI.create("/api/users/" + user.getId()))
                        .bodyValue(user));
    }

    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        return userRepository.deleteById(id)
                .then(ServerResponse.noContent().build());
    }
}

WebClient (reactive HTTP client)

@Service
public class ExternalApiService {

    private final WebClient webClient;

    public ExternalApiService(WebClient.Builder builder) {
        this.webClient = builder
                .baseUrl("https://api.example.com")
                .defaultHeader(HttpHeaders.CONTENT_TYPE,
                        MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    public Mono<ProductDto> getProduct(String id) {
        return webClient.get()
                .uri("/products/{id}", id)
                .retrieve()
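                .onStatus(status -> status.value() == 404,
                        resp -> Mono.error(new NotFoundException("Not found")))
                .bodyToMono(ProductDto.class)
                .timeout(Duration.ofSeconds(5))
                // Retry transient failures only; a 404 will not succeed on retry
                .retryWhen(Retry.backoff(3, Duration.ofMillis(500))
                        .filter(e -> !(e instanceof NotFoundException)));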
    }

    public Flux<ProductDto> getAllProducts() {
        return webClient.get()
                .uri("/products")
                .retrieve()
                .bodyToFlux(ProductDto.class);
    }
}

4. Code Examples

Operator chaining

public Mono<OrderSummary> processOrder(String orderId) {
    return orderRepository.findById(orderId)           // Mono<Order>
            .switchIfEmpty(Mono.error(
                    new NotFoundException("Order not found")))
            .flatMap(order -> {
                Mono<User> user = userService.findById(order.getUserId());
                Mono<List<Product>> products =
                        productService.findByIds(order.getProductIds())
                                .collectList();
                return Mono.zip(Mono.just(order), user, products);
            })
            .map(tuple -> new OrderSummary(
                    tuple.getT1(), tuple.getT2(), tuple.getT3()))
            .doOnSuccess(s -> log.info("Order processed: {}", s.id()))
            .doOnError(e -> log.error("Order failed", e));
}

Server-Sent Events (SSE)

@GetMapping(value = "/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<StockPrice>> streamPrices() {
    return Flux.interval(Duration.ofSeconds(1))
            .map(seq -> ServerSentEvent.<StockPrice>builder()
                    .id(String.valueOf(seq))
                    .event("price-update")
                    .data(stockService.getCurrentPrice())
                    .build());
}

Wrapping blocking calls

// BAD: blocking JDBC call on event loop thread
public Mono<User> findUser(Long id) {
    User user = jdbcTemplate.queryForObject(...); // BLOCKING!
    return Mono.just(user);
}

// GOOD: delegate to Schedulers.boundedElastic()
public Mono<User> findUser(Long id) {
    return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
            .subscribeOn(Schedulers.boundedElastic());
}

Error handling with operators

public Mono<UserDto> findUser(String id) {
    return userRepository.findById(id)
            .switchIfEmpty(Mono.error(
                    new NotFoundException("User not found: " + id)))
            .onErrorResume(TimeoutException.class,
                    e -> Mono.just(UserDto.fallback()))
            .onErrorMap(DatabaseException.class,
                    e -> new ServiceException("DB error", e))
            .doOnError(e -> log.error("Error finding user", e));
}
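
To see which operator reacts to which failure, here is a small runnable sketch. It assumes reactor-core is on the classpath. The lookup method and its magic ids are hypothetical stand-ins for a repository, and JDK exception types replace the NotFoundException, DatabaseException and ServiceException used above.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Mono;

class ErrorOperatorsSketch {

    /** Hypothetical stand-in for a repository call: the id selects the outcome. */
    static Mono<String> lookup(String id) {
        switch (id) {
            case "missing": return Mono.empty();
            case "slow":    return Mono.error(new TimeoutException("simulated timeout"));
            case "broken":  return Mono.error(new IllegalStateException("simulated DB failure"));
            default:        return Mono.just("user-" + id);
        }
    }

    static Mono<String> findUser(String id) {
        return lookup(id)
                // empty result becomes an error signal
                .switchIfEmpty(Mono.error(new NoSuchElementException("User not found: " + id)))
                // timeouts are replaced by a fallback value
                .onErrorResume(TimeoutException.class, e -> Mono.just("fallback-user"))
                // low-level failures are translated, keeping the cause
                .onErrorMap(IllegalStateException.class, e -> new RuntimeException("DB error", e));
    }

    /** Blocks only so the result can be printed from a plain main thread. */
    static String describe(String id) {
        try {
            return findUser(id).block();
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        for (String id : new String[] {"42", "missing", "slow", "broken"}) {
            System.out.println(id + " -> " + describe(id));
        }
    }
}
```

Only the timeout is swallowed. The not-found error passes through both error operators untouched because neither type filter matches it.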

5. Trade-offs

WebFlux vs Spring MVC

Aspect           WebFlux                        Spring MVC
I/O model        Non-blocking (event loop)      Blocking (thread-per-request)
Server           Netty (default) / Undertow     Tomcat (default) / Jetty
Database         R2DBC, reactive MongoDB        JDBC, JPA/Hibernate
HTTP client      WebClient                      RestTemplate / RestClient
Threads          Few (about one per CPU core)   Many (~200 by default)
Scaling          Horizontal, less RAM           Vertical, more threads
Debugging        Hard (reactive chain)          Simple (stack trace)
Learning curve   High                           Low
Testing          StepVerifier                   MockMvc

When to use WebFlux

  • High concurrency (10,000+ simultaneous connections)
  • Streaming (SSE, WebSocket, Kafka)
  • Microservice gateway / API gateway
  • Non-blocking database (MongoDB, R2DBC)

When NOT to use WebFlux

  • CRUD application with traditional JDBC/JPA
  • Team unfamiliar with reactive programming
  • Dominated by blocking third-party libraries
  • Simple REST API with few concurrent requests

6. Common Mistakes

❌ Blocking call on event loop thread

// BAD: JDBC blocks the event loop
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable Long id) {
    User user = jdbcTemplate.queryForObject(...); // FORBIDDEN!
    return Mono.just(user);
}

// GOOD: delegate to boundedElastic
public Mono<User> getUser(Long id) {
    return Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))
            .subscribeOn(Schedulers.boundedElastic());
}

❌ Calling subscribe() in the controller

// BAD: manual subscribe → no backpressure, no lifecycle
@PostMapping
public void create(@RequestBody CreateUserRequest req) {
    userService.create(req).subscribe(); // DON'T!
}

// GOOD: return the Mono to the framework
@PostMapping
public Mono<UserDto> create(@RequestBody CreateUserRequest req) {
    return userService.create(req);
}

❌ Using Mono.block()

// BAD: block() parks the calling thread; on an event loop thread Reactor
// rejects the call with an IllegalStateException
User user = userRepository.findById(id).block(); // FORBIDDEN in reactive!

// GOOD: chain with flatMap
return userRepository.findById(id)
        .flatMap(user -> processUser(user));

❌ Incorrect error handling (try-catch)

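// BAD: try-catch only sees exceptions thrown while the chain is assembled;
// errors signaled later, at subscription time, bypass it
try {
    return userService.findById(id);
} catch (Exception e) {
    return Mono.empty(); // Not reached for errors signaled by the Mono
}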

// GOOD: onErrorResume operator
return userService.findById(id)
        .onErrorResume(e -> Mono.empty());
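
The difference can be observed directly with a small sketch, assuming reactor-core is on the classpath. failingLookup is a hypothetical service call, and observe is a helper invented here that reports what a subscriber actually receives.

```java
import reactor.core.publisher.Mono;

class TryCatchSketch {

    /** Hypothetical service call whose failure arrives as an error signal, not a thrown exception. */
    static Mono<String> failingLookup() {
        return Mono.error(new IllegalStateException("boom"));
    }

    static Mono<String> withTryCatch() {
        try {
            return failingLookup(); // only builds the pipeline; nothing is thrown here
        } catch (IllegalStateException e) {
            return Mono.just("recovered in catch"); // not reached
        }
    }

    static Mono<String> withOperator() {
        return failingLookup()
                .onErrorResume(IllegalStateException.class, e -> Mono.just("recovered by operator"));
    }

    /** Subscribes and reports what the subscriber observed. */
    static String observe(Mono<String> mono) {
        return mono.map(v -> "value: " + v)
                .onErrorResume(e -> Mono.just("error signal: " + e.getMessage()))
                .block(); // acceptable on a plain main thread, never on an event loop
    }

    public static void main(String[] args) {
        System.out.println(observe(withTryCatch())); // error signal: boom
        System.out.println(observe(withOperator())); // value: recovered by operator
    }
}
```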

7. Deep Dive

Event loop model (Netty)

The Netty event loop cycle:

  1. Accept: accept new connection
  2. Read: read data (non-blocking)
  3. Decode: process incoming data
  4. Process: invoke handler (pipeline)
  5. Encode: serialize outgoing data
  6. Write: send data (non-blocking)
  7. Flush: flush buffer

In Reactor Netty, the default event loop thread count is Math.max(Runtime.getRuntime().availableProcessors(), 4); plain Netty defaults to availableProcessors() × 2.

Critical rule: never block any event loop thread, because it blocks all connections assigned to that thread.
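
The effect of this rule can be simulated with plain JDK executors. This is a rough model rather than Netty itself: a single-thread executor stands in for one event loop thread, a cached pool stands in for boundedElastic, and the 300 ms sleep stands in for a blocking call such as a JDBC query.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopStallSketch {

    /** Returns how long (ms) a trivial task had to wait before the "event loop" could run it. */
    static long quickTaskDelayMillis(boolean offloadBlockingWork) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(); // one event loop thread
        ExecutorService workers = Executors.newCachedThreadPool();       // pool for blocking work
        try {
            Runnable blockingCall = () -> sleep(300);
            if (offloadBlockingWork) {
                // the event loop only hands the call over and is free again immediately
                eventLoop.execute(() -> workers.execute(blockingCall));
            } else {
                // the event loop thread itself is parked for 300 ms
                eventLoop.execute(blockingCall);
            }
            long submittedAt = System.nanoTime();
            CompletableFuture<Long> startedAt =
                    CompletableFuture.supplyAsync(System::nanoTime, eventLoop);
            return TimeUnit.NANOSECONDS.toMillis(startedAt.join() - submittedAt);
        } finally {
            eventLoop.shutdownNow();
            workers.shutdownNow();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking on the loop: " + quickTaskDelayMillis(false) + " ms");
        System.out.println("offloaded:            " + quickTaskDelayMillis(true) + " ms");
    }
}
```

In the first case the trivial task, like every other connection on that thread, waits for the full duration of the blocking call. In the second it runs almost immediately.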

Scheduler types

Scheduler                             Use case
Schedulers.immediate()                Current thread (default)
Schedulers.single()                   Single reused thread
Schedulers.parallel()                 CPU-intensive work (one thread per CPU core)
Schedulers.boundedElastic()           Wrapping blocking I/O (bounded pool)
Schedulers.fromExecutorService(...)   Custom ExecutorService
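
A short sketch of how schedulers are applied, assuming reactor-core is on the classpath: subscribeOn decides where the source runs, and publishOn switches the thread for the operators after it. The class and method names are made up for the illustration.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SchedulerSketch {

    /** Returns the thread names used by the source [0] and by the downstream map [1]. */
    static String[] threadsUsed() {
        String[] names = new String[2];
        Mono.fromCallable(() -> {
                    names[0] = Thread.currentThread().getName(); // source: runs where subscribeOn says
                    return "row";
                })
                .subscribeOn(Schedulers.boundedElastic())
                .publishOn(Schedulers.parallel())
                .map(value -> {
                    names[1] = Thread.currentThread().getName(); // downstream of publishOn
                    return value;
                })
                .block(); // acceptable on a plain main thread, never on an event loop
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadsUsed();
        System.out.println("source ran on:     " + names[0]);
        System.out.println("map operator ran on: " + names[1]);
    }
}
```

The first name should start with boundedElastic and the second with parallel, which are the thread name prefixes Reactor gives to these pools.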

Hot vs Cold publisher

// Cold: restarts for every subscriber
Flux<Integer> cold = Flux.range(1, 3);
cold.subscribe(i -> log.info("Sub1: {}", i)); // 1, 2, 3
cold.subscribe(i -> log.info("Sub2: {}", i)); // 1, 2, 3

// Hot: real-time, shared
Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Flux<String> hot = sink.asFlux();
hot.subscribe(s -> log.info("Sub1: {}", s));
sink.tryEmitNext("A"); // Sub1: A
hot.subscribe(s -> log.info("Sub2: {}", s));
sink.tryEmitNext("B"); // Sub1: B, Sub2: B

Testing with StepVerifier

@Test
void shouldReturnAllUsers() {
    Flux<User> users = userRepository.findAll();

    StepVerifier.create(users)
            .expectNextCount(3)
            .verifyComplete();
}

@Test
void shouldHandleError() {
    Mono<User> error = userService.findById("invalid");

    StepVerifier.create(error)
            .expectError(NotFoundException.class)
            .verify();
}
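
Time-based publishers such as Flux.interval can be tested without waiting in real time. The sketch below assumes reactor-core and reactor-test are on the classpath and uses StepVerifier's virtual time support. It is written as a plain static method, so the JUnit wiring is left out.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class VirtualTimeSketch {

    /** Verifies three one-second ticks; returns how long the verification really took. */
    static Duration verifyThreeTicks() {
        return StepVerifier
                // the Flux must be created inside the supplier so it picks up the virtual clock
                .withVirtualTime(() -> Flux.interval(Duration.ofSeconds(1)).take(3))
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(3)) // advance the virtual clock by 3 seconds
                .expectNext(0L, 1L, 2L)
                .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println("verified in " + verifyThreeTicks().toMillis() + " ms of real time");
    }
}
```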

8. Interview Questions

  1. What is the difference between Mono and Flux? Mono: 0 or 1 element (single value or empty). Flux: 0-N elements (stream). Both are Publisher implementations (Reactive Streams).

  2. What is backpressure and why is it important? The Subscriber signals the Publisher how many elements it can process. It prevents memory overflow when a fast producer overwhelms a slow consumer.

  3. Why is it forbidden to make blocking calls on an event loop thread? The event loop thread handles all connections assigned to it. If it blocks, all those connections stall. Blocking calls must be moved to Schedulers.boundedElastic().

  4. What is the difference between map() and flatMap()? map(): synchronous transformation (T → R). flatMap(): asynchronous transformation (T → Mono<R>/Flux<R>). Use flatMap when the transformation itself is reactive.

  5. What is the difference between hot and cold Publishers? Cold: restarts for each subscriber (e.g., DB query). Hot: real-time, shared (e.g., WebSocket, SSE).

  6. How do you test reactive code? StepVerifier.create(publisher).expectNext(...).verifyComplete(). StepVerifier validates elements, errors, and completion.

  7. When should you use WebFlux instead of Spring MVC? High concurrency (10k+ connections), streaming, non-blocking DB (R2DBC), microservice gateway. NOT: traditional JDBC/JPA, blocking libraries.
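
Question 4 can be made concrete with a short sketch, assuming reactor-core is on the classpath. findName is a hypothetical reactive lookup, and block() is used only to print results from a plain main thread.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class MapVsFlatMapSketch {

    /** Hypothetical reactive lookup; a real one would query a database or a remote service. */
    static Mono<String> findName(int id) {
        return Mono.just("user-" + id);
    }

    /** map(): a plain function applied to each element as it passes through. */
    static List<Integer> timesTen() {
        return Flux.just(1, 2, 3).map(id -> id * 10).collectList().block();
    }

    /** flatMap(): the function returns a publisher, which is subscribed to and flattened. */
    static List<String> names() {
        return Flux.just(1, 2, 3).flatMap(MapVsFlatMapSketch::findName).collectList().block();
    }

    /** map() with a reactive function leaves unsubscribed Monos nested inside the Flux. */
    static Flux<Mono<String>> nested() {
        return Flux.just(1, 2, 3).map(MapVsFlatMapSketch::findName);
    }

    public static void main(String[] args) {
        System.out.println(timesTen());
        System.out.println(names());
    }
}
```

The return type of nested() is the usual warning sign: a Flux of Mono means flatMap was needed.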


9. Glossary

Term               Meaning
Mono<T>            Reactive publisher emitting 0 or 1 element
Flux<T>            Reactive publisher emitting 0-N elements
Backpressure       Subscriber signals its processing capacity to the Publisher
Event loop         Non-blocking I/O thread that multiplexes many connections
Reactive Streams   Publisher/Subscriber specification (org.reactivestreams; mirrored in the JDK as java.util.concurrent.Flow)
Project Reactor    Reactive Streams implementation (Mono, Flux)
WebClient          Reactive, non-blocking HTTP client (alternative to RestTemplate)
R2DBC              Reactive Relational Database Connectivity
StepVerifier       Reactive stream testing tool
RouterFunction     Functional routing (lambda-based)
Schedulers         Thread pool management in reactive context
SSE                Server-Sent Events (unidirectional server → client stream)

10. Cheatsheet

TYPES:
  Mono<T>                  0 or 1 element
  Flux<T>                  0-N elements
  Mono.just(x)             One element
  Mono.empty()             Empty
  Flux.fromIterable(list)  From collection

OPERATORS:
  .map(T -> R)             Synchronous transformation
  .flatMap(T -> Mono<R>)   Asynchronous transformation
  .filter(predicate)       Filtering
  .switchIfEmpty(alt)      Fallback if empty
  Mono.zip(mono1, mono2)   Wait for both, combine into a tuple
  .collectList()           Flux → Mono<List>
  .doOnNext/Error/Success  Side effect (logging)

ERROR HANDLING:
  .onErrorResume(e -> alt) Fallback publisher
  .onErrorMap(e -> newE)   Exception transformation
  .retry(3)                Retry
  .timeout(Duration)       Timeout

SCHEDULER:
  Schedulers.parallel()         CPU-intensive
  Schedulers.boundedElastic()   Blocking I/O wrapping
  .subscribeOn(scheduler)       Subscription thread

TESTING:
  StepVerifier.create(pub)
      .expectNext(x)
      .verifyComplete()             Success path
  StepVerifier.create(pub)
      .expectError(E.class)
      .verify()                     Error path
