RxJava sudah sangat terkenal sekali di kalangan java programmer baik itu eb programmer ataupun android programmer. Pada kesempatan kali ini kita bahas tentang rxjava 2 bagaimana konsepnya dan apa yang terkandung didalamnya.

Tutorial ini berhubungan dengan list tutorial reactive programming saya di youtube, hangan lupa untuk cek tutorial reactive programming lainya di youtube saya ya guys.

Reactive Streams

Reactive Streams adalah konsep pemrograman untuk menangani aliran data asynchronous secara non-blocking sembari menyediakan tekanan balik ke stream publishers.

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Reactive Streams memiliki dua object utama yaitu Publisher<T> dan Subscriber<T>, dimana Publisher adalah object stream dan Subscriber yang mengakses data yang dimana T adalah bject datanya.

public interface Subscription {
    public void request(long n); //request n items
    public void cancel();
}

Ketika Subscriber sudah siap untuk menerina data, maka adan mengakses request kepada Subscription

public interface Subscriber<T> {
    // Ketika subscriber siap menerima akses data
    public void onSubscribe(Subscription s); 

    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Ketika data stream mulai di prosess maka akan mengakses Subscriber::onNext(T) untuk setiap data T.

Prosess ini terus berjalan sampai dengan Subscriber::onComplete().  Apabila pada saat steam berjalan ada error maka error tersebut akan di lemparkan melalui Subscriber::onError(Throwable).

Flowable and Observable

RxJava menyediakan beberapa event publishers:

  1. Flowable Publisher yang meng-emits (Memberikan aliran data) 0..N elements, dan bisa memberikan hasil berhasil atau gagal.
  2. Observable sama seperti Flowables akan tetapi tanpa backpressure strategy atau tekanan priorias.
  3. Single Digunakan untuk memberikan stream data tanpa ada hasil error, jika ada error maka data stream itu tidak di kirimkan ke subscriber.
  4. Maybe memberikan data stream yang bisa dengan ata tanpa data atau berhasil dengan event error.
  5. Completable hanya memberikan signals apakan stream berhasil sukses atau error.

Contoh Menggunakan Flowable

Berikut ini adalah cara membuat data stream dengan menggunakan Flowable.

Flowable<Integer> flowable = Flowable.just(1, 5, 10);
Flowable<Integer> flowable = Flowable.range(1, 10);
Flowable<String> flowable = Flowable.fromArray(new String[] {"red", "green", "blue"});
Flowable<String> flowable = Flowable.fromIterable(List.of("red", "green", "blue"));

MEMBUAT CUSTOM DATA STREAM

Observable<Integer> stream = Observable.create(subscriber -> {
    log.info("Started emitting");

    log.info("Emitting 1st");
    subscriber.onNext(1);

    log.info("Emitting 2nd");
    subscriber.onNext(2);

    subscriber.onComplete();
});

Flowable<Integer> stream = Flowable.create(subscriber -> {
    log.info("Started emitting");

    log.info("Emitting 1st");
    subscriber.onNext(1);

    log.info("Emitting 2nd");
    subscriber.onNext(2);

    subscriber.onComplete();
}, BackpressureStrategy.MISSING);

stream.subscribe(
       val -> log.info("Subscriber received: {}", val),
       err -> log.error("Subscriber received error", err),
       () -> log.info("Subscriber got Completed event")
);

 

One thought on “Mendalami RxJava 2

Leave a Reply

Your email address will not be published. Required fields are marked *