안드로이드 RxJava

|

RxJava github

리액티브 프로그래밍이란?

데이터 흐름과 전달에 관한 프로그래밍 패러다임이다.
데이터 흐름을 먼저 정의하고, 데이터가 변경되었을 때 연관되는 함수나 수식이 업데이트된다.

명령형 프로그래밍은 pull 방식이고, 리액티브 프로그래밍은 push 방식이라고 볼 수 있다. 일종의 옵저버 패턴인 것이다.

리액티브 프로그래밍은 함수형 프로그래밍의 지원을 받는데, 함수형 프로그래밍은 부수효과가 없는 순수함수를 지향하기 때문에 멀티스레드 환경에서 안전하다.

RxJava란?

2013년 넷플릭스가 REST 기반의 서비스 API 호출 횟수와 서비스의 전반적인 성능을 개선하는 프로젝트를 진행하고 닷넷환경의 리액티브 확장 라이브러리(Rx) 를 JVM에 포팅하여 RxJav를 만들었다.

RxJava의 장점

  • 콜백이 콜백을 부르는 콜백지옥상황을 해결할 수 있다. 콜백을 사용하지 않는 방향으로 설계되었다.
  • 자바 Future를 조합하기 어렵다는 점을 해결한다
  • 동기적, 비동기적 방식 모두 동작하며, 취소할 수 있고 에러 핸들링도 가능하다.

RxJava 사용하기

implementation "io.reactivex.rxjava3:rxjava:3.x.y"

RxJava를 사용하기 위해서는 Observable (데이터 아이템을 방출하는)를 생성해야 한다.

Observable?

데이터의 변화가 발생하는 데이터 소스이다. Observable의 행동을 create() 함수를 사용하여 수동으로 구현할 수 있다.

public Observable&ltArticle&gt article() {}

Observable create 를 사용하면 create, just, from 등을 사용하여 Observable을 생성하는 방법을 확인할 수 있다.

just, from

객체, 리스트, 배열들을 Observable로 변환할 수 있다.

Observable&ltString&gt o = Observable.from("a", "b", "c");

def list = [5, 6, 7, 8]
Observable&ltInteger&gt o = Observable.from(list);

Observable&ltString&gt o = Observable.just("one object");

create

비동기 i/o, 데이터 스트림등을 구현 가능하다.

동기 Observable 예제

/**
 * This example shows a custom Observable that blocks 
 * when subscribed to (does not spawn an extra thread).
 */
def customObservableBlocking() {
    return Observable.create { aSubscriber ->
        50.times { i -&gt
            if (!aSubscriber.unsubscribed) {
                aSubscriber.onNext("value_${i}")
            }
        }
        // after sending all values we complete the sequence
        if (!aSubscriber.unsubscribed) {
            aSubscriber.onCompleted()
        }
    }
}

비동기 Observable 예제

/**
 * This example shows a custom Observable that does not block
 * when subscribed to as it spawns a separate thread.
 */
def customObservableNonBlocking() {
    return Observable.create({ subscriber ->
        Thread.start {
            for (i in 0..&lt75) {
                if (subscriber.unsubscribed) {
                    return
                }
                subscriber.onNext("value_${i}")
            }
            // after sending all values we complete the sequence
            if (!subscriber.unsubscribed) {
                subscriber.onCompleted()
            }
        }
    } as Observable.OnSubscribe)
}

Flowable?

1에서 2로 버전업을 하며 추가된 클래스이다.
Observable과의 차이는 backpressure buffer의 기본 탑재 유무이다.
pub/sub 모델에서는 생산자가 데이터를 계속 생산해 내고 소비자가 이를 처리하는 속도를 따라가지 못한다면 OOM 또는 Busy Waiting의 문제가 발생할 수 있다.
이를 해결하기 위한 방법이 backpressure buffer이다.
버퍼가 가득 차면 소비자가 이를 처리할 수 없기 때문에 더 이상 publish를 하지 않는것이다.

public static void hello(String... args) {
  Flowable.fromArray(args).subscribe(s -&gt System.out.println("Hello " + s + "!"));
}

Subscribe?

Observalble을 구독한다. Observable이 subscribe() 함수를 호출하면 데이터를 구독자에게 발행한다.

public static void hello(String... args) {
  Flowable.fromArray(args).subscribe(s -&gt System.out.println("Hello " + s + "!"));
}