banner
jzman

jzman

Coding、思考、自觉。
github

RxJava2の作成オペレーター

RxJava は、ReactiveX の Java におけるオープンソース実装であり、観測可能なシーケンスを使用して非同期プログラミングとイベントベースのプログラムを行うためのライブラリです。公式サイトの紹介では、主に非同期プログラミング、チェーン呼び出し、イベントシーケンスに焦点を当てています。

  1. RxJava の導入
  2. 概念
  3. 基本的な実装
  4. Just オペレーター
  5. from オペレーター
  6. defer オペレーター
  7. empty オペレーター
  8. never オペレーター
  9. timer オペレーター
  10. interval オペレーター
  11. range オペレーター
  12. まとめ

RxJava の導入#

implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

概念#

RxJava におけるいくつかの重要な概念は、オブザーバー(Observer)、オブザーバブル(Observable)、およびイベントシーケンスです。イベントシーケンスは完全にオブザーバブルによって制御されます。オブザーバブルが必要なときにオブザーバーに通知するには、オブザーバーとオブザーバブルの間に購読関係を確立する必要があります。購読関係が確立されると、オブザーバブルが変化したときに、オブザーバーはその変化を即座に受け取ることができます。

RxJava2 におけるオブザーバー(Observer)のイベントコールバックメソッドは 4 つあります:

  • onSubscribe:購読関係を解除するために使用されます
  • onNext:イベントが送信されるときにオブザーバーがこのメソッドをコールバックして送信されたイベントシーケンスを受け取ります
  • onError:イベントが送信されるときにオブザーバーがこのメソッドをコールバックしてイベントシーケンスに異常が発生したことを示し、以降のイベントの送信を許可しません
  • onComplete:イベントが送信されるときにオブザーバーがこのメソッドをコールバックしてイベントシーケンスの送信が完了したことを示し、イベントの送信を許可します

注意

  1. onError が呼び出された後はイベントの送信を続けることはできず、onComplete が呼び出された後はイベントの送信が許可されます。どちらが呼び出されてもオブザーバーはメッセージを受信しません;
  2. onError と onComplete は排他的であり、どちらか一方のみを呼び出すことが許可されます。onComplete の後に onError を呼び出すとプログラムは必ずクラッシュしますが、onError の後に onComplete を呼び出してもクラッシュしないのは、onError の後はイベントの送信が許可されないため、自然にエラーが発生しないからです;
  3. 4 つのコールバックメソッドの中で、オブザーバーとオブザーバブルが一度購読関係を確立すると onSubscribe メソッドがコールバックされ、onNext、onError、onComplete メソッドのコールバックは完全にオブザーバブルによってトリガーされるかどうかが決定されます。ここには誤解を招く可能性があります。

基本的な実装#

  1. オブザーバー(Observer)を作成します。オブザーバーは、イベントが発生したときにどのように処理するかを決定します。具体的には以下の通りです:
//オブザーバー
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //購読解除
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        //イベント送信時にオブザーバーがコールバック
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        //イベント送信時にオブザーバーがコールバック(イベントシーケンスに異常が発生)
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        //イベント送信時にオブザーバーがコールバック(イベントシーケンスの送信が完了)
        Log.i(TAG, "onComplete--->");
    }
};
  1. オブザーバブル(Observable)を作成します。オブザーバブルは、いつイベントをトリガーするか、どのようなイベントをトリガーするかを決定します。具体的には以下の通りです:
//オブザーバブル
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
});
  1. オブザーバーとオブザーバブルの間に購読関係を確立します。具体的には以下の通りです:
//オブザーバーとオブザーバブルの間に購読関係を確立
observable.subscribe(observer);

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->

明らかに、Event2 を送信した後に onComplete メソッドが呼び出されたため、その後送信されたイベント Event3 はオブザーバーには届きません。

上記のコードは次のように書くこともでき、結果は同じです。具体的には以下の通りです:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードでは、Observable の create メソッドを使用して Observable を作成し、それを使用して関連するイベントを送信します。理解を助けるために、公式の create オペレーターの図を見てみましょう:

create

Observable には、Observable を作成するための多くの静的メソッドも提供されています。以下では、これらの一般的なメソッドについて説明します。

Just オペレーター#

just を使用すると、指定されたイベントを送信する Observable を作成できます。just の送信イベントの上限は 10 で、最大で 10 個のイベントを送信できます。create に比べて処理フローをある程度簡素化しています。just のオーバーロードメソッドは以下の通りです:

public static <T> Observable<T> just(T item) 
public static <T> Observable<T> just(T item1, T item2)
public static <T> Observable<T> just(T item1, T item2, T item3)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)

以下は just オペレーターの簡単な使用例です:

//justオペレーターの簡単な使用
Observable.just("Event1", "Event2", "Event3")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

公式の just オペレーターの図を見てみましょう。以下は just が 4 つのイベントを送信する際の図です:

image

from オペレーター#

from に関連するオペレーターを使用すると、配列(array)、コレクション(Iterable)、および非同期タスク(future)を送信する Observable を作成できます。from に関連するオペレーターは以下のように分類できます:

//配列
public static <T> Observable<T> fromArray(T... items)
//コレクション
public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
//非同期タスク
public static <T> Observable<T> fromFuture(Future<? extends T> future)
//非同期タスク+タイムアウト
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit)
//非同期タスク+タイムアウト+スレッドスケジューラー
public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler)
//非同期タスク+スレッドスケジューラー
public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler)
//Reactive Streamsのパブリッシャー、使用方法はcreateオペレーターに似ており、イベントの送信はパブリッシャー(オブザーバブル)によって自ら決定されます
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

fromArray/fromIterable#

以下は fromArray の使用方法です:

//fromArrayオペレーターの簡単な使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

fromArray の公式図を見てみましょう。以下はその図です:

image

以下は fromIterable の使用方法です:

//fromIterableオペレーターの簡単な使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

fromIterable の公式図を見てみましょう。以下はその図です:

image

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->

fromCallable#

Callable は java.util.concurrent パッケージにあり、Runnable に似ていますが、戻り値を持っています。fromCallable から発信されるイベントはメインスレッドから発信されます。購読しない限り、call 内の操作は実行されません。fromCallable を使用する際には以下の点に注意が必要です:

  1. 時間のかかるタスクには subscribeOn を使用して購読スレッドを切り替える必要があります;
  2. 時間のかかるタスクの実行中に Observable から発信された値を受け取るには、observeOn を使用してメインスレッドに切り替える必要があります;
  3. メモリリークなどの問題を避けるために、対応する onDestroy メソッド内で購読をキャンセルします。

以下は fromCallable の簡単な使用例です:

//fromCallableオペレーターの簡単な使用
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        //他の操作...
        return "call";
    }
}).subscribe(new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s + Thread.currentThread());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記の実行結果は以下の通りです:

onSubscribe--->
onNext--->call
onComplete--->

fromCallable の公式図を見てみましょう。以下はその図です:

image

fromFuture#

上記からわかるように、fromFuture には 4 つのオーバーロードメソッドがあり、パラメータに非同期タスク、タスクのタイムアウト、スレッドスケジューラーなどを指定できます。まず、Future インターフェースを理解しましょう。Future インターフェースは java.util.concurrent パッケージにあり、Runnable と Callable の非同期タスクの実行に対するタスクの実行状況の判断、タスク結果の取得、特定のタスクのキャンセルなどを行うためのものです。Runnable と Callable はスレッドの実行に伴うものであり、これは fromFuture から発信されるイベントがメインスレッド以外から発信されることを意味します。時間のかかるタスクを実行する場合は、subscribeOn を使用して購読スレッドを切り替えることを忘れないでください。以下は FutureTask を例にして fromFuture の使用方法を説明します。

非同期タスクを実行するための Callable を作成します。具体的には以下の通りです:

//非同期タスク
private class MCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Log.i(TAG, "タスク実行開始--->");
        Thread.sleep(5000);
        Log.i(TAG, "タスク実行終了--->");
        return "MCallable";
    }
}

次に、FutureTask を作成します。具体的には以下の通りです:

//FutureTaskを作成
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);

次に、Thread を使用して上記で作成した Future を実行します。具体的には以下の通りです:

//FutureTaskを実行
new Thread(mFutureTask).start();

最後に、fromFuture を使用して対応する Observable を作成し、購読します。具体的には以下の通りです:

//fromFuture
Observable.fromFuture(mFutureTask)
        .subscribeOn(Schedulers.io()) //購読スレッドを切り替え
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s + Thread.currentThread());
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->" + e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });

上記のコードの出力結果は以下の通りです:

タスク実行開始--->
onSubscribe--->
タスク実行終了--->
onNext--->MCallable
onComplete--->

fromFuture の公式図を見てみましょう。以下は fromFuture メソッドが Future パラメータを持つ際の図です:

image

上記の非同期タスクは 5 秒の遅延があります。fromFuture のオーバーロードメソッドを使用してタイムアウトを 4 秒に指定した場合、具体的には以下の通りです:

//タイムアウトを4秒に指定
Observable.fromFuture(mFutureTask, 4, TimeUnit.SECONDS, Schedulers.io())
//...

この場合、非同期タスクが 4 秒以内に完了できないため、Observer は onError メソッドがトリガーされ、実行結果は以下の通りです:

タスク実行開始--->
onSubscribe---> 
onError--->java.util.concurrent.TimeoutException
タスク実行終了--->

この非同期タスクをキャンセルするにはどうすればよいでしょうか。これが Future の利点であり、タスクを自由にキャンセルできます。具体的には以下の通りです:

//非同期タスクのキャンセル
public void cancelTask(View view) {
    if (mFutureTask.isDone()) {
        Log.i(TAG, "タスクはすでに完了しています--->");
    } else {
        Log.i(TAG, "タスクは実行中です--->");
        boolean cancel = mFutureTask.cancel(true);
        Log.i(TAG, "タスクキャンセルは成功しましたか--cancel->" + cancel);
        Log.i(TAG, "タスクキャンセルは成功しましたか--isCancelled->" + mFutureTask.isCancelled());
    }
}

以下はタスク実行中にタスクをキャンセルした際の実行結果です:

タスク実行開始--->
onSubscribe--->
タスクは実行中です--->
タスクキャンセルは成功しましたか--cancel->true
タスクキャンセルは成功しましたか--isCancelled->true
onError--->java.util.concurrent.CancellationException

これにより、実行中の非同期タスクがキャンセルされました。この部分は Java Future に関連する知識についての内容が多いです。

defer オペレーター#

defer を使用して Observable を作成する際、購読時にのみ Observable が作成され、関連するイベントが送信されます。以下は defer オペレーターの使用例です:

//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just(defer);
    }
});

defer = "new";
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの実行結果は以下の通りです:

onSubscribe--->
onNext--->new
onComplete--->

明らかに、購読前に Observable ファクトリが最新の Observable を作成し、onNext で受信するデータも最新のものであることが示されています。defer オペレーターを理解するために、公式の defer オペレーターの図を見てみましょう:

image

empty オペレーター#

empty オペレーターを使用すると、データを発生させずに正常に終了する Observable を作成できます。具体的には以下の通りです:

//empty
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->" + o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの出力結果は以下の通りです:

onSubscribe--->
onComplete--->

empty オペレーターの使用を理解しやすくするために、いくつかの empty オペレーターの公式図を見てみましょう:

image

never オペレーター#

never オペレーターを使用すると、データを発生させず、終了もしない Observable を作成できます。具体的には以下の通りです:

//never
Observable.never().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->" + o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの出力結果は以下の通りです:

onSubscribe--->

never オペレーターの使用を理解しやすくするために、いくつかの never オペレーターの公式図を見てみましょう:

image

timer オペレーター#

timer オペレーターを使用すると、遅延を伴う固定値 0 を送信する Observable を作成できます。また、スレッドスケジューラーを指定することもできます。timer のオーバーロードメソッドは以下の通りです:

//遅延
public static Observable<Long> timer(long delay, TimeUnit unit)
//遅延+スレッドスケジューラー
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 

以下は timer の使用方法です:

//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long s) {
        Log.i(TAG, "onNext--->" + s);
        Log.i(TAG, "現在のスレッド--->" + Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの実行結果は以下の通りです:

onSubscribe--->
//3秒遅延してデータを受信
onNext--->0
現在のスレッド--->RxCachedThreadScheduler-1
onComplete--->

timer オペレーターの使用を理解しやすくするために、いくつかの timer オペレーターの公式図を見てみましょう。以下は timer が遅延器とスレッドスケジューラーを指定する方法の例です:

image

interval オペレーター#

interval オペレーターを使用すると、固定の時間間隔で整数値を送信する Observable を作成できます。interval は初期遅延時間、時間間隔、スレッドスケジューラーなどを指定できます。interval のオーバーロードメソッドは以下の通りです:

//初期遅延+時間間隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) 
//初期遅延+時間間隔+スレッドスケジューラー
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
//時間間隔
public static Observable<Long> interval(long period, TimeUnit unit)
//時間間隔+スレッドスケジューラー
public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 

以下は interval の使用方法です:

//interval
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext--->" + aLong);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードを実行すると、3 秒ごとに整数値のイベントが継続的に送信されます。実行結果は以下の通りです:

onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...

interval オペレーターの使用を理解しやすくするために、いくつかの interval オペレーターの公式図を見てみましょう。以下は interval が間隔時間と時間単位を指定する方法の例です:

image

range オペレーター#

range オペレーターを使用すると、指定された整数範囲の値を送信する Observable を作成できます。range に関連するメソッドは 2 つあり、数値の範囲の表現が異なります。2 つのメソッドの宣言は以下の通りです:

// int
public static Observable<Integer> range(final int start, final int count)
// long
public static Observable<Long> rangeLong(long start, long count)

以下は range の使用方法です。具体的には以下の通りです:

//range
Observable.range(1, 5).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext--->" + integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});

上記のコードの実行結果は以下の通りです:

onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->

range オペレーターの使用を理解しやすくするために、いくつかの range オペレーターの公式図を見てみましょう:

image

まとめ#

この記事では、主に RxJava2 に関連する基礎知識と RxJava2 における作成型オペレーターの理解と使用について説明しました。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。