rxjava disposable что такое

RxJava. Убираем магию

Я долго боялся использовать RxJava в production. Её назначение и принцип работы оставались для меня загадкой. Чтение исходного кода не добавляло ясности, а статьи только путали. Под катом попытка ответить на вопросы: «Какие задачи эта технология решает лучше аналогов?» и «Как это работает?» с помощью аналогий с классической Java и простых метафор.

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Применение

RxJava отлично заменяет Streams API из Java 8 на более ранних версиях Java. Так как Android Java 8 поддерживается далеко не с 4.0, Rx будет оптимальным решением. В статье RxJava рассматривается именно с этого ракурса, так как, по-моему, он наиболее понятный и по-настоящему реактивное приложение под Android с помощью чистой Rx реализовать сложно.

Emitter

Всем нам знаком паттерн Iterator.

За интерфейсом скрывается какой-нибудь источник данных, причём совершенно не важно, какой. Iterator полностью скрывает все детали реализации, предоставляя всего два метода:

next — получить следующий элемент
hasNext — узнать, есть ли ещё данные в источнике

У этого паттерна есть одна особенность: потребитель запрашивает данные и ждёт («зависает»), пока источник не выдаст их. Поэтому в качестве источника обычно выступает конечная, часто заранее сформированная коллекция.

Проведём небольшой рефакторинг.

Думаю, вы уже поняли, к чему я. Интерфейс Emitter из RxJava (для потребителей он дублируется в Observer (Subscriber в RxJava 1)):

Он похож на Iterator, но работает в обратную сторону: источник сообщает потребителю о том, что появились новые данные.

Это позволяет разрешить все проблемы с многопоточностью на стороне источника и, например, если вы проектируете UI, то вы сможете рассчитывать на то, что весь код, отвечающий за графический интерфейс — последовательный. Невероятно удобно. Прощайте, каллбэки! Скучать не буду.

Аналогия с Iterator взята из [1]

Sources

Теперь немного о самих источниках. Они бывают множества типов: Observable, Single, Maybe… И все они похожи на капусту (и монады, но это не так важно).

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Потому что создав один источник, можно заворачивать его в другой источник, который можно ещё раз завернуть в ещё один источник и так до OutOfMemory. (Но так как обычный источник весит меньше 100 байт, скорее, пока заряд не кончится.)

Давайте завернём в источник ответ на тот самый вопрос.

Как мы знаем, получение ответа — довольно долгая операция. Поэтому завернём в источник, который выполнит вычисления в специальном потоке:

А ещё мы хотим, чтобы приложение не упало при ответе. Заворачиваем в источник, который вернёт ответ в главном потоке:

И, наконец, запускаем:

В консоль вывелся ответ, но что же произошло?

Метод subscribe определён в Observable. Он делает проверки и подготовку, а затем вызывает метод subscribeActual, который уже по-разному определён для разных источников.

В нашем случае метод subscribe вызвал метод subscribeActual у ObservableObserveOn, который вызывает метод subscribe завёрнутого в него источника, уточнив, в какой поток нужно вернуть результат.

В ObservableObserveOn лежит ObservableSubscribeOn. Его subscribeActual запускает subscribe завёрнутого в заданном потоке.

И, наконец, в ObservableSubscribeOn завёрнут ObservableJust, который просто выдаёт в onNext своё значение.

Естественно, просто с числом не интересно. Поэтому вот источник, который получает список товаров и узнаёт для них цены. Цены можно получать только по 20 штук (у InAppBilling API такое же ограничение).

Этот пример создан для демонстрации принципа работы, а не для использования в реальных проектах.

В RxJava огромное количество разных реализаций источников. Все они работают по одному принципу, а детали отлично описаны в документации. Поэтому не буду останавливаться на них.

Операции

Все операции над источниками делятся на 2 типа:

Не терминальные возвращают новый источник, который завернул исходный
Терминальные исполняют цепочку и получают данные (subscribe, map. )

И да, ничего не исполнится, пока не будет выполнена терминальная операция. Цепочка может сколько угодно лежать в памяти, не делая вообще ничего. И это хорошо, потому что если мы не получаем данные, то зачем их производить? (Ленивые вычисления без Haskell в комплекте!).

По аналогии со Streams API из [2]

Dispose (Unsubscribe в RxJava 1)

Исполнение цепочки можно прервать. Делается это вызовом dispose() у DisposableObserver (unsubscribe() у Subscriber в RxJava 1).

После этого RxJava прекратит исполнение цепочек, отпишет всех Observer’ов и вызовет iterrupt() у потоков, которые больше не нужны.

Так же можно узнать, не прервано ли исполнение из источников. Для этого у Emitter есть метод isDispose() (isUnsubscribe() для RxJava 1).

У этого есть логичная, но неприятная особенность: так как Observer отвечает за обработку ошибок, теперь все ошибки крашат приложение. Я пока не нашёл решения, о котором готов написать.

Заключение

— Позволяет легко компоновать запросы к сети, базе данных и т.д; организуя их асинхронное выполнение. Это означает, что ваши пользователи получат более быстрое и отзывчивое приложение.

— Не содержит в себе никакой магии. Только составление и исполнение цепочек источников.

— (Для меня) Решает больше проблем, чем создаёт!

Источник

Исследуем RxJava 2 для Android

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Меня зовут Аркадий, я Android-разработчик в Badoo. В последнее время в нашем блоге много постов про Go, PHP, JS, QA, и я решил разбавить их темами по мобильной разработке. Как раз занимался портированием одного Android-проекта с RxJava 1 на RxJava 2 и читал всё, что можно найти на эту тему в интернете. В частности, доклад Джейка Вортона с конференции GOTO Copenhagen 2016. Мне показалось, что это достойный кандидат на перевод – думаю, многие Android-разработчики задумываются о переходе на RxJava 2, и им интересно, что изменилось по сравнению с первой версией.

Джейк сделал достаточно объёмное введение о реактивном программировании, так что знание RxJava 1 не требуется для понимания статьи. Доклад был подготовлен, когда RxJava2 ещё только готовилась к выпуску (на текущий момент уже выпущена версия 2.1.0).

Почему Reactive?

Почему все вокруг вдруг стали говорить о реактивном программировании? Если вы не можете сделать приложение полностью синхронным, то наличие единственного асинхронного ресурса полностью ломает традиционный императивный стиль программирования, к которому мы привыкли. «Ломает» не в смысле «всё перестаёт работать», а в смысле «приводит к увеличению сложности», и в результате вы начинаете терять все преимущества императивного программирования.

Чтобы пояснить, почему я считаю это серьёзной проблемой, приведу пример.

Начнём с простого класса, который может получить для нас объект User с какими-то модификаторами.

Если бы мы жили в синхронном, однопоточном мире, то этот код делал бы именно то, что и ожидается: создание экземпляра, вывод пользователя, изменение каких-то свойств, вывод пользователя.

Проблема возникает тогда, когда мы начинаем прибегать к асинхронности. Допустим, нам нужно отразить изменения свойств на стороне сервера. Для этого последние два метода должны быть асинхронными. Как бы мы тогда изменили код?

Одно из решений — вообще ничего не делать: вы можете предположить, что асинхронный вызов обновления сервера будет успешным, так что можно внести изменения локально. Они будут отражены мгновенно. Как вы понимаете, это не лучшая идея. Сети непредсказуемы, сервер может вернуть ошибку, и тогда придётся как-то откатывать локальное состояние.

Можем сообщить пользователю о проблеме. Можем автоматически попытаться снова. Подобные решения работают, и в этом направлении идут, когда нужно совместить асинхронный код с кодом, выполняющимся в одиночном потоке (в случае Android это UI-поток).

Чем больше вам нужно сделать асинхронных вызовов, тем больше возникает проблем. Например, когда пользователь заполняет форму, он изменяет несколько свойств. Или у вас есть последовательность асинхронных вызовов, когда успешное выполнение одного вызова должно запустить другой асинхронный вызов, которому тоже может сопутствовать успех или неудача.

Есть императивные подходы к решению проблемы. Мы можем проверить состояние перед обращением к методам UI.

В этом примере мы создаём анонимный тип, который однозначно приведёт к краткосрочной утечке памяти, потому что он будет хранить ссылку на нашу Activity до тех пор, пока асинхронный вызов не завершится.

Проблема заключается ещё и в том, что мы не знаем, в каких потоках вызываются эти колбэки. Возможно, они вызываются в фоновом потоке, так что требуется передавать события в основной поток выполнения ( main/UI thread ).

Мы захламили Activity кучей вещей, которые не имеют отношения к основной задаче, решаемой нашим кодом. И всё это лишь для того, чтобы начать работать асинхронно и обрабатывать асинхронные результаты. Мы лишь реализовали вызов асинхронного запроса; мы не блокируем ввод данных пользователем, не обрабатываем нажатия кнопок и не работаем со множественными полями.

Реактивное мышление

Пользователи — тоже нечто вроде асинхронных источников данных. Мы даём им информацию через UI, и они реагируют на неё нажатиями кнопок и внесением данных в поля.

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Пока вы не можете обеспечить синхронность всей архитектуры вашего приложения, наличие единственного асинхронного ресурса будет ломать традиционный императивный стиль программирования.

Трудно найти приложение, не использующее сетевые запросы, а они по своей природе асинхронны. У вас есть диск, база данных — асинхронные источники. UI тоже должен рассматриваться исключительно как асинхронный источник. Так что по умолчанию всё в Android функционирует асинхронно. Если будете цепляться за традиционное императивное программирование и методики управления состояниями, то будете вредить сами себе.

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Вместо того чтобы пытаться координировать все асинхронные элементы архитектуры, мы можем снять с себя эту обязанность, соединив их напрямую. Можно напрямую подписать UI на базу данных, чтобы он реагировал на изменения данных. Можно так изменить базу данных и сетевые вызовы, чтобы они реагировали на нажатие кнопки, а не пытаться получить клик и затем его отправить.

Было бы прекрасно, если бы получаемый нами сетевой ответ обновлял данные. Ведь когда обновляются данные, автоматически обновляется и UI. Таким образом мы снимаем с себя ответственность за это. Если Android делает что-то асинхронно (например, поворот экрана или рассылка broadcast), то было бы замечательно, если бы это автоматически отразилось на интерфейсе или при этом автоматически запускалась какая-нибудь фоновая задача.

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

В целом такой подход позволяет нам не писать кучу кода, необходимого для поддержки состояний: вместо того чтобы управлять состояниями, мы просто соединяем компоненты друг с другом.

RxJava

Переходим к RxJava. Эта реактивная библиотека стала наиболее популярной при разработке под Android по большей части потому, что была первым полноценным [реактивным] инструментом для Java. RxJava 2 сохраняет поддержку старой версии Java, что важно для разработки под Android.

Источники

Источник данных делает некую работу, когда вы начинаете или заканчиваете его прослушивать. Представьте сетевой запрос, который не будет отправлен, пока вы не начнёте ожидать ответа. И если вы отписываетесь от источника данных до его завершения, то теоретически он может отменить сетевой запрос.

Источник может работать как синхронно, так и асинхронно. Например, блокирующий сетевой запрос, выполняющийся в фоновом потоке, либо что-то чисто асинхронное вроде обращения к Android и ожидания onActivityResult. Источник может выдать один элемент или несколько элементов. Сетевой запрос вернёт один ответ. Но пока работает ваш UI, поток нажатий на кнопки потенциально бесконечен, даже если вы подписаны на единственную кнопку.

Завершения может и не быть. К примеру, мы моделируем нажатие кнопки в качестве источника данных, который работает до тех пор, пока работает UI. И когда UI исчезает, то вы, вероятно, отписываетесь от этого источника нажатий кнопки, но он не завершает свою работу.

Flowable vs. Observable

В RxJava 2 источники представлены двумя основными типами — Flowable и Observable. Они устроены очень похоже. Оба генерируют от нуля до n элементов. Оба могут завершаться успешно или с ошибкой. Так зачем нам два разных типа для представления одной и той же структуры данных?

Всё сводится к такой штуке, как backpressure. Не углубляясь в подробности, скажу лишь, что backpressure позволяет замедлить работу источника данных. Существующие системы имеют ограниченные ресурсы. И с помощью backpressure мы можем сказать всем, кто шлёт нам данные, чтобы они притормозили, потому что мы не можем обрабатывать информацию с той скоростью, с которой она к нам поступает.

В RxJava 1 была поддержка backpressure, но она была добавлена довольно поздно в процессе развития API. В RxJava 1 каждый тип в системе имеет механизм backpressure. И хотя концепцию backpressure поддерживают все типы, далеко не все источники её реализуют, так что использование этого механизма может привести к падению приложения. Применение backpressure должно проектироваться и учитываться заранее. Именно поэтому в RxJava 2 два разных типа источников. Поэтому теперь вы можете указывать с помощью типа источника, должна ли осуществляться поддержка backpressure.

Допустим, у нас есть источник данных — события касания экрана. Мы не можем его замедлить. Нельзя же сказать пользователю: «Нарисуй-ка половину символа, остановись и подожди, пока я обработаю, а затем дорисуй оставшееся». Мы можем замедлить ввод данных иначе, например, отключив кнопки или отобразив другой UI, но сам источник замедлить не получится.

Возьмём другой пример: у нас есть база данных, содержащая большой набор строк, из которого нам нужно извлекать по несколько за раз. БД может очень эффективно решить эту задачу благодаря такому инструменту, как курсоры. Но для потока событий касания это реализовать невозможно, потому что нельзя замедлить палец пользователя.

С поддержкой backpressureБез поддержки backpressure
0–n элементов, complete | errorFlowableObservable

Итак, единственная разница между двумя этими типами заключается в том, что один поддерживает backpressure, а другой — нет.

Реактивные потоки

Реактивные потоки (С поддержкой backpressure)Без поддержки backpressure
0…n элементов, complete | errorFlowableObservable

Тип Flowable реализует спецификацию реактивных потоков, что подразумевает поддержку backpressure.

Observable – это источник объектов User. Он генерирует элемент при каждом изменении, и мы можем реагировать на это, отображая данные на экране. Теперь не надо пытаться определить наиболее подходящее для этого время на основе других событий, происходящих в системе.

Специализированные источники

Реактивные потоки (С поддержкой backpressure)Без поддержки backpressure
0…n элементов, complete | errorFlowableObservable
item | complete | errorMaybe
item | errorSingle
complete | errorCompletable

Создание источников

На этом примере показано, как создаются источники и как можно оборачивать то, что вы уже используете, в реактивные источники. Все эти типы имеют статические методы, позволяющие создавать их на основе одиночных значений.

Но наиболее полезны два метода, которые, как мне кажется, больше всего будут использоваться для адаптации методов и действий, которые вы уже выполняете (как синхронно, так и асинхронно).

fromCallable доступен для всех пяти типов:

Так можно моделировать синхронные источники одиночных данных. При императивном подходе мы использовали бы метод, возвращающий значение.

Для Maybe и Completable есть ещё по два дополнительных метода. Они позволяют моделировать источники, не возвращающие значения, – просто исполняемые куски кода, но зато реактивные.

Преобразуем в лямбду.

Можно также отправить больше одной порции данных.

Вызывать onNext можно многократно.

Ещё одно преимущество — теперь мы можем моделировать асинхронные данные. К примеру, если нужно асинхронно выполнить HTTP-запросы, то можно вызывать onNext из колбэка HTTP-запроса.

Создание с помощью метода create работает для всех пяти типов:

Наблюдение за источниками

На самом деле, вам необязательно реализовывать интерфейсы Observer/ Subscriber напрямую, когда вы подписываетесь с помощью метода subscribe. Это не вполне удобно как раз из-за четвёртого метода onSubscribe – в нём надо как-то обрабатывать передаваемый объект Disposable / Subscription в момент подписки.

На Android вы часто будете сталкиваться с тем, что у вас есть CompositeDisposable для Activity или фрагмента, и вы отписываетесь в onDestroy (или ещё где-то).

Метод subscribeWith существует для всех четырёх типов без поддержки backpressure.

Можно провести аналогию с ресурсом, файлом, курсором в БД. Вы не откроете файл, не имея способа каким-то образом его закрыть. Открыв курсор в БД, вы в конце концов закроете его. Всегда сохраняйте Disposable при подписке на Observable, чтобы потом отписаться.

Операторы

Операторы позволяют решать три задачи:

Рассмотрим первые две.
Точно так же, как источники позволяют оборачивать синхронные методы и делать их реактивными, операторы позволяют делать реактивными некие действия. Например, здесь мы применяем метод toUppercase() к строке и получаем новую строку.

В реактивном мире мы берем Observable и с помощью оператора осуществляем эту операцию.

Посмотрим на объект User : мы сделали так, чтобы колбэки вызывались в фоновом потоке, и нам приходилось явно передавать данные в основной поток выполнения. На самом деле, это можно сделать с помощью встроенного оператора, причём куда более явным образом.

Можно сказать: «Хочу в другом потоке наблюдать за генерируемыми этим Observable событиями». При смене потоков выполнения имеет значение очерёдность осуществления этих операций.

Возьмём сетевой запрос. Он будет выполняться синхронно, но мы не хотим, чтобы это происходило в основном потоке выполнения. Можно применить оператор, который изменит поток (в котором произойдёт подписка), и работа будет происходить в заданном потоке. В примере мы подписываемся, указывая Schedulers.io() — это просто пул потоков. Работа будет выполнена в этом пуле, а затем всем подписавшимся будут разосланы уведомления. Здесь subscribeOn — это оператор, который задаёт поток выполнения работы.

Специализация операторов

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Если Observable был пустой, то это приведёт к ошибке, потому что Single либо содержит элемент, либо возвращает ошибку.

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Всё описанное применимо и к Flowable : здесь есть точно такие же операторы, и они возвращают такие же специализированные типы.

В этой таблице приведены некоторые операторы.

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

Реактивный подход

Если переписывать наш исходный пример с точки зрения реактивного программирования, то можно подписаться на User и сказать: «Хочу получать уведомления в основном потоке выполнения, а затем хочу передать это в UI и отобразить данного пользователя». Этот код будет автоматически выполняться при каждом изменении User’а, и вам будут автоматически показываться сделанные изменения, так что больше не придётся забивать себе этим голову.

Аналогично, когда мы делаем асинхронный запрос на изменение данных, нам нужно, чтобы это произошло в фоновом потоке выполнения. Мы хотим в основном потоке наблюдать за результатом – будет ли выполнение завершено успешно или с ошибкой. И в случае успешного завершения мы могли бы в колбэке заново активировать текстовое поле.

Эта схема требует размещения в памяти кучи промежуточных объектов. В RxJava 2 их количество уменьшено. Каждый из операторов создаёт на один объект меньше, накладные расходы при подписке ниже. Система работает быстрее, нам нужно реже собирать мусор, и всё это – без всяких компромиссов API.

Заключение

В RxJava 2 реализована следующая идея: берутся изначально асинхронные вещи — работа с сетью, сама Android, база данных, даже UI — и пишется такой код, который реагирует на изменения в этих источниках, а не пытается справиться с изменениями и самостоятельно управлять состояниями.

Если вы используете RxJava 1, то обратите внимание на проект, позволяющий конвертировать типы. С его помощью вы сможете постепенно обновить свои приложения до RxJava 2.

Реактивное программирование позволяет использовать правильную архитектуру — асинхронную. Приветствуйте асинхронность источников, а не пытайтесь самостоятельно управлять всеми состояниями. Комбинируйте источники так, чтобы ваши приложения стали по-настоящему реактивными.

Источник

The curious case of RxJava Disposables A.K.A. what happens when you dispose a Disposable

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

rxjava disposable что такое. Смотреть фото rxjava disposable что такое. Смотреть картинку rxjava disposable что такое. Картинка про rxjava disposable что такое. Фото rxjava disposable что такое

In RxJava 2, there’s this concept of Disposable s. They’re useful when e.g. you make a long-running HTTP request, but as the request is still in progress you are no longer interested in the results of that request. This might look like this in code:

I was playing around with Disposable s by calling dispose() on different reactive chains with various operators used and I was actually surprised with the results. Therefore I decided to share the results in this article 🤓

All the code referenced in this article is available at the bottom of the article. You’ll also find some cheat sheets there.

I’m not going to explain how Disposable s work as there’s already a great article about this written by Niklas Baudy (link below), but I’ll present a number of examples showing what would happen if we disposed a Disposable in various Rx flows.

RxJava 2 Disposable — Under the hood

Everyone has code like the following:

Setup

We will increase counter in subsequent operators and check the value at the end of each test to verify where the flow stopped.

In the example above, TestSchedulerRule is a JUnit Rule which sets a TestScheduler so that we can test asynchronous code easier with RxJava:

Disposing Observable-based flows

Example with flatMap + map + doOnNext

When we run this test it will print:

The first value will be also received by the subscriber, but we won’t receive a completed callback event.

switchMap

In this example, the initial Observable#flatMap is followed by Observable#switchMap and Observable#map after that.

When we run this test it will print:

Unlike the previous example, no values are received by the subscriber. Also, the flow stops after Observable#switchMap — the code inside Observable#map block doesn’t get executed.

concatMap

In this example, the initial Observable#flatMap is followed by Observable#concatMap and Observable#map after that.

When we run this test it will print:

Similar to the previous example, no values are received by the subscriber. However, the flow stops before executing the code in Observable#concatMap block (the code inside Observable#map block also doesn’t get executed).

flatMap with Single#just + toObservable

In this example, the initial Observable#flatMap is followed by a second Observable#flatMap with Single#toObservable and Observable#map after that.

When we run this test it will print:

No values are received by the subscriber. Just as with Observable#switchMap example before, the flow stops after the second Observable#flatMap — the code inside Observable#map block doesn’t get executed.

flatMapSingle

In this example, the initial Observable#flatMap is followed by Observable#flatMapSingle and Observable#map after that.

When we run this test it will print:

No values are received by the subscriber. The flow stops after Observable#flatMapSingle — the code inside Observable#map block doesn’t get executed.

Observable.create

In this example, the initial Observable#flatMap is followed by a second Observable#flatMap with Observable#create and Observable#map after that.

When we run this test it will print:

No values are received by the subscriber. Just as with Observable#switchMap example before, the flow stops in the second Observable#flatMap — the code inside Observable#create block doesn’t get executed just as the code in subsequent Observable#map block.

Side note — when creating Observable with Observable#create you should also check if ObservableEmitter is not disposed and do not emit any values if it is.

Disposing Single-based flows

fromCallable

When we run this test it will print:

No values are received by the subscriber. Also, the flow stops right after Single#fromCallable — the code inside Single#map block doesn’t get executed.

This behavior is different from what we’ve seen with Observable where the flow was either stopped or not depending on the subsequent operators. With Single#fromCallable there’s no point in testing subsequent operators as the code inside of them would never be executed.

In the next sections, we’re going to test disposing the flow while the long-running operation happens inside different operator blocks.

flatMap

When we run this test it will print:

No values are received by the subscriber. Also, the flow stops right after Single#flatMap — the code inside Single#doOnSuccess block doesn’t get executed.

map + subsequent doOnSuccess

When we run this test it will print:

The value will be received by the subscriber and the code in the Single#doOnSuccess block gets executed.

map + subsequent flatMap

When we run this test it will print:

No values are received by the subscriber. Also, the flow stops right after Single#flatMap — the code inside Single#doOnSuccess block doesn’t get executed.

map + subsequent flatMapCompletable

When we run this test it will print:

No values are received by the subscriber. Also, the flow stops right after Single#flatMapCompletable — the code inside Single#doOnSuccess and Completable#toSingle blocks doesn’t get executed.

Disposing Completable-based flows

fromCallable

When we run this test it will print:

Subscriber won’t receive a completed callback event. Also, the flow stops right after Completable#fromCallable — the code inside Completable#doOnComplete block doesn’t get executed.

andThen

When we run this test it will print:

Subscriber won’t receive a completed callback event. Also, the flow stops right after Completable#andThen — the code inside Completable#doOnComplete block doesn’t get executed.

doOnComplete + doOnComplete

When we run this test it will print:

Subscriber will receive a completed callback event and the code inside the second Completable#doOnComplete will be executed.

doOnComplete + andThen

When we run this test it will print:

Subscriber won’t receive a completed callback event. Also, the flow stops right after Completable#andThen — the code inside the second Completable#doOnComplete block doesn’t get executed.

Note: up until RxJava 2.2.5 “andThen” would not be printed, since 2.2.6 it would (probably due to: https://github.com/ReactiveX/RxJava/pull/6362 )

doOnComplete + toSingle

When we run this test it will print:

Subscriber will receive a completed callback event and the code inside Completable#toSingle and Single#doOnSuccess will be executed.

Источник

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *