Intro to Reactor, Mono, and Flux

This entry gives a quick overview of reactor project, mono, flux, error handling, defer, inner mono, and common pitfalls of using reactor. You may find this entry helpful as well, Non-Blocking vs Reactive

What is  the reactor project

Reactor is a project by the reactive foundation that provides a reactive programming library for building reactive systems in Java. Reactor provides a set of reactive types such as Mono and Flux, as well as a number of operators for manipulating and transforming reactive streams.

Reactor is built on the Project Reactor Core library, which provides a non-blocking and scalable foundation for building reactive applications. It’s designed to handle large amounts of data, making it suitable for building high-performance and scalable applications.

Reactor is compatible with the Reactive Streams specification and supports the reactive programming paradigm, where data is processed as a stream of events over time. Reactor provides a number of reactive types, such as Mono and Flux, that make it easier to work with reactive streams and implement reactive systems in Java.

Reactor is widely used in the Java ecosystem and is supported
by many popular frameworks and libraries, such as Spring and Netty, making it an excellent choice for building reactive systems in Java.

Mono and Flux

In the context of reactive programming and Java, Mono, and Flux are types of reactive streams.

Mono represents a stream of 0 or 1 element. It’s useful when you want to represent a stream that either completes successfully or fails with an error.

Flux represents a stream of 0 to N elements. It’s useful when you want to represent a stream of events, where each event is processed as it arrives.

Examples: Suppose you have an API that returns information about a single user. You can use Mono to represent the response of the API call as a reactive stream. The API could return a Mono<User> where User is a class that represents the information of a single user.

On the other hand, suppose you have an API that returns a list of users. You can use Flux to represent the response of the API call as a reactive stream. The API could return a Flux<User> where User is a class that represents the information of a single user.

Reactor Java is a library that implements reactive programming in Java, and Mono and Flux are two of its core classes for representing reactive streams.

Examples of Mono and Flux

In this example, we create a Mono that emits a single “Hello” string and a Flux that emits a sequence of integers from 1 to 5. Both Mono and Flux are subscribed to and the values are printed out.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxExample {

  public static void main(String[] args) {
    // Mono example
    Mono<String> mono = Mono.just("Hello");
    mono.subscribe(System.out::println);

    // Flux example
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
    flux.subscribe(System.out::println);
  }
}

In this example, we create a Mono that emits a single “Hello” string and a Flux that emits a sequence of integers from 1 to 5. Both Mono and Flux are subscribed to and the values are printed out.

Map and FlatMap

In reactive programming, map and flatMap are two important operators for transforming reactive streams.

map is a transformation operator that transforms the elements in a reactive stream by applying a given function to each element. The function takes in an element from the stream and returns a new element that is emitted to the transformed stream. The map operator preserves the number of elements in the stream, so if the original stream has N elements, the transformed stream will also have N elements.

flatMap is a transformation operator that transforms the elements in a reactive stream by applying a given function to each element and flattening the resulting streams into a single stream. The function takes in an element from the stream and returns a new reactive stream. The elements of this new reactive stream are then flattened into a single stream that is emitted to the transformed stream. The flatMap operator can change the number of elements in the stream, as the number of elements in the resulting stream can be different from the number of elements in the original stream.

Here’s an example to illustrate the difference between map and flatMap:

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);

// map example
Flux<String> mappedNumbers = numbers.map(i -> i * 10).map(i -> "Number " + i);

// flatMap example
Flux<String> flatMappedLetters = numbers.flatMap(i -> Flux.just("Letter " + (char) (i + 64), "Letter " + (char) (i + 96)));

In this example, map takes a stream of numbers and maps each number to its corresponding value multiplied by 10, and then to a string “Number X”, where X is the transformed number. The flatMap operator takes a stream of numbers and maps each number to a flux of two strings, “Letter X” and “Letter x”, where X and x are the corresponding uppercase and lowercase letters of the alphabet. The elements of the resulting streams are then flattened into a single stream.

Inner Mono

An “inner Mono” in Reactor Java refers to a Mono that is used inside another reactive type, such as another Mono or a Flux.

In Reactor, Mono represents a reactive stream that emits 0 or 1 element. An inner Mono can be used in various ways, for example:

  • As a source of data for another Mono: In this case, the inner Mono is used to provide data that is needed for the creation of another Mono.
  • To model a dependent operation: In this case, the inner Mono is used to represent an operation that needs to be executed before another operation can be executed.
  • To model a subprocess: In this case, the inner Mono is used to represent a subprocess that runs as part of a larger process and emits a result that is needed by the larger process.

Here’s an example to illustrate the use of an inner Mono:

public Mono<String> getDataFromDB() {
    // code to fetch data from the database
    return Mono.just("Data from the database");
}

public Mono<String> processData(String data) {
    // code to process data
    return Mono.just("Processed " + data);
}

public Mono<String> processDataFromDB() {
    return getDataFromDB()
        .flatMap(data -> processData(data));
}

In this example, getDataFromDB returns a Mono that fetches data from a database. processData takes a string and returns a Mono that represents the processed data. processDataFromDB uses flatMap to chain the two Mono operations together. The getDataFromDB Mono is used as the source of data for the processData Mono, and the result of the getDataFromDB Mono is passed as an argument to the processData Mono.

Defer

The defer operator in Reactor Java is used to defer the creation of a reactive type until it is subscribed to. This means that the reactive type is not created until a subscriber subscribes to it.

The defer operator is useful when you want to create a reactive type that depends on some dynamic information that is not available at the time of creation. By using defer, you can delay the creation of the reactive type until the information is available, and then create the reactive type with the updated information.

Here’s an example to illustrate the use of defer:

public Mono<String> getDataFromDB() {
    return Mono.defer(() -> {
        // code to fetch data from the database
        return Mono.just("Data from the database");
    });
}

In this example, the getDataFromDB method returns a Mono created using defer. The Mono is created using a supplier that returns a Mono that fetches data from a database. The Mono is not created until a subscriber subscribes to it.

When using defer, you can also access dynamic information within the supplier to create the reactive type. This allows you to create a reactive type that depends on dynamic information that is not available at the time of creation. For example, you can access the current time or a randomly generated number within the supplier to create a reactive type that depends on this information.

Error Handling

Error handling is an important aspect of reactive programming in Reactor. Reactor provides various operators to handle errors in a reactive stream.

Here are some of the key concepts and operators for error handling in Reactor:

  1. Error signal: Reactor represents errors as an error signal in the form of a Mono or Flux that emits a Throwable. This allows errors to be propagated through the reactive stream and handled by downstream operators.
  2. onErrorReturn operator: The onErrorReturn operator is used to provide a default value for a reactive type when an error occurs. The operator takes a default value as a parameter and returns a new reactive type that emits the default value if an error occurs in the original reactive type.
  3. onErrorResume operator: The onErrorResume operator is used to provide an alternate reactive type when an error occurs. The operator takes a function that maps a Throwable to a new reactive type and returns a new reactive type that emits the elements from the alternate reactive type if an error occurs in the original reactive type.
  4. doOnError operator: The doOnError operator is used to perform an action when an error occurs in a reactive type. The operator takes a Consumer that accepts a Throwable and performs the action when an error occurs in the reactive type.
  5. retry operator: The retry operator is used to retry a reactive type when an error occurs. The operator takes a number of retries as a parameter and returns a new reactive type that retries the original reactive type the specified number of times when an error occurs.

These are some of the key error handling operators in Reactor. By using these operators, you can handle errors in a reactive stream and ensure that the reactive stream continues to emit data even in the presence of errors.

Example of onErrorReturn

In this example, a Mono that emits the value 1 is transformed by a mapping function that divides by zero, causing an error. The onErrorReturn operator provides a default value of 0 when an error occurs, so the reactive stream continues and emits 0 instead of terminating with an error.

Mono.just(1)
    .map(i -> i / 0)
    .onErrorReturn(0)
    .subscribe(System.out::println);

Example of onErrorResume

In this example, a Mono that emits the value 1 is transformed by a mapping function that divides by zero, causing an error. The onErrorResume operator provides an alternate Mono that emits the value 2 when an error occurs, so the reactive stream continues and emits 2 instead of terminating with an error.

Mono.just(1)
    .map(i -> i / 0)
    .onErrorResume(e -> Mono.just(2))
    .subscribe(System.out::println);

Example of  doOnError

In this example, a Mono that emits the value 1 is transformed by a mapping function that divides by zero, causing an error. The doOnError operator performs an action when an error occurs, in this case printing an error message. The reactive stream terminates with an error, but the error is handled and logged by the doOnError operator.

Mono.just(1)
    .map(i -> i / 0)
    .doOnError(e -> System.out.println("Error: " + e))
    .subscribe();

Example of retry

In this example, a Mono that emits the value 1 is transformed by a mapping function that throws an error. The retry operator retries the reactive stream the specified number of times (in this case, 2) when an error occurs. The error occurs the first two times, but the reactive stream continues and emits 1 on the third attempt.

AtomicInteger counter = new AtomicInteger();
Mono.just(1)
    .map(i -> {
        if (counter.getAndIncrement() < 2) {
            throw new RuntimeException("Error");
        }
        return i;
    })
    .retry(2)
    .subscribe(System.out::println);

Common Pitfalls when Using Reactor

Here are some common mistakes that developers make when using Reactor:

  1. Not handling errors properly: Reactor provides various operators to handle errors, but if not used correctly, errors can be propagated in unexpected ways and cause the reactive stream to terminate prematurely.
  2. Not subscribing to reactive types: Reactive types in Reactor are lazy, meaning that they do not start emitting data until they are subscribed to. Failing to subscribe to a reactive type will result in the reactive stream not being executed.
  3. Blocking on reactive types: Reactor is designed to be non-blocking and asynchronous. Blocking on a reactive type, such as by calling its block() method, defeats the purpose of using a reactive library and can lead to performance issues.
  4. Overusing parallel processing: Reactor provides operators to process data in parallel, but overusing parallel processing can lead to increased memory usage, reduced performance, and increased complexity.
  5. Not using backpressure: Reactor provides backpressure support to control the rate of data emission in reactive streams, but if not used correctly, it can lead to buffer overflow and dropped events.
  6. Not testing reactive code: Reactor provides test support for reactive streams, but not testing reactive code can lead to bugs and unexpected behavior.
  7. Not being mindful of resource cleanup: Reactor provides support for resource cleanup, but failing to cleanup resources can lead to memory leaks and other resource-related issues.

It’s important to be aware of these common mistakes and to take steps to avoid them when using Reactor.