简介说起 RxJava ,相信诸多 Android 开发者都不会陌生。作为一个知名的响应式编程库,从前年开始逐渐变得火热,从小众到被众多 Android 开发者们广泛引入与流传,其在 GitHub 的仓库截止笔者写这篇文章时,已经有16400+个 star 。甚至有一些大牛专门为 Android 写了 RxJava 的适配库,如 为什么 RxJava 如此受到 Android 开发者们的欢迎。我想不外乎两个原因。 1. 异步 2. 链式操作 异步对 Android 线程有所了解的朋友都知道, Android的 UI 绘制 与 事件响应是在主线程的,为了保证界面的流畅性,很多耗时操作如读写数据库、读写文件、请求网络,我们都会挪到异步线程去完成,再回调到主线程。当然在4.0以后主线程直接就不允许请求网络了。 在过去没有 RxJava 的时候,开发者一般都是通过 AsyncTask , Thread ,更好些的就是通过线程池来完成这些任务。而有了 RxJava 以后,简简单单的一句话就可以随意的切换线程,简直不用太舒服。 最典型的 RxJava 中的Observable 类,提供了2个函数, 分别是subscribeOn 与observeOn 。前者可以切换被观察时的线程(如果说数据发射的线程不够严谨,数据并非一定在观察时发射的,尤其是开发者自定义OnSubscribe 时),后者可以切换数据被消费时的线程。 举一个切换线程的例子: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | Log.i( "debug" , Thread.currentThread().getName());
Observable.empty()
.doOnCompleted( new Action0() {
@Override
public void call() {
Log.i( "debug" , Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnCompleted( new Action0() {
@Override
public void call() {
Log.i( "debug" , Thread.currentThread().getName());
}
})
.subscribe();
|
这里我们没有任何数据,就仅仅发射了一个onComplete ,但是在切换线程的代码中,我们增加了onComplte 时要额外执行的代码,输出结果如下: 1 2 3 | 08 - 27 10 : 47 : 41.173 6741 - 6741 /com.dieyidezui.rxjavademo I/debug: main
08 - 27 10 : 47 : 41.201 6741 - 6762 /com.dieyidezui.rxjavademo I/debug: RxIoScheduler- 2
08 - 27 10 : 47 : 41.217 6741 - 6741 /com.dieyidezui.rxjavademo I/debug: main
|
这仅仅是简单的例子, RxJava 提供了很多便捷的操作符供我们使用,如map 、filter 、flatMap 、merge 、concat 等。可见当熟练使用后对我们的编程效率确实有很大帮助。尤其是 MVP 模式, RxJava 与之结合可谓是”天作之合”。 链式操作上面笔者演示的代码其实就是 RxJava 的典型使用方式: 发射数据源 中间操作 处理结果
其中中间操作包含诸多用法, 如果切换线程,变换数据等。 为什么我说链式操作很好。第一,链式逻辑替代深度回调逻辑,容易编写,不易出 BUG 。第二,RxJava 提供诸多了整体处理数据的操作符,非常实用。第三,配合 Java8 的 lambda 表达式,使代码简短优雅。 好了,对 RxJava 的介绍就此为止了。进阶用法、原理剖析以后会有专门的文章。对 RxJava 不熟悉的同学,建议先去看一下官方的 wiki 。链接:https://github.com/ReactiveX/RxJava/wiki RxJava2.0前天, RxJava终于发布了2.0 RC1 版本,一直关注于此的笔者立刻就进去尝鲜了。结合官方的介绍,笔者总结并翻译了一些与 1.x 的异同与大家分享。 包名与MAVEN依赖首先要说的就是 RxJava 2和1是互相独立的。因此包名与 maven 的依赖也是不一样的,就类似于 OkHttp 3与2一样。 RxJava 2.x的依赖是全新的io.reactivex.rxjava2:rxjava:2.x.y ,并且类处于该io.reactivex 包名下,而不再是rx 。 接口变化RxJava2 是遵循 Reactive Streams Specification 的规范完成的,新的特性依赖其提供的4个基础接口。分别是 Publisher Subscriber Subscription Processor
Flowable与Observable 新的实现叫做Flowable , 同时旧的Observable 也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException 。 举个简单的例子,在 RxJava1.x 中的 observeOn , 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException , 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。 而在 2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。 Single、Completable Single 与 Completable 都基于新的 Reactive Streams 的思想重新设计了接口,主要是消费者的接口, 现在他们是这样的: 1 2 3 4 5 6 7 8 9 10 11 | interface SingleObserver<T> {
void onSubscribe(Disposable d);
void onSuccess(T value);
void onError(Throwable error);
}
interface CompletableObserver<T> {
void onSubscribe(Disposable d);
void onComplete();
void onError(Throwable error);
}
|
Subscriber 对比一下 Subscriber : 1 2 3 4 5 6 | public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
|
我们会发现和以前不一样的是多了一个onSubscribe 的方法,Subscription 如下: Subscription 1 2 3 4 | public interface Subscription {
public void request( long n);
public void cancel();
}
|
熟悉 RxJava 1.x 的朋友能发现, 新的Subscription 更像是综合了旧的Producer 与Subscription 的综合体。他既可以向上游请求数据,又可以打断并释放资源。而旧的Subscription 在这里因为名字被占,而被重新命名成了Disposable Disposable 1 2 3 4 | public interface Disposable {
void dispose();
boolean isDisposed();
}
|
这里最大的不同就是这个onSubscribe ,根据 Specification, 这个函数一定是第一个被调用的, 然后就会传给调用方一个Subscription ,通过这种方式组织新的背压关系。当我们消费数据时,可以通过Subscription 对象,自己决定请求数据。 这里就可以解释上面的非阻塞的背压。旧的阻塞式的背压,就是根据下游的消费速度,中游可以选择阻塞住等待下游的消费,随后向上游请求数据。而新的非阻塞就不在有中间阻塞的过程,由下游自己决定取多少,还有背压策略,如抛弃最新、抛弃最旧、缓存、抛异常等。 而新的接口带来的新的调用方式与旧的也不太一样, subscribe 后不再会有 Subscription 也就是如今的 Disposable,为了保持向后的兼容, Flowable 提供了 subscribeWith方法 返回当前的Subscriber 对象, 并且同时提供了DefaultSubscriber , ResourceSubscriber ,DisposableSubscriber ,让他们提供了Disposable 接口, 可以完成和以前类似的代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println( "Done" );
}
};
Flowable.range( 1 , 10 ).delay( 1 , TimeUnit.SECONDS).subscribe(subscriber);
subscriber.dispose();
|
收回 create 方法权限在RxJava 1.x 最明显的问题就是由于 create 的太过开放,导致其被开发者滥用,而不是学习使用提供的操作符。 并且用户对 RxJava 不够了解,导致各种各样的问题,如背压、异常处理等。 由于规范要求所有的操作符强制支持背压,因此新的 create 采用了保守的设计,让用户实现FlowableOnSubscribe 接口,并选取背压策略,然后在内部实现封装支持背压,简单的例子如下: 1 2 3 4 5 | Flowable.create((FlowableEmitter<Integer> emitter) -> {
emitter.onNext( 1 );
emitter.onNext( 2 );
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
|
Functions可以抛出异常新的ActionX 、FunctionX 的方法声明都增加了一个throws Exception ,这带来了显而易见的好处,现在我们可以这样写: 1 2 3 | Flowable.just( "file.txt" )
.map(name -> Files.readLines(name))
.subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
|
而在以前是不行的, 因为Files.readLines(name) 会显式的抛出一个IOException 。这样对 lambda 更加友好,而不必再去 try catch 。 Scheduler可以直接schedule在以前是必须要先createWorker ,用 Worker 对象去 shedule, 现在可以直接在Scheduler 用这些方法: 1 2 3 4 5 6 7 8 9 10 11 12 13 | public abstract class Scheduler {
public Disposable scheduleDirect(Runnable task) { ... }
public Disposable scheduleDirect(Runnable task, long delay, TimeUnit unit) { ... }
public Disposable scheduleDirectPeriodically(Runnable task, long initialDelay,
long period, TimeUnit unit) { ... }
public long now(TimeUnit unit) { ... }
}
|
这算是一个小优化,方便开发者使用。 Observable的一些继承并入了Flowable中如ConnectableObservable 、BlockObservable 等,这样可以直接在Flowable 中写出这样的代码: 1 | List<Integer> list = Flowable.range( 1 , 100 ).toList().blockingFirst();
|
其他修改还有一些普通开发者不太在意的修改: 等其他变动。 结语RxJava 作为开源的经典之作,笔者一直都有所关注。后续笔者会继续为大家带来 RxJava 的源码解析与进阶使用系列等。感谢大家的阅读,如有不知之处,欢迎讨论交流。 转自:蝶翼的罪个人博客 |