java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java RxJava可观测数据

Java技巧分享之利用RxJava打造可观测数据RxLiveData

作者:乐征skyline

这篇文章主要来和大家分享一个Java技巧,那就是利用RxJava打造可观测数据RxLiveData,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下

1. 问题场景

在实际工作中,我们经常需要在不同类对象之间、不同模块之间共享数据,而这些数据通常是可改动的,那么就可能发生一个问题:当数据变动时,相关对象或模块并不知道,没有及时更新数据。这时候,我们希望数据改变时可以通知其他模块同步更新,实现一个类似数据之间联动的效果。最容易想到的应该就是监听回调的观察者模式,下面给出一种以前见过的、不太优雅的实现:

class User {
    //...... Java Bean 的字段略
}
interface Listener {
    void onUserUpdated(User user);
}
class UserManager {
    private static UserManager manager = new UserManager();
    private UserManager() {
    }
    public static UserManager getInstance() {
        return manager;
    }
    private User user;
    private List<Listener> listeners = new LinkedList<>();
    public void addUserListener(Listener listener) {
        listeners.add(listener);
    }
    public void removeUserListener(Listener listener) {
        listeners.remove(listener);
    }
    public User getUser() {
        return user;
    }
    public void setUser(User user) {
        this.user = user;
        for (Listener listener : listeners) {
            listener.onUserUpdated(this.user);
        }
    }
}

这种方式有以下缺点:

面对这样的问题,RxJava、JDK中的Observable和Flow API还有Android里的LiveData都给出了可用的实现方式,在实际开发中,感觉并不是那么方便。而本文要介绍的是我利用RxJava打造一个更加方便的可观测对象工具类--RxLiveData(代码见最底部)。

2. 使用示例

先来看一个比较短的完整示例:

/* 测试用的 Java Bean 数据类*/
class User {
    //...... Java Bean 的字段略
}
/* 一个单例 */
class UserManager {
    private static final UserManager manager = new UserManager();
    private UserManager() {
    }
    public static UserManager getInstance() {
        return manager;
    }
    private final RxLiveData<User> userData = new RxLiveData<>();
    public RxLiveData<User> getUserData() {
        return userData;
    }
}
class A {
    public void init() {
        //订阅可观测对象,使得数据发生改变时可以被回调
        UserManager.getInstance().getUserData().getObservable()
                .subscribe((User user) -> {//使用lambda版
                    update(user);// 每次用户信息改变,这里会被调用
                });
        update(UserManager.getInstance().getUserData().getValue());
    }
    private void update(User user) {
        System.out.println("user changed");
    }
}
class B{
    public B() {
        UserManager.getInstance().getUserData().getObservable().subscribe(this::update);//方法引用版
    }
    private void update(User user) {
        System.out.println("user changed");
    }
}
public class Main {
    public static void main(String[] args) throws InterruptedException {
        A a = new A();
        a.init();
        B b = new B();
        //更新UserManager中的数据,这时候A和B中的对应方法会被调用
        UserManager.getInstance().getUserData().postData(new User());
    }
}

这里模拟UserManager的数据在AB类的对象之间共享,当UserManager的内容发生改变时,可以通知到AB,执行相应操作。

这时如果还想给UserManager增加一个数据,例如一个long类型的time,只需要按照下面这样添加一个属性和一个getter方法就可以了:

private final RxLiveData<Long> timeData = new RxLiveData<>();
public RxLiveData<Long> getTimeData() {
    return timeData;
}

如果是在Android应用开发中,还可以借助RxAndroid和RxLifecycle的功能,来控制回调的执行线程并在界面销毁时取消订阅,例如:

userManager.getUserData().getObservable()
        .compose(bindUntilEvent(ActivityEvent.DESTROY))//指定在onDestroy回调时取消订阅
        .observeOn(AndroidSchedulers.mainThread())//指定主线程
        .subscribe(user -> {
        }, Throwable::printStackTrace);

3. 主要方法介绍

3.1 getObservable 方法

方法签名public Observable<T> getObservable()

这个方法用于获取RxJava的Observable,进而对数据进行订阅,还可以得到RxJava相关功能的支持(例如,Stream 操作,指定线程,控制生命周期等等)。

3.2 postData 方法

方法签名public void postData(T value)

这个方法用于更新数据。它会更新存在在当前RxLiveData对象中的数据,并通过RxJava的ObservableEmitter触发观察者的回调。

注意:当参数为null时,由于RxJava会对null抛出异常,所以这里的实现方式是在判断为null的时候只存储数据,不触发观察者的回调。

3.3 getValue 方法

方法签名public T getValue()

这个方法仅用于获得存在在当前RxLiveData中的数据。

3.4 optValue 方法

方法签名public Optional<T> optValue()

getValue方法的Optional版本。

4. 完整实现

import io.reactivex.rxjava3.core.Observable;//如果用的是RxJava2的请改为该版本的包名
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.Optional;
public class RxLiveData<T> {
    private final Observable<T> observable;
    private Disposable disposable;
    private T value;
    private ObservableEmitter<T> emitter;
    public RxLiveData() {
        observable = Observable
                .create((ObservableEmitter<T> emitter) -> this.emitter = emitter)
                .publish()
                .autoConnect(0, disposable -> this.disposable = disposable);
    }
    public Observable<T> getObservable() {
        return observable;
    }
    public void postData(T value) {
        this.value = value;
        if (emitter != null && value != null) {
            emitter.onNext(value);
        }
    }
    public T getValue() {
        return value;
    }
    public Optional<T> optValue() {
        return Optional.ofNullable(value);
    }
}

到此这篇关于Java技巧分享之利用RxJava打造可观测数据RxLiveData的文章就介绍到这了,更多相关Java RxJava可观测数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文