The previous article introduced creation operators. This article looks at the transformation operators in RxJava. The commonly used transformation operators are as follows:
- buffer operator
- window operator
- map operator
- groupBy operator
- cast operator
- scan operator
- To operator
buffer operator#
The buffer operator has many overloaded methods. Here, we will select a few typical ones to illustrate the use of the buffer operator. The use of the buffer operator can be divided into the following three categories:
// First category
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
// Second category
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit)
// Third category
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
ObservableSource<? extends TOpening> openingIndicator,
Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
buffer(int count)#
The buffer operator transforms an Observable into another Observable that collects the items emitted by the source and emits them as lists, so individual events reach the observer in batches rather than one at a time. Below is the official diagram for this situation:
In the example below, the observer would be called four times without buffer. With buffer(2), it is called only twice, each time with a list of two events. The test code is as follows:
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
.buffer(2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "Received for the " + count + " time...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "Received data...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
The execution result of the above code is as follows:
Received for the 1 time...
accept--->2
Received data...
accept--->2---Event1
accept--->2---Event2
Received for the 2 time...
accept--->2
Received data...
accept--->2---Event3
accept--->2---Event4
buffer(int count, int skip)#
Compared to buffer(int count), the skip parameter specifies how many source items to advance before a new buffer is started. If skip is less than count, consecutive buffers overlap; if skip is greater than count, the items between two buffers are dropped; and if count equals skip, buffer(int count, int skip) is equivalent to buffer(int count). The official diagram is as follows:
In the example below, events are collected in groups of 3, but a new buffer starts every 2 events, so the last event of each buffer is repeated as the first event of the next one. The test code is as follows:
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
.buffer(3, 2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "Received for the " + count + " time...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "Received data...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
The execution result of the above code is as follows:
Received for the 1 time...
accept--->3
Received data...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
Received for the 2 time...
accept--->3
Received data...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
Received for the 3 time...
accept--->1
Received data...
accept--->1---Event5
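The count and skip rules can be checked without RxJava. The following is a minimal plain-Java sketch that models buffer(count, skip) over an already finished list. The class name CountBufferModel is hypothetical, and the code illustrates the semantics only; it is not how RxJava implements the operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CountBufferModel {
    // Models buffer(count, skip) over a finished sequence: a new buffer starts
    // every `skip` items and holds at most `count` items.
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        List<List<T>> buffers = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            buffers.add(new ArrayList<>(source.subList(start, end)));
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("Event1", "Event2", "Event3", "Event4", "Event5");
        System.out.println(buffer(events, 3, 2)); // skip < count: buffers overlap
        System.out.println(buffer(events, 2, 2)); // skip == count: same as buffer(2)
        System.out.println(buffer(events, 2, 3)); // skip > count: Event3 is dropped
    }
}
```

With count 3 and skip 2 the model yields the same three lists as the log above.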
buffer(long timespan, TimeUnit unit)#
The buffer operator transforms an Observable into a new Observable that emits, once per timespan, a list of the items the source emitted during that interval. The official diagram is as follows:
In the example below, the source Observable emits an event every 2 seconds, while the Observable produced by buffer emits a list every 1 second. No data is lost: an interval in which the source emitted nothing simply produces an empty list. The test code is as follows:
Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
.buffer(1,TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The first lines of the output of the above code are as follows:
accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]
buffer(long timespan, long timeskip, TimeUnit unit)#
The buffer operator transforms an Observable into another Observable in which timeskip determines how often a new buffer is started, and timespan determines how long each buffer collects items before it is emitted. If timeskip is greater than timespan, the items emitted between two buffers are dropped. The official diagram is as follows:
In the example below, the source Observable emits the integers 1 to 12, one per second, while buffer starts a new buffer every 5 seconds and collects for only 1 second each time, so only one item out of every five is received. The test code is as follows:
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(1,5, TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[1]
accept--->[6]
accept--->[11]
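The two time-based variants can be sketched with a small plain-Java model. It is an idealized illustration under stated assumptions: times are whole seconds, each buffer covers the half-open interval from its opening time to opening time plus timespan, and buffer(timespan, unit) is treated as the case where timeskip equals timespan. The class and method names are hypothetical. A real run depends on the scheduler, so items that fall exactly on a buffer edge may land on either side.

```java
import java.util.ArrayList;
import java.util.List;

final class TimedBufferModel {
    // A buffer opens every `timeskip` seconds and collects the items whose
    // emission time falls in [open, open + timespan). Item i (1-based) is
    // emitted at emitTimes[i - 1]; the array must be sorted and non-empty.
    static List<List<Integer>> buffer(long[] emitTimes, long timespan, long timeskip) {
        List<List<Integer>> buffers = new ArrayList<>();
        long lastEmission = emitTimes[emitTimes.length - 1];
        for (long open = 0; open <= lastEmission; open += timeskip) {
            List<Integer> buffer = new ArrayList<>();
            for (int i = 0; i < emitTimes.length; i++) {
                if (emitTimes[i] >= open && emitTimes[i] < open + timespan) {
                    buffer.add(i + 1);
                }
            }
            buffers.add(buffer);
        }
        return buffers;
    }

    // Emission times of a source that emits `count` items, one every `period` seconds.
    static long[] everySeconds(int count, long period) {
        long[] times = new long[count];
        for (int i = 0; i < count; i++) {
            times[i] = i * period;
        }
        return times;
    }

    public static void main(String[] args) {
        // Source every 2 s, buffers of 1 s back to back: empty lists, but no loss.
        System.out.println(buffer(everySeconds(8, 2), 1, 1));
        // Source every 1 s, 1 s buffers opened every 5 s: most items are dropped.
        System.out.println(buffer(everySeconds(12, 1), 1, 5));
    }
}
```

The second call reproduces the three single-item lists shown above, and the first shows that every source item appears in exactly one list.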
buffer(ObservableSource boundary)#
buffer(boundary) monitors an Observable named boundary. Whenever boundary emits an event, buffer emits the list of items collected from the original Observable so far and creates a new List for the items that follow. The official diagram is as follows:
In the example below, the source emits every 2 seconds and the boundary fires every 3 seconds, so each emitted list contains either one or two items. The test code is as follows:
Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]
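The boundary behaviour can also be modelled in plain Java. This is a hedged sketch with hypothetical names: it assumes whole-second timestamps and that a boundary firing at the same instant as an item is processed before that item, which a real scheduler does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundaryBufferModel {
    // Item i (1-based) is emitted at itemTimes[i - 1]; both arrays are sorted.
    // Each boundary emits the current buffer and opens the next one. The last
    // buffer is emitted when the source completes.
    static List<List<Integer>> buffer(long[] itemTimes, long[] boundaryTimes) {
        List<List<Integer>> buffers = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int next = 0; // index of the next boundary that has not fired yet
        for (int i = 0; i < itemTimes.length; i++) {
            // Fire every boundary that occurs at or before this item.
            while (next < boundaryTimes.length && boundaryTimes[next] <= itemTimes[i]) {
                buffers.add(current);
                current = new ArrayList<>();
                next++;
            }
            current.add(i + 1);
        }
        buffers.add(current); // source completed: flush what is left
        return buffers;
    }

    public static void main(String[] args) {
        long[] items = {0, 2, 4, 6, 8, 10, 12, 14, 16, 18}; // ten items, every 2 s
        long[] boundaries = {3, 6, 9, 12, 15, 18};          // boundary every 3 s
        System.out.println(buffer(items, boundaries));
    }
}
```

Under these assumptions the model produces the same seven lists as the log above.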
buffer(openingIndicator, closingIndicator)#
buffer(openingIndicator, closingIndicator) monitors an Observable named openingIndicator. Each time it emits an event, buffer creates a new List to collect items from the original Observable and passes the emitted value to the closingIndicator function. That function returns an Observable, which buffer also monitors. When the returned Observable emits, buffer closes the List and emits the items collected since it was opened. Below is the official diagram for this situation:
In the example below, the original Observable emits the integers 1 to 12 at 1-second intervals. openingIndicator opens a new List every 3 seconds, and the Observable returned by closingIndicator closes it 1 second later, so each emitted list holds only the items that arrived during that second. The test code is as follows:
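// Typed declarations avoid the unchecked warnings that raw Observable types cause.
Observable<Long> openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
final Observable<Long> closingIndicator = Observable.timer(1, TimeUnit.SECONDS);
Observable.intervalRange(1, 12, 0, 1, TimeUnit.SECONDS)
.buffer(openingIndicator, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long opening) throws Exception {
// Each opened buffer is closed when this Observable emits, 1 second later.
return closingIndicator;
}
})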
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
The execution result of the above code is as follows:
accept--->[4, 5]
accept--->[7]
accept--->[10]
window operator#
Here, we use window(long count) as an example to introduce the window operator. The window operator is similar to the buffer operator. The difference is that buffer emits lists of the items sent by the source Observable, while window emits Observables, each of which emits up to count items from the source in order. The official diagram for this operator is as follows:
The test code is as follows:
Observable.just("Event1", "Event2", "Event3", "Event4")
.window(2)
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
Log.i(TAG, "accept--Observable->");
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
}
});
The execution result of the above code is as follows:
accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4
map operator#
map(mapper)#
The map operator applies a function to each emitted item and emits the result, which may be of a different type than the original item. The official diagram for the map operation is as follows:
In the example below, map prepends a prefix to each event emitted by the source Observable. The test code is as follows:
Observable.just("Event1", "Event2", "Event3", "Event4")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return "this is " + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
The execution result of the above code is as follows:
accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4
flatMap(mapper)#
The flatMap operator transforms each event emitted by the source Observable into an Observable that can itself emit multiple events, and then merges these Observables into a single Observable that emits all of their events. Here, we take the commonly used flatMap(mapper) as an example; its official diagram is as follows:
In the example below, each of the four source events is mapped to the same Observable, which emits Event5 and Event6, so the observer receives that pair four times. The test code is as follows:
final Observable<String> observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
.flatMap(new Function<String, Observable<String>>() {
@Override
public Observable<String> apply(String s) throws Exception {
return observable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
The execution result of the above code is as follows:
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
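For readers who know java.util.stream, the same two transformations can be written with streams. This is only an analogy with hypothetical class and method names: a Stream is synchronous and its flatMap always keeps the source order, whereas RxJava's flatMap may interleave events when the generated Observables emit asynchronously.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class MapFlatMapAnalogy {
    // One output element per input element, like map(mapper).
    static List<String> mapped(List<String> source) {
        return source.stream()
                .map(s -> "this is " + s)
                .collect(Collectors.toList());
    }

    // Each input element is replaced by a whole sequence, and the sequences
    // are flattened into one, like flatMap(mapper) with synchronous inners.
    static List<String> flatMapped(List<String> source) {
        return source.stream()
                .flatMap(s -> Stream.of("Event5", "Event6"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("Event1", "Event2", "Event3", "Event4");
        System.out.println(mapped(events));
        System.out.println(flatMapped(events));
    }
}
```

Four input events give four mapped strings in the first case and eight flattened events in the second, matching the two logs above.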
concatMap(mapper)#
concatMap is similar to flatMap, but it subscribes to the generated Observables one at a time, so their events are received in the order of the source events. flatMap subscribes to each generated Observable as soon as it is created, so their events may be interleaved. The official diagram for the concatMap operator is as follows:
In the example below, the Observable generated for the integer 1 starts emitting after a 3-second delay, while the one generated for the integer 2 starts immediately. The test code is as follows:
Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
.concatMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
int delay = 0;
if (aLong == 1) {
delay = 3;
}
return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
The execution result of the above code using concatMap is as follows:
accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7
If concatMap is replaced with flatMap in the above code, the execution result is as follows:
accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7
As can be seen, concatMap preserves the order of the source events, while flatMap interleaves the events of the two generated Observables.
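The difference in ordering can be reproduced with a small timing model in plain Java. This is a sketch under stated assumptions, with hypothetical names: each generated sequence is described by the time of its first value, and 100 ms of artificial jitter is added to the delayed sequence so that no two values share a timestamp. In a real run, the order of simultaneous values is decided by the scheduler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class OrderingModel {
    // One emission of a generated sequence: when it happens and what it carries.
    static final class Timed {
        final long timeMs;
        final long value;

        Timed(long timeMs, long value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    // Generated sequence 4, 5, 6, 7 at one-second intervals, first value at startMs.
    static List<Timed> inner(long startMs) {
        List<Timed> events = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            events.add(new Timed(startMs + i * 1000L, 4 + i));
        }
        return events;
    }

    // flatMap: all sequences run at once, so values arrive in time order.
    static List<Long> flatMap(List<List<Timed>> inners) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> sequence : inners) {
            all.addAll(sequence);
        }
        all.sort(Comparator.comparingLong((Timed t) -> t.timeMs));
        return values(all);
    }

    // concatMap: a sequence starts only after the previous one has completed,
    // so values arrive sequence by sequence regardless of the original timing.
    static List<Long> concatMap(List<List<Timed>> inners) {
        List<Timed> all = new ArrayList<>();
        for (List<Timed> sequence : inners) {
            all.addAll(sequence);
        }
        return values(all);
    }

    private static List<Long> values(List<Timed> events) {
        List<Long> out = new ArrayList<>();
        for (Timed t : events) {
            out.add(t.value);
        }
        return out;
    }

    public static void main(String[] args) {
        // Sequence for source item 1 starts after about 3 s (plus jitter),
        // sequence for source item 2 starts at 1 s.
        List<List<Timed>> inners = Arrays.asList(inner(3100), inner(1000));
        System.out.println("concatMap: " + concatMap(inners));
        System.out.println("flatMap:   " + flatMap(inners));
    }
}
```

With these timings the model gives the same two orderings as the logs above.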
switchMap(mapper)#
The switchMap operator also transforms each event emitted by the source Observable into an Observable that can emit multiple events, but it only listens to the most recent one. Whenever the source Observable emits a new event, switchMap unsubscribes from the previously generated Observable and discards the rest of its events. The official diagram is as follows:
In the example below, the source Observable emits 1 and 2 at a 2-second interval, and each generated Observable emits four integers starting from 4, one per second. After the source emits 1, the generated Observable only manages to send 4 and 5 before the source emits 2. At that point the first generated Observable is discarded, and the one generated for 2 runs to completion. The test code is as follows:
Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
.switchMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
Log.i(TAG, "accept-aLong-->" + aLong);
return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
The execution result of the above code is as follows:
accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7
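The discard rule can be expressed as a short plain-Java model. It is a hedged sketch with hypothetical names, assuming whole-second timestamps and that a generated value scheduled at the same instant as the next source event is dropped. A real scheduler may resolve that tie either way.

```java
import java.util.ArrayList;
import java.util.List;

final class SwitchMapModel {
    // sourceTimes[i] is when the source emits its i-th item (seconds, sorted).
    // Every source item starts a generated sequence 4, 5, 6, 7 with one value
    // per second. A generated value survives only if it is emitted before the
    // next source item arrives.
    static List<Long> switchMap(long[] sourceTimes) {
        List<Long> received = new ArrayList<>();
        for (int i = 0; i < sourceTimes.length; i++) {
            long cutoff = (i + 1 < sourceTimes.length) ? sourceTimes[i + 1] : Long.MAX_VALUE;
            for (int k = 0; k < 4; k++) {
                long emittedAt = sourceTimes[i] + k;
                if (emittedAt < cutoff) {
                    received.add(4L + k);
                }
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(switchMap(new long[]{0, 2}));  // second item cuts the first sequence short
        System.out.println(switchMap(new long[]{0, 10})); // enough time: both sequences complete
    }
}
```

With source events at 0 and 2 seconds, the model keeps 4 and 5 from the first sequence and all four values from the second, as in the log above.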
In addition, there are related operators such as concatMapDelayError, concatMapEager, concatMapEagerDelayError, concatMapIterable, flatMapIterable, and switchMapDelayError. They are variations of the operators above and are not covered here.
groupBy operator#
The groupBy operator classifies the emitted items by a key computed for each item and emits one GroupedObservable per key, which in turn emits the items that belong to that group. The official diagram is as follows:
In the example below, the items are grouped by score into the grades A, B, and C:
List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("Score is 95", 95));
beanList.add(new DataBean("Score is 70", 70));
beanList.add(new DataBean("Score is 56", 56));
beanList.add(new DataBean("Score is 69", 69));
beanList.add(new DataBean("Score is 90", 90));
beanList.add(new DataBean("Score is 46", 46));
beanList.add(new DataBean("Score is 85", 85));
Observable.fromIterable(beanList)
.groupBy(new Function<DataBean, String>() {
@Override
public String apply(DataBean dataBean) throws Exception {
int score = dataBean.getScore();
if (score >= 80) {
return "A";
}
if (score >= 60) {
return "B";
}
// Every score below 60 falls into group C, so a null key is never returned.
return "C";
}
})
.subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
@Override
public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
groupedObservable.subscribe(new Consumer<DataBean>() {
@Override
public void accept(DataBean dataBean) throws Exception {
Log.i(TAG, "accept--->"+ groupedObservable.getKey() + " group--->"+dataBean.getDesc());
}
});
}
});
The execution result of the above code is as follows:
accept--->A group--->Score is 95
accept--->B group--->Score is 70
accept--->C group--->Score is 56
accept--->B group--->Score is 69
accept--->A group--->Score is 90
accept--->C group--->Score is 46
accept--->A group--->Score is 85
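The grouping rule itself can be tried out with the JDK's Collectors.groupingBy. This is an analogy rather than RxJava code, and the class name GroupByAnalogy is hypothetical. The collector builds the complete map at the end, whereas groupBy delivers each item to its group as soon as it is emitted, which is why the log above is interleaved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class GroupByAnalogy {
    // Same grading rule as the key selector in the test code.
    static String grade(int score) {
        if (score >= 80) {
            return "A";
        }
        if (score >= 60) {
            return "B";
        }
        return "C";
    }

    // Groups the scores by grade; a TreeMap keeps the keys sorted for printing.
    static Map<String, List<Integer>> group(List<Integer> scores) {
        return scores.stream()
                .collect(Collectors.groupingBy(GroupByAnalogy::grade, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(group(Arrays.asList(95, 70, 56, 69, 90, 46, 85)));
    }
}
```

Within each group the scores keep their original order, just as each GroupedObservable emits its items in source order.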
cast operator#
The cast operator is used for type conversion. The official diagram for the cast operator is as follows:
The test code is as follows:
Observable.just(1,2,3,4,5)
.cast(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
The test will produce the following exception:
java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String
From the result, it can be seen that cast does not convert values. It only casts each item to the specified class, so an item that is not an instance of that class (here an Integer cast to String) causes a ClassCastException. The cast operator is therefore suited to changing the declared type of the items, for example from Integer to Number, or to verifying that the emitted items are of the specified type.
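The exception message above has the form produced by java.lang.Class.cast. Assuming the operator delegates to that method, which the message suggests but the article does not confirm, the rule can be checked in plain Java. The class name CastCheck is hypothetical.

```java
final class CastCheck {
    // Returns true when `value` can be cast to `target`. Class.cast throws
    // ClassCastException when the value is not an instance of the target class.
    static boolean castSucceeds(Object value, Class<?> target) {
        try {
            target.cast(value);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(castSucceeds(1, Number.class)); // Integer is a Number
        System.out.println(castSucceeds(1, String.class)); // Integer is not a String
    }
}
```

A cast only changes the static type through which an object is viewed. To actually turn an Integer into a String, use map with a conversion such as String.valueOf.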
scan operator#
The scan operator applies an accumulator function to the items in sequence. The first item has no previous value, so it is emitted as is without calling the function. For each later item, the apply method receives the previous result and the current item, and its return value is both emitted and used as the previous result for the next call. The official diagram for scan is as follows:
In the example below, the first item, 1, is emitted directly. In the first call to apply, last is 1 and item is 2, so the return value is 2, which becomes last for the next call. The next return value is therefore 2 * 3, which is 6, and so on. The test code is as follows:
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer last, Integer item) throws Exception {
Log.i(TAG, "accept--last->" + last);
Log.i(TAG, "accept--item->" + item);
return last * item;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept--->" + integer);
}
});
The execution result of the above code is as follows:
accept--->1
accept--last->1
accept--item->2
accept--->2
accept--last->2
accept--item->3
accept--->6
accept--last->6
accept--item->4
accept--->24
accept--last->24
accept--item->5
accept--->120
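The accumulation can be modelled as a running fold in plain Java. This is a minimal sketch with a hypothetical class name. It returns all intermediate results as a list, whereas scan emits them one by one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

final class ScanModel {
    // Emits the first item unchanged, then accumulator(previousResult, item)
    // for every following item.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> emitted = new ArrayList<>();
        T last = null;
        boolean first = true;
        for (T item : source) {
            last = first ? item : accumulator.apply(last, item);
            first = false;
            emitted.add(last);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Running product of 1..5.
        System.out.println(scan(Arrays.asList(1, 2, 3, 4, 5), (last, item) -> last * item));
    }
}
```

The running product of 1 to 5 gives the same five values as the accept lines in the log above.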
To operator#
toList()#
The toList operator collects all emitted items into a List and emits that List once, when the source completes. In RxJava 2 the result is a Single<List<T>> rather than an Observable. The official diagram for toList is as follows:
The test code is as follows:
Observable.just(1, 2, 3, 4)
.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG, "accept--->" + integers);
}
});
The execution result of the above code is as follows:
accept--->[1, 2, 3, 4]
toMap(keySelector)#
The toMap operator collects the emitted items into a Map, using keySelector to compute the key for each item, and emits that Map once, when the source completes. The official diagram for the toMap operator is as follows:
The test code is as follows:
Observable.just(1, 2, 3, 4)
.toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key" + integer;
}
})
.subscribe(new Consumer<Map<String, Integer>>() {
@Override
public void accept(Map<String, Integer> map) throws Exception {
Log.i(TAG, "accept--->" + map);
}
});
The execution result of the above code is as follows:
accept--->{key2=2, key4=4, key1=1, key3=3}
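The key selection can be sketched in plain Java as follows. The class name ToMapModel is hypothetical. The model uses a HashMap, which explains the unordered keys in the log above, and lets a repeated key keep the latest item, which is how the RxJava documentation describes toMap.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class ToMapModel {
    // Stores every item under the key computed by keySelector.
    static <T, K> Map<K, T> toMap(List<T> source, Function<T, K> keySelector) {
        Map<K, T> map = new HashMap<>();
        for (T item : source) {
            map.put(keySelector.apply(item), item); // a repeated key keeps the latest item
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(toMap(Arrays.asList(1, 2, 3, 4), i -> "key" + i));
        System.out.println(toMap(Arrays.asList(1, 11), i -> i % 10)); // both items map to key 1
    }
}
```

Because HashMap does not define an iteration order, the printed order of the entries should not be relied on.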
These are the most commonly used transformation operators in RxJava. Which one to use depends on the actual requirements.