RxJava 常用操作符.docx
《RxJava 常用操作符.docx》由会员分享,可在线阅读,更多相关《RxJava 常用操作符.docx(22页珍藏版)》请在冰点文库上搜索。
RxJava常用操作符
RxJava常用操作符
1Observable的创建
1.1from()
转换集合为一个每次发射集合中一个元素的Observable对象。
可用来遍历集合。
方法列表:
publicstaticObservablefrom(Future
extendsT>future)
publicstaticObservablefrom(Future
extendsT>future,longtimeout,TimeUnitunit)
publicstaticObservablefrom(Future
extendsT>future,Schedulerscheduler)
publicstaticObservablefrom(Iterable
extendsT>iterable)
publicstaticObservablefrom(T[]array)
栗子:
//1.遍历集合
Observableobservable=Observable.from(newString[]{"hello","hi"});
1
2
//2.使用Future创建Observable,Future表示一个异步计算的结果。
FutureTaskfutureTask=newFutureTask(newCallable(){
@Override
publicStringcall()throwsException{
//TODO执行异步操作并返回数据
return"hihi";
}
});
Scheduler.Workerworker=Schedulers.io().createWorker();
worker.schedule(newAction0(){
@Override
publicvoidcall(){
futureTask.run();
}
});
Observableobservable=Observable.from(futureTask);
1.2just()
转换一个或多个Object为依次发射这些Object的Observable对象。
方法列表:
publicstaticObservablejust(finalTvalue)
publicstaticObservablejust(Tt1,Tt2)
publicstaticObservablejust(Tt1,Tt2,Tt3)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7,Tt8)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7,Tt8,Tt9)
publicstaticObservablejust(Tt1,Tt2,Tt3,Tt4,Tt5,Tt6,Tt7,Tt8,Tt9,Tt10)
栗子:
Observableobservable=Observable.just("hello");
//使用just()遍历几个元素
Observableobservable=Observable.just("hello","hi","...");
//使用from()方法遍历,效果和just()一样。
String[]stringArrs=newString[]{"hello","hi","..."};
Observableobservable=Observable.from(stringArrs);
just()方法可传入1~10个参数,也就说当元素个数小于等于10的时候既可以使用just()也可以使用from(),否则只能用from()方法。
1.3create()
返回一个在被OnSubscribe订阅时执行特定方法的Observable对象。
方法列表:
publicstaticObservablecreate(OnSubscribef)
@BetapublicstaticObservablecreate(SyncOnSubscribesyncOnSubscribe)
@ExperimentalpublicstaticObservablecreate(AsyncOnSubscribeasyncOnSubscribe)
栗子:
Observable.OnSubscribeonSubscribe=newObservable.OnSubscribe(){
@Override
publicvoidcall(Subscriber
superString>subscriber){
//onNext()方法可执行多次
subscribe.onNext("hello");
subscribe.onCompleted();
}
};
Observable
此方法不常用,大多数时候都是使用just()、form()等方法,如上面那串代码就可以写成:
Observable
1
1.4interval()
返回一个每隔指定的时间间隔就发射一个序列号的Observable对象,可用来做倒计时等操作。
方法列表:
publicstaticObservableinterval(longinterval,TimeUnitunit)
publicstaticObservableinterval(longinterval,TimeUnitunit,Schedulerscheduler)
publicstaticObservableinterval(longinitialDelay,longperiod,TimeUnitunit)
publicstaticObservableinterval(longinitialDelay,longperiod,TimeUnitunit,Schedulerscheduler)
栗子:
//每隔1s发送一个序列号,序列号从0开始,每次累加1。
Observableobservable=Observable.interval(1,TimeUnit.SECONDS);
1
2
1.5timer()
创建一个在指定延迟时间后发射一条数据(固定值:
0)的Observable对象,可用来做定时操作。
方法列表:
publicstaticObservabletimer(longdelay,TimeUnitunit)
publicstaticObservabletimer(longdelay,TimeUnitunit,Schedulerscheduler)
栗子:
//定时3s
Observableobservable=Observable.timer(3,TimeUnit.SECONDS);
1
2
1.6range()
创建一个发射指定范围内的连续整数的Observable对象。
方法列表:
publicstaticObservablerange(intstart,intcount)
publicstaticObservablerange(intstart,intcount,Schedulerscheduler)
栗子:
//依次发射5、6、7
Observableobservable=Observable.range(5,3);
1
2
1.7empty()
创建一个不发射任何数据就发出onCompleted()通知的Observable对象。
方法列表:
publicstaticObservableempty()
栗子:
//发出一个onCompleted()通知
Observable
1
2
1.8error()
创建不发射任何数据就发出onError通知的Observable对象。
方法列表:
publicstaticObservableerror(Throwableexception)
栗子:
//发出一个onError()通知
Observable
1
2
1.9never()
创建一个不发射任何数据和通知的Observable对象。
方法列表:
publicstaticObservablenever()
栗子:
Observable
1
1.10defer()
在订阅的时候才会创建Observable对象;每一次订阅都创建一个新的Observable对象。
方法列表:
publicstaticObservabledefer(Func0>observableFactory)
栗子:
Observableobservable=Observable.defer(newFunc0>(){
@Override
publicObservablecall(){
returnObservable.just("string");
}
});
2重做
2.1repeat()
使Observable对象在发出onNext()通知之后重复发射数据。
重做结束才会发出onComplete()通知,若重做过程中出现异常则会中断并发出onError()通知。
方法列表:
publicfinalObservablerepeat()
publicfinalObservablerepeat(finallongcount)
publicfinalObservablerepeat(Schedulerscheduler)
publicfinalObservablerepeat(finallongcount,Schedulerscheduler)
栗子:
Observableobservable=Observable.just("string");
//无限重复执行
observable.repeat();
//重复执行5次
observable.repeat(5);
2.2repeatWhen()
使Observable对象在发出onNext()通知之后有条件的重复发射数据。
重做结束才会发出onCompleted()通知,若重做过程中出现异常则会中断并发出onError()通知。
方法列表:
publicfinalObservablerepeatWhen(finalFunc1
superObservable
extendsVoid>,?
extendsObservable
>>notificationHandler)
publicfinalObservablerepeatWhen(finalFunc1
superObservable
extendsVoid>,?
extendsObservable>?
>notificationHandler,Schedulerscheduler)
栗子:
observable.repeatWhen(newFunc1extendsVoid>,Observable
>>(){
@Override
publicObservable
>call(Observable
extendsVoid>observable){
//重复3次,每次间隔1s
returnobservable.zipWith(Observable.range(1,3),newFunc2(){
@Override
publicIntegercall(VoidaVoid,Integerinteger){
returninteger;
}
}).flatMap(integer->Observable.timer(1,TimeUnit.SECONDS));
}
});
3重试
3.1retry()
在执行Observable对象的序列出现异常时,不直接发出onError()通知,而是重新订阅该Observable对象,直到重做过程中未出现异常,则会发出onNext()和onCompleted()通知;若重做过程中也出现异常,则会继续重试,直到达到重试次数上限,超出次数后发出最新的onError()通知。
方法列表:
publicfinalObservableretry()
publicfinalObservableretry(finallongcount)
publicfinalObservableretry(Func2predicate)
栗子:
Observableobservable=Observable.create(newObservable.OnSubscribe(){
@Override
publicvoidcall(Subscriber
superInteger>subscriber){
System.out.println(".......");
inta=1/0;
subscriber.onNext(a);
subscriber.onCompleted();
}
});
//无限次的重试
observable.retry();
//重试3次
observable.retry(3);
//使用谓语函数决定是否重试
observable.retry(newFunc2(){
@Override
publicBooleancall(Integerinteger,Throwablethrowable){
//参数integer是订阅的次数;参数throwable是抛出的异常
//返回值为true表示重试,返回值为false表示不重试
returnfalse;
}
});
3.2retryWhen()
作用:
有条件的执行重试。
方法列表:
publicfinalObservableretryWhen(finalFunc1
superObservable
extendsThrowable>,?
extendsObservable
>>notificationHandler)
publicfinalObservableretryWhen(finalFunc1
superObservable
extendsThrowable>,?
extendsObservable
>>notificationHandler,Schedulerscheduler)
栗子:
//重试3次,每次间隔1s
observable.retryWhen(newFunc1extendsThrowable>,Observable
>>(){
@Override
publicObservable
>call(Observable
extendsThrowable>observable){
returnobservable.zipWith(Observable.range(1,3),newFunc2(){
@Override
publicObjectcall(Throwablethrowable,Integerinteger){
returninteger;
}
}).flatMap(newFunc1