React

关注公众号 jb51net

关闭
首页 > 网络编程 > JavaScript > javascript类库 > React > React Project Reactor

React编程模型之Project Reactor实际应用实例

作者:百锦再@新空间

这篇文章主要介绍了React编程模型之Project Reactor实际应用实例,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

3.3 Project Reactor(Spring Reactor)

Project Reactor是Spring生态系统中的响应式编程库,它为构建非阻塞、异步和事件驱动的应用程序提供了强大的工具集。作为Spring WebFlux的默认响应式库,Reactor实现了Reactive Streams规范,使开发者能够以声明式的方式处理异步数据流。

3.3.1 Mono(0-1个数据流)

Mono是Project Reactor中表示最多包含一个元素的异步序列的核心类型。它代表了一种可能在未来某个时间点产生单个值(或空值)的异步计算。

核心特性

创建Mono的方式

// 1. 从值创建
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.justOrEmpty(null); // 允许空值
// 2. 从Supplier创建
Mono<String> mono3 = Mono.fromSupplier(() -> "Hello from Supplier");
// 3. 从Callable创建
Mono<String> mono4 = Mono.fromCallable(() -> "Hello from Callable");
// 4. 从Future创建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello from Future");
Mono<String> mono5 = Mono.fromFuture(future);
// 5. 空Mono
Mono<Void> mono6 = Mono.empty();
// 6. 错误Mono
Mono<String> mono7 = Mono.error(new RuntimeException("Something went wrong"));

常用操作符

转换操作符

Mono<String> original = Mono.just("Hello");
Mono<Integer> mapped = original.map(String::length);
Mono<String> flatMapped = original.flatMap(s -> Mono.just(s + " World"));
Flux<String> flatMapMany = original.flatMapMany(s -> Flux.just(s.split("")));

过滤操作符

Mono<String> filtered = Mono.just("Hello")
    .filter(s -> s.length() > 3);
Mono<String> withDefault = Mono.<String>empty()
    .defaultIfEmpty("Default Value");

错误处理操作符

Mono<String> withErrorHandling = Mono.error(new RuntimeException())
    .onErrorReturn("Fallback Value");
Mono<String> withResume = Mono.error(new RuntimeException())
    .onErrorResume(e -> Mono.just("Recovered from " + e.getMessage()));

组合操作符

Mono<String> first = Mono.just("Hello");
Mono<String> second = Mono.just("World");
Mono<String> zipped = first.zipWith(second, (f, s) -> f + " " + s);
Mono<Void> thenOperation = Mono.just("Hello").then(Mono.empty());

时间相关操作符

Mono<String> delayed = Mono.just("Hello")
    .delayElement(Duration.ofSeconds(1));
Mono<String> withTimeout = Mono.just("Hello")
    .delayElement(Duration.ofSeconds(2))
    .timeout(Duration.ofSeconds(1));

订阅Mono

Mono是惰性的,只有订阅时才会执行:

// 1. 简单订阅
mono.subscribe();
// 2. 带消费者的订阅
mono.subscribe(
    value -> System.out.println("Received: " + value), // onNext
    error -> System.err.println("Error: " + error),   // onError
    () -> System.out.println("Completed")             // onComplete
);
// 3. 带Subscription控制的订阅
mono.subscribe(new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1); // 请求第一个元素
    }
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Received: " + value);
        request(1); // 请求下一个元素(对于Mono通常不需要)
    }
});

实际应用场景

@RestController
public class UserController {
    private final UserRepository userRepository;
    public UserController(UserRepository userRepository) {
        this.userRepository = userRepository;
    }
    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userRepository.findById(id)
            .switchIfEmpty(Mono.error(new UserNotFoundException(id)));
    }
}

3.3.2 Flux(0-N个数据流)

Flux是Project Reactor中表示0到N个元素的异步序列的核心类型。它代表了一个可能在未来某个时间点产生多个值的异步数据流。

核心特性

// 1. 从多个值创建
Flux<String> flux1 = Flux.just("A", "B", "C");
// 2. 从数组或集合创建
Flux<String> flux2 = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux3 = Flux.fromIterable(Arrays.asList("A", "B", "C"));
// 3. 从范围创建
Flux<Integer> flux4 = Flux.range(1, 5); // 1, 2, 3, 4, 5
// 4. 从流生成器创建
Flux<Long> flux5 = Flux.generate(
    () -> 0L, // 初始状态
    (state, sink) -> {
        sink.next(state);
        if (state == 10) sink.complete();
        return state + 1;
    }
);
// 5. 从间隔创建(周期性发出值)
Flux<Long> flux6 = Flux.interval(Duration.ofMillis(100)); // 0, 1, 2... 每100ms
// 6. 空Flux
Flux<String> flux7 = Flux.empty();
// 7. 错误Flux
Flux<String> flux8 = Flux.error(new RuntimeException("Flux error"));

常用操作符

转换操作符

Flux<String> original = Flux.just("apple", "banana", "cherry");
Flux<Integer> mapped = original.map(String::length);
Flux<String> flatMapped = original.flatMap(s -> Flux.fromArray(s.split("")));

过滤操作符

Flux<Integer> numbers = Flux.range(1, 10);
Flux<Integer> evens = numbers.filter(n -> n % 2 == 0);
Flux<Integer> first5 = numbers.take(5);

组合操作符

Flux<String> fluxA = Flux.just("A", "B", "C");
Flux<String> fluxB = Flux.just("1", "2", "3");
Flux<String> merged = fluxA.mergeWith(fluxB); // A, 1, B, 2, C, 3
Flux<String> concatenated = fluxA.concatWith(fluxB); // A, B, C, 1, 2, 3
Flux<String> zipped = fluxA.zipWith(fluxB, (a, b) -> a + b); // A1, B2, C3

错误处理操作符

Flux<Integer> withErrorHandling = Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .onErrorResume(e -> Flux.just(-1));
Flux<Integer> withContinue = Flux.just(1, 2, 0, 4)
    .map(i -> {
        try {
            return 10 / i;
        } catch (Exception e) {
            throw Exceptions.propagate(e);
        }
    })
    .onErrorContinue((e, o) -> System.out.println("Error for " + o + ": " + e.getMessage()));

背压操作符

Flux.range(1, 1000)
    .onBackpressureBuffer(50) // 缓冲区大小为50
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(10); // 初始请求10个元素
        }
        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("Received: " + value);
            // 根据需要请求更多元素
            if (value % 10 == 0) {
                request(10);
            }
        }
    });

时间相关操作符

Flux.range(1, 5)
    .delayElements(Duration.ofMillis(500))
    .subscribe(System.out::println);

订阅Flux

与Mono类似,Flux也是惰性的,需要订阅才能执行:

// 1. 简单订阅
flux.subscribe();
// 2. 带消费者的订阅
flux.subscribe(
    value -> System.out.println("Received: " + value), // onNext
    error -> System.err.println("Error: " + error),   // onError
    () -> System.out.println("Completed"),           // onComplete
    subscription -> subscription.request(3)          // 初始请求3个元素
);
// 3. 带Subscription控制的订阅
flux.subscribe(new BaseSubscriber<String>() {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1); // 请求第一个元素
    }
    @Override
    protected void hookOnNext(String value) {
        System.out.println("Received: " + value);
        // 处理完当前元素后请求下一个
        request(1);
    }
});

实际应用场景

@RestController
public class EventController {
    private final EventPublisher eventPublisher;
    public EventController(EventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> getEvents() {
        return eventPublisher
            .publish()
            .map(event -> {
                // 转换或丰富事件数据
                return event;
            })
            .onBackpressureBuffer(100)
            .log();
    }
}

3.3.3 Schedulers(调度器)

在响应式编程中,Schedulers负责管理线程和执行上下文。Project Reactor提供了多种预定义的Scheduler实现,用于控制异步操作的执行位置。

核心概念

预定义的Scheduler类型

Schedulers.immediate()

Schedulers.single()

Schedulers.elastic()(已弃用,推荐使用boundedElastic):

Schedulers.boundedElastic()

Schedulers.parallel()

Schedulers.fromExecutorService()

使用Scheduler

发布到Scheduler

Flux.range(1, 5)
    .map(i -> {
        System.out.println("Map on " + Thread.currentThread().getName());
        return i * 2;
    })
    .publishOn(Schedulers.boundedElastic())
    .filter(i -> {
        System.out.println("Filter on " + Thread.currentThread().getName());
        return i % 3 == 0;
    })
    .subscribeOn(Schedulers.parallel())
    .subscribe();

指定操作符的Scheduler
许多操作符接受可选的Scheduler参数

Flux.interval(Duration.ofMillis(100), Schedulers.single())
    .subscribe(System.out::println);

调度策略选择指南

计算密集型操作

阻塞I/O操作

非阻塞异步操作

UI交互

最佳实践

避免在响应式链中阻塞

合理选择调度器

注意上下文传播

资源清理

对于自定义调度器或长时间运行的应用程序,注意关闭调度器

Scheduler scheduler = Schedulers.newBoundedElastic(10, 100, "custom");
try {
    Flux.just(1, 2, 3)
        .publishOn(scheduler)
        .subscribe(System.out::println);
} finally {
    scheduler.dispose();
}

实际应用示例

@RestController
public class DataController {
    private final DataService dataService;
    public DataController(DataService dataService) {
        this.dataService = dataService;
    }
    @GetMapping("/data/{id}")
    public Mono<Data> getData(@PathVariable String id) {
        // 使用boundedElastic处理潜在的阻塞操作
        return Mono.fromCallable(() -> dataService.blockingGetData(id))
            .subscribeOn(Schedulers.boundedElastic())
            .timeout(Duration.ofSeconds(2))
            .onErrorResume(e -> Mono.just(Data.fallbackData()));
    }
    @GetMapping("/stream")
    public Flux<Data> streamData() {
        // 使用parallel处理计算密集型流
        return dataService.dataStream()
            .publishOn(Schedulers.parallel())
            .map(data -> {
                // 计算密集型转换
                return data.withProcessedPayload(processPayload(data.getPayload()));
            })
            .log();
    }
    private String processPayload(String payload) {
        // 模拟计算密集型处理
        return payload.toUpperCase();
    }
}

总结

Project Reactor为Java响应式编程提供了强大的工具集:

通过合理组合这些组件,开发者可以构建高效、可扩展的响应式应用程序,充分利用现代硬件资源,同时保持代码的清晰和可维护性。响应式编程模型特别适合高并发、低延迟的应用场景,如微服务架构、实时数据处理和事件驱动系统。

到此这篇关于React编程模型:Project Reactor深度解析的文章就介绍到这了,更多相关React Project Reactor内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文