上篇文章では、作成型オペレーターの使用について紹介しました。今日は、RxJava における変換型オペレーターの使用について見ていきましょう。一般的な変換型オペレーターは以下の通りです:
- buffer オペレーター
- window オペレーター
- map オペレーター
- groupBy オペレーター
- cast オペレーター
- scan オペレーター
- To オペレーター
buffer オペレーター#
buffer オペレーターはオーバーロードメソッドが多く、ここでは典型的なものを選んで buffer オペレーターの使用を説明します。buffer オペレーターの使用は以下の 3 つのカテゴリに分けられます。具体的には以下の通りです:
//第一類
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
//第二類
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit)
//第三類
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
ObservableSource<? extends TOpening> openingIndicator,
Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
buffer(int count)#
buffer オペレーターは Observable を Observable に変換します。この Observable は元の送信データを収集し、これらのキャッシュデータ集合を送信します。buffer は送信された単一のイベントを要素集合に変換します。以下はこの状況に対する公式の示意図です:
以下のイベントの送信プロセスでは、buffer を設定しない場合は 4 回送信する必要がありますが、以下の buffer を使用して変換すると、2 回の送信で済みます。テストコードは以下の通りです:
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
.buffer(2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "第" + count + "次接収...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "受信したデータ...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
上記のコードの実行結果は以下の通りです:
第1次接収...
accept--->2
受信したデータ...
accept--->2---Event1
accept--->2---Event2
第2次接収...
accept--->2
受信したデータ...
accept--->2---Event3
accept--->2---Event4
buffer(int count, int skip)#
buffer (int count) と比較して、skip は次回のソース Observable から変換される Observable がイベントを収集する位置を指定できます。count が skip と等しい場合、buffer (int count, int skip) は buffer (int count) と等価です。公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、3 つのイベントごとに 1 組で送信されますが、毎回データを収集する位置パラメータ skip は 2 であるため、毎回収集されるデータの中に重複が発生します。テストコードは以下の通りです:
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
.buffer(3, 2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "第" + count + "次接収...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "受信したデータ...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
上記のコードの実行結果は以下の通りです:
第1次接収...
accept--->3
受信したデータ...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
第2次接収...
accept--->3
受信したデータ...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
第3次接収...
accept--->1
受信したデータ...
accept--->1---Event5
buffer(long timespan, TimeUnit unit)#
buffer オペレーターは Observable を新しい Observable に変換します。timespan は新しい Observable がキャッシュデータを発信する時間間隔を決定します。公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、ソース Observable が 2 秒ごとにイベントを送信し、buffer によって新しく生成された Observable は 1 秒ごとにキャッシュされたイベント集合を送信します。もちろん、こうすると間隔の時間帯にデータを収集できず、データが失われることになります。テストコードは以下の通りです:
Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
.buffer(1,TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
上記のコードの実行結果は以下の通りです:
accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]
buffer(long timespan, long timeskip, TimeUnit unit)#
buffer オペレーターは Observable を Observable に変換します。timeskip は新しく生成された Observable が定期的に新しいバッファを開始することを決定し、新しい Observable は timespan の時間間隔内に収集されたイベント集合を発信します。公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、ソース Observable が 1 秒ごとに 1 から 12 までの整数を送信し、buffer によって新しく生成された Observable は 5 秒ごとにソース Observable が送信したイベントを受信します。テストコードは以下の通りです:
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(1,5, TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
上記のコードの実行結果は以下の通りです:
accept--->[1]
accept--->[6]
accept--->[11]
buffer(ObservableSource boundary)#
buffer (boundary) は、boundary という名前の Observable を監視します。この Observable がイベントを発信するたびに、新しい List を作成し、元の Observable から送信されたイベントを収集し、収集したデータを送信します。公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、収集された元のイベントは時間間隔の違いにより、最終的に送信される収集されたイベントの数も異なります。テストコードは以下の通りです:
Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
上記のコードの実行結果は以下の通りです:
accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]
buffer(openingIndicator, closingIndicator)#
buffer (openingIndicator, closingIndicator) は、openingIndicator という名前の Observable を監視します。この Observable がイベントを発信するたびに、List を作成し、元の Observable が送信したデータを収集します。そして収集したデータを closingIndicator に渡します。closingIndicator は Observable を返し、この buffer は closingIndicator が返す Observable を監視し、この Observable のデータを検出すると、List を閉じて、openingIndicator から得たデータを発信します。以下はこの状況に対する公式の示意図です:
以下の時間送信プロセスでは、元の Observable が 1 から 12 の整数を 1 秒ごとに送信し、openingIndicator という Observable が 3 秒ごとに List を作成して送信されたイベントを収集し、closingIndicator に収集したデータを渡します。closingIndicator は 1 秒遅れて openingIndicator から得たデータを送信します。以下はテストコードです:
Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
return closingIndicator;
}
})
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
上記のコードの実行結果は以下の通りです:
accept--->[4, 5]
accept--->[7]
accept--->[10]
window オペレーター#
ここでは window (long count) を例にして window オペレーターの使用を紹介します。window オペレーターの使用は buffer の使用に似ていますが、異なる点は buffer によって変換された Observable が元の Observable の送信イベントのイベント集合を送信するのに対し、window オペレーターによって変換された Observable は元の Observable の送信イベントを count 個ずつ送信します。このオペレーターの公式示意図は以下の通りです:
テストコードは以下の通りです:
Observable.just("Event1", "Event2", "Event3", "Event4")
.window(2)
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
Log.i(TAG, "accept--Observable->");
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
}
});
上記のコードの実行結果は以下の通りです:
accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4
map オペレーター#
map(mapper)#
map オペレーターは送信されたデータの型変換を行います。map オペレーターの公式示意図は以下の通りです:
以下のイベント送信プロセスでは、map オペレーターを通じて、ソース Observable が送信したイベントをさらに加工・変換できます。テストコードは以下の通りです:
Observable.just("Event1", "Event2", "Event3", "Event4")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return "this is " + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
上記のコードの実行結果は以下の通りです:
accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4
flatMap(mapper)#
flatMap オペレーターを使用すると、ソース Observable がイベントを発信する際に、複数のイベントを発信できる Observable に変換されます。これらの Observable は最終的に同じ Observable に統合され、この Observable はこれらのイベントを一括して発信します。ここでは、一般的に使用される flatMap (mapper) を例に、公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、flatMap オペレーターを使用した後、ソース Observable がイベントを送信する際に、対応する Observable が生成され、最終的に送信されるイベントはすべて同じ Observable に統合され、イベント結果が観察者にコールバックされます。テストコードは以下の通りです:
final Observable observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
.flatMap(new Function<String, Observable<String>>() {
@Override
public Observable<String> apply(String s) throws Exception {
return observable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
上記のコードの実行結果は以下の通りです:
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
concatMap(mapper)#
concatMap の使用は flatMap の使用に似ていますが、concatMap はイベント受信の順序を保証できるのに対し、flatMap はイベント受信の順序を保証できません。concatMap オペレーターの公式示意図は以下の通りです:
以下のイベント送信プロセスでは、ソース Observable が整数 1 を送信する際に 3 秒の遅延を設け、その後他のイベントを送信します。テストコードは以下の通りです:
Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
.concatMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
int delay = 0;
if (aLong == 1) {
delay = 3;
}
return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
concatMap オペレーターを使用した上記のコードの実行結果は以下の通りです:
accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7
flatMap オペレーターを使用した上記のコードの実行結果は以下の通りです:
accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7
このように、concatMap は flatMap に比べてイベント受信の順序を保証できます。
switchMap(mapper)#
ソース Observable がイベントを送信する際に、複数のイベントを発信できる Observable に変換されます。switchMap オペレーターは現在の Observable のみを気にします。つまり、ソース Observable が新しいイベントを送信するたびに、前の複数のイベントを発信できる Observable を破棄します。公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、ソース Observable が 2 秒ごとに 1 と 2 を送信し、変換された複数のイベントを発信できる Observable が 1 秒ごとに 4 から始まる整数を送信します。switchMap オペレーターを使用する際、ソース Observable が整数 1 を送信すると、この新しい複数のイベントを発信できる Observable は 2 つの整数、つまり 4 と 5 を送信した後、送信を停止します。なぜなら、その時点でソース Observable が再びイベントを送信し始めるからです。この場合、前の複数のイベントを発信できる Observable は破棄され、次のソース Observable のイベント送信のリスニングが始まります。テストコードは以下の通りです:
Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
.switchMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
Log.i(TAG, "accept-aLong-->" + aLong);
return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
上記のコードの実行結果は以下の通りです:
accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7
また、concatMapDelayError、concatMapEager、concatMapEagerDelayError、concatMapIterable、flatMapIterable、switchMapDelayError など、これらのオペレーターに関連する拡張オペレーターもありますが、ここでは紹介しません。
groupBy オペレーター#
groupBy オペレーターは受信したデータを指定されたルールに従って分類し、その後 GroupedObservable などが出力を購読します。公式の示意図は以下の通りです:
以下のイベント送信プロセスでは、成績に基づいてグループ化して出力します。具体的には以下の通りです:
List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("成績は95点です", 95));
beanList.add(new DataBean("成績は70点です", 70));
beanList.add(new DataBean("成績は56点です", 56));
beanList.add(new DataBean("成績は69点です", 69));
beanList.add(new DataBean("成績は90点です", 90));
beanList.add(new DataBean("成績は46点です", 46));
beanList.add(new DataBean("成績は85点です", 85));
Observable.fromIterable(beanList)
.groupBy(new Function<DataBean, String>() {
@Override
public String apply(DataBean dataBean) throws Exception {
int score = dataBean.getScore();
if (score >= 80) {
return "A";
}
if (score >= 60 && score < 80) {
return "B";
}
if (score < 60) {
return "C";
}
return null;
}
})
.subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
@Override
public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
groupedObservable.subscribe(new Consumer<DataBean>() {
@Override
public void accept(DataBean dataBean) throws Exception {
Log.i(TAG, "accept--->"+ groupedObservable.getKey() + "グループ--->"+dataBean.getDesc());
}
});
}
});
上記のコードの実行結果は以下の通りです:
accept--->Aグループ--->成績は95点です
accept--->Bグループ--->成績は70点です
accept--->Cグループ--->成績は56点です
accept--->Bグループ--->成績は69点です
accept--->Aグループ--->成績は90点です
accept--->Cグループ--->成績は46点です
accept--->Aグループ--->成績は85点です
cast オペレーター#
cast オペレーターは型変換に使用されます。cast オペレーターの公式示意図は以下の通りです:
テストコードは以下の通りです:
Observable.just(1,2,3,4,5)
.cast(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String String) throws Exception {
Log.i(TAG, "accept--->" + String);
}
});
テストでは以下のような例外が発生します:
java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String
結果からわかるように、異なる型間の変換では型変換例外が発生します。cast オペレーターは異なる型間の変換を行うことはできませんが、送信されたイベントデータの型が指定された型であるかどうかを検証するために使用できます。
scan オペレーター#
scan オペレーターは、順に 2 つの要素をスキャンします。最初の要素には前の要素がない場合、最初の要素の前の要素は無視されます。2 番目の要素をスキャンする際に、最初の要素を取得し、その後 apply メソッドの戻り値が前の要素の値として計算に参加し、最終的に変換された結果を返します。scan の公式示意図は以下の通りです:
以下のイベント送信プロセスを見てみましょう。最初のスキャン時、最初の要素は 1 で、ここでは last に相当します。2 番目の要素は 2 で、ここでは item に相当します。この時 apply メソッドの戻り値は 2 で、この 2 は次回のスキャン計算に参加する last の値となります。次回の戻り値は必ず 2 * 3、つまり 6 になります。テストコードは以下の通りです:
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer last, Integer item) throws Exception {
Log.i(TAG, "accept--last->" + last);
Log.i(TAG, "accept--item->" + item);
return last * item;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept--->" + integer);
}
});
上記のコードの実行結果は以下の通りです:
accept--->1
accept--->2
accept--->6
accept--->24
accept--->120
To オペレーター#
toList()#
toList オペレーターは送信された一連のデータを List に変換し、一度に送信します。toList の公式示意図は以下の通りです:
テストコードは以下の通りです:
Observable.just(1, 2, 3, 4)
.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG, "accept--->" + integers);
}
});
上記のコードの実行結果は以下の通りです:
accept--->[1, 2, 3, 4]
toMap(keySelector)#
toMap オペレーターは送信されるイベントを指定されたルールに従って Map 形式に変換し、一度に送信します。toMap オペレーターの公式示意図は以下の通りです:
テストコードは以下の通りです:
Observable.just(1, 2, 3, 4)
.toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key"+integer;
}
})
.subscribe(new Consumer<Map<String, Integer>>() {
@Override
public void accept(Map<String, Integer> map) throws Exception {
Log.i(TAG, "accept--->" + map);
}
});
上記のコードの実行結果は以下の通りです:
accept--->{key2=2, key4=4, key1=1, key3=3}
RxJava における変換型オペレーターは基本的に以上です。具体的な使用は実際のニーズに応じて行う必要があります。