RxJava is the open-source Java implementation of ReactiveX, a library for composing asynchronous and event-based programs using observable sequences. That is how the official website introduces it; the key ideas are asynchronous programming, chained calls, and event sequences.
- Introduction to RxJava
- Concepts
- Basic Implementation
- Just Operator
- From Operator
- Defer Operator
- Empty Operator
- Never Operator
- Timer Operator
- Interval Operator
- Range Operator
- Summary
Introduction to RxJava#
To use RxJava2 on Android, add the following Gradle dependencies:
implementation 'io.reactivex.rxjava2:rxjava:2.2.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
Concepts#
The most important concepts in RxJava are the Observer, the Observable, and the event sequence. The event sequence is controlled entirely by the Observable. So how does the Observable notify the Observer? The two must first be connected by a subscription. Once the subscription is established, the Observer receives each event as soon as the Observable emits it.
In RxJava2, the Observer has four callback methods:
- onSubscribe: Called once when the subscription is established; the Disposable it receives can be used to cancel the subscription
- onNext: Called on the Observer for each event emitted in the sequence
- onError: Called on the Observer when the event sequence fails; the Observer receives no further events afterwards
- onComplete: Called on the Observer when the event sequence has completed normally; the Observer receives no further events afterwards
Note:
- onError and onComplete both terminate the event sequence. The Observable side can still call onNext on the emitter afterwards, but those calls are ignored and the Observer receives nothing more;
- onError and onComplete are mutually exclusive; the Observer receives at most one of them. With Observable.create, an onError called after onComplete (or a second onError) cannot be delivered to the Observer, so the exception is handed to RxJavaPlugins.onError, which typically crashes the app unless a global error handler is installed. An onComplete called after onError is simply ignored;
- onSubscribe is always called first, as soon as the subscription is established. Whether and when onNext, onError, and onComplete are called is decided entirely by the Observable, which is easy to misunderstand.
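The rules in the notes above can be modeled in a few lines of plain Java. This is only a toy sketch, not RxJava's implementation: ToyEmitter, its fields, and the string labels are hypothetical names made up for illustration. It records what an Observer would receive and sets aside errors that arrive after the sequence has already terminated.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the emitter rules described above; NOT RxJava's actual implementation.
class TerminalRuleSketch {

    // Hypothetical stand-in for an emitter.
    static final class ToyEmitter {
        final List<String> received = new ArrayList<>();      // what the Observer would see
        final List<String> undeliverable = new ArrayList<>(); // errors that arrived too late
        private boolean terminated = false;

        void onNext(String item) {
            if (terminated) return;            // events after a terminal event are dropped
            received.add("onNext:" + item);
        }

        void onComplete() {
            if (terminated) return;            // onComplete after onError is ignored
            terminated = true;
            received.add("onComplete");
        }

        void onError(Throwable t) {
            if (terminated) {                  // too late: the Observer never sees this error
                undeliverable.add(t.getMessage());
                return;
            }
            terminated = true;
            received.add("onError:" + t.getMessage());
        }
    }

    public static void main(String[] args) {
        ToyEmitter emitter = new ToyEmitter();
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");                           // dropped
        emitter.onError(new RuntimeException("too late"));  // set aside, not delivered
        System.out.println("received      ---> " + emitter.received);
        System.out.println("undeliverable ---> " + emitter.undeliverable);
    }
}
```

In real RxJava2 code the late error is not stored in a list; it is reported through RxJavaPlugins.onError, which is why it tends to surface as a crash.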
Basic Implementation#
- Create an Observer, which decides how to handle events when they occur, as shown below:
// Observer
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// Called once the subscription is established; d can be used to cancel it
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
// Called when the Observer receives an event
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
// Called when an error occurs in the event sequence
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
// Called when the event sequence is complete
Log.i(TAG, "onComplete--->");
}
};
- Create an Observable, which decides when to trigger events and what kind of events to trigger, as shown below:
// Observable
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3"); // Ignored: the sequence has already completed
}
});
- Establish a subscription relationship between the Observer and the Observable, as shown below:
// Establish subscription relationship
observable.subscribe(observer);
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
Clearly, since the onComplete method is called after sending Event2, the subsequent event Event3 will not be received by the Observer.
The above code can also be written like this, with the same result, as shown below:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
In the above code, the create method of Observable is used to create the Observable and send related events. To help understand, here is the official diagram regarding the create operator:
Observable also provides many static methods to create Observables, which will be introduced in the following sections.
Just Operator#
just creates an Observable that emits the given items in order and then completes; it accepts at most 10 items. Compared to create, it is more concise. The overloaded methods of just are as follows:
public static <T> Observable<T> just(T item)
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
Here is a simple usage of the just operator:
// Simple usage of just operator
Observable.just("Event1", "Event2", "Event3")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
Let's take a look at the official diagram regarding the just operator. Below is a diagram showing just emitting four events:
From Operator#
The from-related operators create an Observable from an array, a collection (Iterable), or an asynchronous task (Future), emitting the elements or the task result. They fall into the following categories:
// Array
public static <T> Observable<T> fromArray(T... items)
// Collection
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
// Asynchronous task
public static <T> Observable<T> fromFuture(Future<? extends T> future)
// Asynchronous task + timeout
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
// Asynchronous task + timeout + thread scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
// Asynchronous task + thread scheduler
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
// Publisher in Reactive Streams, similar to create operator, event emission is determined by the publisher (Observable)
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
fromArray/fromIterable#
Here is the usage of fromArray:
// Simple usage of fromArray operator
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
Let's take a look at the official diagram for fromArray:
Here is the usage of fromIterable:
// Simple usage of fromIterable operator
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
Let's take a look at the official diagram for fromIterable:
The output of the above code is as follows:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
fromCallable#
Callable is located in the java.util.concurrent package; it is similar to Runnable but returns a value. fromCallable does not switch threads by itself: call runs, and the value is emitted, on the thread that subscribes, which is the main thread in the example below. If the Observable is never subscribed to, the code inside call is never executed. When using fromCallable, pay attention to the following points:
- For time-consuming tasks, use subscribeOn to switch the subscription thread;
- To receive the emitted values from the Observable for time-consuming tasks, use observeOn to switch to the Main thread;
- To avoid memory leaks and other issues, cancel the subscription in the corresponding onDestroy method.
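Why the subscription thread matters can be seen with Callable alone, without RxJava. In the sketch below, a single-thread executor stands in for a scheduler; the class name, method names, and the thread name "worker-1" are made up for illustration. The same Callable runs on whichever thread invokes call().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallableThreadSketch {
    // The Callable reports the name of the thread that actually runs call().
    static final Callable<String> WHO_RUNS_ME = () -> Thread.currentThread().getName();

    // Invoking call() directly runs the body on the caller's thread.
    static String runOnCallerThread() {
        try {
            return WHO_RUNS_ME.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Handing the Callable to an executor runs the body on a worker thread instead.
    static String runOnWorkerThread() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
        try {
            return executor.submit(WHO_RUNS_ME).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct call  ---> " + runOnCallerThread());
        System.out.println("via executor ---> " + runOnWorkerThread());
    }
}
```

In RxJava terms, subscribeOn plays the role of the executor here: it decides which thread ends up invoking call().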
Here is a simple usage of fromCallable:
// Simple usage of fromCallable operator
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
// Other operations...
return "call";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result is as follows:
onSubscribe--->
onNext--->call
onComplete--->
Let's take a look at the official diagram for fromCallable:
fromFuture#
As mentioned above, fromFuture has four overloads, whose parameters specify the asynchronous task, a timeout, and a thread scheduler. First, the Future interface: it is located in the java.util.concurrent package and is used to check whether a Runnable or Callable asynchronous task has finished, to obtain the task's result, and to cancel the task. fromFuture itself does not start the task, and unless a Scheduler is passed it does not switch threads either: on subscription it blocks the subscribing thread in Future.get() until the result is available and then emits it on that thread. For a time-consuming task, remember to use subscribeOn so that the main thread is not blocked. The example below uses FutureTask to illustrate fromFuture.
Create a Callable to execute an asynchronous task, as shown below:
// Asynchronous task
private class MCallable implements Callable<String> {
@Override
public String call() throws Exception {
Log.i(TAG, "Task execution started--->");
Thread.sleep(5000);
Log.i(TAG, "Task execution ended--->");
return "MCallable";
}
}
Then, create a FutureTask, as shown below:
// Create FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
Next, use a Thread to execute the created FutureTask, as shown below:
// Execute FutureTask
new Thread(mFutureTask).start();
Finally, use fromFuture to create the corresponding Observable and subscribe, as shown below:
// fromFuture
Observable.fromFuture(mFutureTask)
.subscribeOn(Schedulers.io()) // Switch subscription thread
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The expected output of the above code is as follows:
Task execution started--->
onSubscribe--->
Task execution ended--->
onNext--->MCallable
onComplete--->
Let's take a look at the official diagram for fromFuture. The following diagram shows the fromFuture method carrying a parameter Future:
The asynchronous task takes 5 seconds to finish. An overload of fromFuture lets us specify a timeout, here 4 seconds, as shown below:
// Specify timeout of 4s
Observable.fromFuture(mFutureTask, 4, TimeUnit.SECONDS, Schedulers.io())
//...
Since the asynchronous task cannot complete within 4 seconds, the Observer's onError method is called with a TimeoutException, while the task itself keeps running to the end. The execution result is as follows:
Task execution started--->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
Task execution ended--->
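The timeout behavior comes from Future itself and can be reproduced without RxJava. The following sketch assumes a hypothetical helper awaitWithTimeout and uses milliseconds instead of seconds so that it runs quickly. It waits for a FutureTask with a bounded get and reports a timeout when the task is slower than the limit.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    // Waits at most timeoutMs for a task that takes taskMs.
    // Returns the task result, or "timeout" if the limit is exceeded.
    static String awaitWithTimeout(long taskMs, long timeoutMs) {
        FutureTask<String> task = new FutureTask<String>(() -> {
            Thread.sleep(taskMs);
            return "MCallable";
        });
        Thread worker = new Thread(task);
        worker.setDaemon(true); // do not keep the JVM alive for the sketch
        worker.start();
        try {
            return task.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout"; // the task is not cancelled; it keeps running in the background
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("slow task ---> " + awaitWithTimeout(500, 50));
        System.out.println("fast task ---> " + awaitWithTimeout(10, 2000));
    }
}
```

A timeout only ends the wait; it does not stop the task, which matches the log above where "Task execution ended" still appears after onError.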
So how do we cancel this asynchronous task? This is one of the advantages of Future: a task that has not finished can be canceled, as shown below:
// Cancel asynchronous task
public void cancelTask(View view) {
if (mFutureTask.isDone()) {
Log.i(TAG, "Task has been completed--->");
} else {
Log.i(TAG, "Task is executing--->");
boolean cancel = mFutureTask.cancel(true);
Log.i(TAG, "Task cancellation success--cancel->" + cancel);
Log.i(TAG, "Task cancellation success--isCancelled->" + mFutureTask.isCancelled());
}
}
Here is the result of canceling the task during execution:
Task execution started--->
onSubscribe--->
Task is executing--->
Task cancellation success--cancel->true
Task cancellation success--isCancelled->true
onError--->java.util.concurrent.CancellationException
The running asynchronous task has been canceled, and the Observer is notified through onError with a CancellationException. This part is mostly about Java's Future rather than RxJava itself.
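The CancellationException seen in the log originates from Future.get(). A minimal plain-Java sketch, with a hypothetical class and method name, shows the same sequence: cancel the FutureTask before it finishes, then try to read its result.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureCancelSketch {
    // Cancels a long-running task before it completes and reports what get() does afterwards.
    static String cancelBeforeCompletion() {
        FutureTask<String> task = new FutureTask<String>(() -> {
            Thread.sleep(5000);
            return "MCallable";
        });
        Thread worker = new Thread(task);
        worker.setDaemon(true);
        worker.start();

        // true = interrupt the worker thread if the task is already running
        boolean cancel = task.cancel(true);
        try {
            return task.get();
        } catch (CancellationException e) {
            // get() on a cancelled task throws instead of returning a value
            return "cancel=" + cancel + ", isCancelled=" + task.isCancelled();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelBeforeCompletion());
    }
}
```

Because fromFuture reads the result through get(), the exception thrown there is what reaches the Observer's onError.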
Defer Operator#
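defer creates the Observable lazily: the factory passed to defer is only invoked, and the resulting Observable only emits events, when an Observer subscribes. Here is the usage of the defer operator:
// defer (the variable defer is assumed to be a String field of the enclosing class)
defer = "old";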
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(defer);
}
});
defer = "new";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result of the above code is as follows:
onSubscribe--->
onNext--->new
onComplete--->
Because the Observable is created at subscription time, after defer was set to "new", onNext receives the latest value. The official documentation also provides a marble diagram for the defer operator.
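The effect of deferring can be compared with a plain-Java analogy, using Supplier in place of the Observable factory. This is an illustration only; the class, field, and method names are hypothetical. Reading the field immediately captures the old value, while reading it through a Supplier picks up whatever the field holds when get() is finally called.

```java
import java.util.function.Supplier;

class DeferAnalogySketch {
    static String value = "old";

    // Returns { eagerly captured value, value read through the deferred supplier }.
    static String[] eagerVersusDeferred() {
        value = "old";
        String eager = value;                    // read now, like building just(value) up front
        Supplier<String> deferred = () -> value; // read later, like the factory passed to defer
        value = "new";
        return new String[] { eager, deferred.get() };
    }

    public static void main(String[] args) {
        String[] result = eagerVersusDeferred();
        System.out.println("eager    ---> " + result[0]);
        System.out.println("deferred ---> " + result[1]);
    }
}
```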
Empty Operator#
The empty operator creates an Observable that emits no data and terminates normally, as shown below:
// empty
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->" + o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The output of the above code is as follows:
onSubscribe--->
onComplete--->
To facilitate understanding of the empty operator's usage, let's look at some official diagrams of the empty operator:
Never Operator#
The never operator creates an Observable that emits no data and never terminates, as shown below:
// never
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->" + o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The output of the above code is as follows:
onSubscribe--->
To facilitate understanding of the never operator's usage, let's look at some official diagrams of the never operator:
Timer Operator#
The timer operator creates an Observable that, after the given delay, emits a single value 0L and then completes. A thread scheduler can be specified; without one, timer runs on the computation scheduler. The overloaded methods of timer are as follows:
// Delay
public static Observable<Long> timer(long delay, TimeUnit unit)
// Delay + thread scheduler
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
Here is the usage of the timer operator:
// timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long s) {
Log.i(TAG, "onNext--->" + s);
Log.i(TAG, "Current thread--->" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result of the above code is as follows:
onSubscribe--->
// Data received after a delay of 3 seconds
onNext--->0
Current thread--->RxCachedThreadScheduler-1
onComplete--->
To facilitate understanding of the timer operator's usage, let's look at some official diagrams of the timer operator. Below is an example of the timer specifying a delay and thread scheduler:
Interval Operator#
The interval operator creates an Observable that emits an increasing sequence of Long values, starting at 0, at a fixed time interval. The initial delay, the period, and the thread scheduler can be specified; without a scheduler, interval runs on the computation scheduler. The overloaded methods of interval are as follows:
// Initial delay + time interval
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
// Initial delay + time interval + thread scheduler
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
// Time interval
public static Observable<Long> interval(long period, TimeUnit unit)
// Time interval + thread scheduler
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
Here is the usage of the interval operator:
// interval
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "onNext--->" + aLong);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The above code emits an increasing value every 3 seconds, with the first value arriving 3 seconds after subscription, and continues until the subscription is disposed. The execution result is as follows:
onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
To facilitate understanding of the interval operator's usage, let's look at some official diagrams of the interval operator. Below is an example of the interval specifying interval time and time unit:
Range Operator#
The range operator creates an Observable that emits a sequence of consecutive integer values. There are two related methods, which differ only in the value type (int or long). In both, the first parameter is the first value to emit and the second is the number of values, not the end value. The method declarations are as follows:
// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)
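Since the second parameter is a count rather than an end value, it can help to list the values a given call would produce. The sketch below uses IntStream from the JDK as a stand-in for the Observable; the class and method names are hypothetical. It produces count consecutive ints beginning at start.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class RangeSemanticsSketch {
    // The values a range(start, count) call would produce:
    // count consecutive ints beginning at start.
    static int[] valuesOf(int start, int count) {
        return IntStream.range(start, start + count).toArray();
    }

    public static void main(String[] args) {
        System.out.println("range(1, 5) ---> " + Arrays.toString(valuesOf(1, 5)));
        System.out.println("range(3, 4) ---> " + Arrays.toString(valuesOf(3, 4)));
    }
}
```

Here range(3, 4) yields 3, 4, 5, 6: four values starting at 3, not the values from 3 to 4.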
Here is the usage of the range operator:
// range
Observable.range(1, 5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext--->" + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
The execution result of the above code is as follows:
onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
To facilitate understanding of the range operator's usage, let's look at some official diagrams of the range operator:
Summary#
This article introduced the basic concepts of RxJava2 and the usage of its creation operators.