RxSwift: немного о share(), replay(), shareReplayLatestWhileConnected() и других классных операторах
Я уже писал про Publish, Connect и RefCount в RxSwift. Для того, чтобы лучше раскрыть тему, представляю вашему вниманию перевод другой замечательной статьи, про различия между такими операторами, как share(), replay(), replayAll(), shareReplay(), publish() и shareReplayLatestWhileConnected().
Частая ошибка, которую совершают новички, взявшиеся за освоение Rx — это непонимание того, что цепочка операторов на Observable выполняется заново с каждым новым подписчиком:
Мы имеем несколько подписчиков на один-единственный Observable, но мы не хотим, чтобы его код исполнялся с каждым новым Subscriber’ом. Для этого в RxSwift имеется несколько операторов. Вот резюмирующая табличка, описывающая каждый из них:
1 — ретранслирует произведенных до подписки элементов не больше, чем bufferSize.
2 — ретранслирует 1 элемент, произведенный до подписки, до тех пор, пока существует хотя бы один подписчик.
Теперь рассмотрим подробно каждое свойство
Shared subscription. Возвращаемый Observable делит одну основную подписку между всеми подписчиками.
Reference counting. Возвращаемый Observable считает количество сабскрайберов, которые на него подписаны. Как мы помним из первого свойства, на Observable, к которому был применен оператор (его будем называть Source Observable), всегда существует только одна «основная» подписка и она делится данными с остальными подписчиками.
Когда счетчик подписчиков достигает нуля, происходит очистка ресурсов Source Observable. Обратите внимание: каждый раз, когда счетчик увеличивается с нуля до единицы, на Source Observable будет производиться новая подписка. Но этого может не случиться, если Source Observable до этого завершился или послал ошибку. Это довольно непредсказуемое поведение, поэтому я стараюсь избегать его и при написании кода убеждаюсь, что после того, как количество подписчиков опустилось до нуля, не будет никаких последующих подписок.
Изучаем мультикаст операторы RxJS
Привет, Хабр! Представляю вашему вниманию перевод статьи «Understanding RxJS Multicast Operators» автора Netanel Basal.
Широковещательные или мультикаст операторы нередко кажутся самой сложной темой при изучении RxJS. В этой статье я попробую все доступно объяснить.
Мы рассмотрим внутреннее устройство мультикаст операторов и решаемые ими задачи.
Давайте начнем с описания основных строительных блоков RxJS.
Observable
В RxJS наблюдаемые объекты (далее «потоки») изначально являются холодными. Это значит, что каждый раз при подписке на поток выполняется коллбэк подписки.
Для лучшего понимания создадим такую реализацию:
Это все довольно просто. Давайте теперь перейдем ближе к реальности. Например завернем в поток нативный XHR API
Теперь, глядя на нашу реализацию, как вы думаете, что произойдет когда мы подпишемся на этот поток дважды?
Правильно, выполнится два http-запроса. Если еще раз взглянуть на реализацию класса Observable, мы увидим почему так. Каждый подписчик вызывает коллбэк подписки, который в свою очередь каждый раз выполняет http-запрос.
Операторы
Оператор это функция, которая принимает на вход поток, производит какие-либо действия, и возвращает поток.
Напишем наш первый собственный оператор.
Т.е. внутри него обязательно присутствует подписка на входной поток.
Перед тем как использовать этот новый оператор, нам нужно как-то прикрепить его к потоку. Расширим наш класс Observable методом pipe()
Простой метод, всего одна строка кода. pipe принимает массив операторов и вызывает их по очереди, передавая каждому на вход результат выполнения предыдущего.
Давайте же используем наш оператор:
Если мы подпишемся на цепочку дважды, каждая подписка в цепочке вызовется два раза.
А если нам не подходит такое поведение? Если мы хотим вызвать функцию подписки только один раз, сколько бы у нас не было подписок?
Subjects
Давайте напишем его.
Subject может выполнять роль промежуточного звена между холодным потоком и множеством подписчиков.
Изменим наш пример следующим образом:
Теперь у нас оригинальный коллбэк подписки выполняется один раз, и только один http-запрос будет выполнен.
Подписчики, опоздавшие на вечеринку
Что произойдет, если исходный поток уже сработал до того, как мы подписались?
Не получится показать это на прошлом примере, так как http — асинхронный, даже если подписаться на него сразу после, значение все равно придет уже после подписки.
Давайте быстро создадим порождающую функцию of :
Наши подписчики не получили ничего. Почему? Наша реализация не поддерживает «опоздавших» подписчиков. Когда исходный поток от of() эмитит значения, подписчики еще не зарегистрированы, эти значения уйдут в никуда.
В актуальных примерах на Angular вполне может быть такое, что исходный поток сработал, но ваш компонент еще не присутствует на странице. И когда компонент появится, он подпишется на источник, но не получит уже пролетевших значений.
Каждое выпущенное значение будет передано всем текущим подписчикам и сохранено для будущих, размер буфера bufferSize устанавливается в конструкторе.
Несмотря на позднюю подписку мы поймали их всех.
Резюмируя, предназначение ReplaySubject — это рассылка значений всем подписчикам и кэширование их для будущих «опоздавших» подписчиков.
Теперь мы наконец переходим к мультикаст операторам. Надеюсь рассмотренные выше примеры выше вам помогут быстрее их понять.
Multicast Operators
Multicast и Connect
Метод connect позволяет нам самим определять, когда запустить на выполнение исходный поток. Тут есть момент, о котором нужно помнить — чтобы отписаться от источника нужно выполнить:
По этому коду можно догадаться, что произойдет под капотом.
Переиспользовать уже завершенный subject нельзя, фабричная функция решает эту проблему.
RefCount
Автоматизация процесса позволила бы избежать ошибок и упростила код. Добрые разработчики RxJS подумали об этом за нас и создали refCount оператор.
Publish и его варианты
multicast() + Subject + refCount() это довольно типичный случай в RxJS и разработчики сократили его до одного оператора.
Посмотрим какие у нас есть варианты.
В этом случае подсчета ссылок не будет. Это значит что пока исходный поток не будет завершен, shareReplay будет подписан на него, независимо есть у него самого конечные подписчики или нет. Все новые подписчики получат последние x значений.
shareReplay vs publishReplay + refCount
Посмотрим в чем сходство и в чем разница.
У них одинаковое поведение refCount — подписка и отписка от исходного потока на основании количества подписчиков. Еще у них одинаково реагируют когда исходный поток завершился — все новые подписчики получает X последних значений.
Примеры, иллюстрирующие это:
Актуальные примеры в Angular
Посмотрим как применять в боевых условиях изученные мультикаст операторы.
Используем share
Пусть у нас есть компонент, которому нужны данные из исходного потока. Это может быть запрос http, стейт или что-угодно. А еще нам нужно манипулирование данными, типа фильтрации, сортировки и т.п.
А теперь нам нужен другой компонент, который показывает только первого пользователя. Если мы подпишемся на исходных поток как он есть, то:
И вот у нас два http-запроса, операции сортировки или фильтрации выполнятся дважды.
Применяем share :
Используем ShareReplay
ShareReplay применяется когда нужно эмитить, кэшировать и повторять последние X значений. Типичный пример — синглтон сервис, выполняющий http запрос.
Что такое RxJS и почему о нём полезно знать
В этом материале мы поделимся с вами переводом интервью с руководителем проекта RxJS 5+, инженером Google Беном Лешем.
В огромном мире фронтенд-разработки существует множество интересных инструментов. Я стремлюсь найти правильный подход к изучению практических вещей, поэтому я решила пообщаться с одним из самых ярких представителей RxJS-сообщества, Беном Лешем. Мне хотелось побольше узнать о RxJS, и о том, почему мне, начинающему разработчику, стоит вложить время в изучение реактивного программирования. Кроме того, мне хотелось понять — зачем применять RxJS в моих проектах. Бен рассказал мне о том, как использовать RxJS и поделился советами, касающимися изучения этой технологии.

Какую проблему решает RxJS?
Программирование — это всегда решение проблем и поиск инструментов, подходящих для поиска ответов на конкретные вопросы. В случае RxJS решаемая задача заключается в возможности обрабатывать асинхронные вызовы с помощью множества событий. На этом стоит остановиться подробнее.
Представьте, что вы пишете функцию, которая производит с данными некую последовательность действий и в ходе этой работы возникает ошибка. Если вы просто используете функции для обработки последовательностей запросов, тут могут быть некоторые необязательные действия, предпринимаемые для возврата ошибки. Вместо того, чтобы передавать ошибку через все функции, нужно иметь возможность взять ошибку и обновить представление без необходимости проходить по всем Ajax-запросам, которые теперь не нужны.
Вы можете заметить, что для организации обработки ошибок в таком стиле созданы промисы, но RxJS выводит концепцию работы с последовательными действиями на новый уровень. Промис может обрабатывать лишь единственное значение, что ограничивает варианты использования данной конструкции. В дополнение к этому, промис нельзя отменить, что означает, что он вполне может заблокировать поток и впустую использовать ресурсы (важное соображение для маломощных устройств).
В противоположность этому, RxJS даёт способ устранения этих ограничений, давая несколько каналов связи, что упрощает обработку многошаговых событий и повышает её эффективность. RxJS, кроме того, даёт разработчику возможность единообразно представлять всё, что нуждается в генерировании событий. Когда всё выглядит одинаково, оказывается, что со всем этим очень просто работать: комбинировать, объединять, выполнять запросы. Всё это делает RxJS очень мощным инструментом.
Зачем изучать RxJS?
С одной стороны, RxJS — это мощный инструмент, который позволяет превращать сложные последовательности действий в лаконичный код, с которым легко работать.
С другой стороны, эта простота основывается на множестве языковых механизмов, а их изучение требует времени. Однако, полученные знания стоят затраченных усилий, когда понимаешь, что с помощью одной строки кода можно сделать что-то вроде реализации механизма «перетащить и опустить», что требует трёх наборов событий.
Подобное даёт возможность связать эти события воедино (нажатие кнопки мыши, перемещение мыши, отпускание кнопки) для того, чтобы получить одну краткую и точную строку кода. При обычном подходе подобное требует программы в несколько десятков строк.
Какие преимущества даёт использование RxJS?
Одна из наиболее привлекательных возможностей, открывающихся при интеграции RxJS в код, заключается в том, что чем больше вы этим пользуетесь, тем больше вы сможете с помощью этой технологии сделать. RxJS можно сравнить с конструктором Lego, в том смысле, что Lego отлично подходит для разработки новых конструкций, так как все кубики имеют одинаковую форму. Похожим образом, все наблюдаемые объекты выглядят одинаково, поэтому создание чего-то с их использованием становится увлекательной задачей, так как вы можете экспериментировать со множеством интересных решений. Чем больше некто использует наблюдаемые объекты в коде, тем больше возможностей получает в создании чего-то нового на основе существующих структур.
Как выглядит процесс интеграции наблюдаемых объектов в большую кодовую базу?
Наблюдаемые объекты можно использовать практически в любом приложении. Если речь идёт о командной разработке, это может занять некоторое время, однако, переход можно упростить, если выполняются следующие условия:
Существуют ли ситуации, в которых использование наблюдаемых объектов не рекомендуется?
Конечно существуют! Если перед нами отдельное событие, которое выполняет какое-то одно действие, как часто бывает в JS-программировании, тогда использовать RxJS — это малость странновато. Конечно, и тут можно применить RxJS, но это будет явный перебор.
Реализация операции «перетащить и опустить» — отличный пример задачи, для решения которой идеально подходит RxJS. Это — событие со множеством действий, сложность которого, при наличии такой возможности, всегда полезно уменьшить.
Как понять, когда следует использовать наблюдаемые объекты?
Полезно, при принятии решения о том, стоит ли использовать наблюдаемые объекты, опираться на контекст. Например:
С чего стоит начать, приняв решение изучить операторы Rx?
Легко ли начать использовать RxJS при работе с фреймворком вроде React?
Можете ли вы поделиться какими-нибудь советами по отладке Rx?
Как и при работе с любыми другими технологиями, в ходе изучения Rx, по мере того, как накапливается опыт, упрощается и отладка. Есть некоторые ситуации, которые сложно отлаживать. Я сейчас работаю с командой разработчиков Chrome для того, чтобы решить эти проблемы.
Типичная сложная ситуация, с которой сталкиваются программисты, возникает, когда возвращают что-нибудь из mergeMap и ожидают получить наблюдаемый объект, но оказывается, что это не наблюдаемый объект. Затем хотят получить возможность увидеть функцию, которая возвратила то, что, как они считали, должно быть наблюдаемым объектом, но им не является. В настоящее время нет способа это показать, так как нельзя узнать, что именно будет возвращено до момента возврата.
Вот несколько советов по отладке:
Что вы можете сказать о будущем RxJS?
В настоящее время такой оптимизации в RxJS не происходит. Всё находится в прототипах. Это даёт возможность использовать запись операторов через точку, но система не может оптимизировать размер пакета, так как весь исходный код считается используемым, даже если его, на самом деле, не применяют в конкретном проекте.
Как растущая популярность RxJS влияет React и Angular?
Надеюсь, что по мере того, как RxJS становится доступнее, и из-за того, что его всё легче интегрировать в различные проекты, он получит большее распространение в среде фронтенд-разработчиков. Сейчас широкому распространению RxJS мешают два препятствия. Первое — сложность изучения. Второе — размер библиотеки. RxJS существует уже давно, но раньше основной команде разработчиков было сложно понять то, как представить его программистам в простом и понятном виде.
Создатели Rx — замечательные люди, но они используют сложную терминологию, что привело к недостатку внимания к проекту со стороны сообщества разработчиков. Однако, благодаря образовательным ресурсам, вроде This Dot, сейчас наблюдается повышение интереса к этому замечательному инструменту.
Для того, чтобы способствовать дальнейшему распространению RxJS, сейчас я работаю над уменьшением размера библиотеки. Следите за новостями о Tiny Rx, или T-Rx. Если в двух словах, то этот проект позволил уменьшить 24-килобайтную (сжатую g-zip) библиотеку всего до 3 Кб!
Приятно наблюдать за тем, как растёт популярность RxJS, как им пользуется всё больше программистов. К тому же, надо сказать, что все желающие могут присоединиться к RxJS, внести посильный вклад в развитие этого мощного и полезного инструмента.
Если вы, немного ознакомившись с возможностями RxJS, хотите узнать больше, вот несколько полезных ссылок: Rx Workshop, Intro to Rx, Thinking Reactively и RxJS in Depth.
Уважаемые читатели! Пользуетесь ли вы RxJS в своих проектах?
Магия sharing-операторов и их разница в RXJS
Jun 22, 2019 · 13 min read
Перед погружением в sharing-операторы, первое что нужно определить — это то какие типы наблюдаемых объектов есть в rxjs. Всего существует 2 типа наблюдаемых обектов холодные и горячие. Вот отличная статья на эту тему Hot vs Cold Observables, но основная разница заключается в
Notification producer(поставщик событий) в холодном наблюдаемом обьекте создается самим обьектом и только тогда когда кто то на него подписывается.
Для примера interval() создает холодный наблюдаемый обьект. Информация создается внутри наблюдаемого обьекта и для каждой новой подписки будет создаваться новый интервал( interval ).
Notification producer(поставщик событий) в горячих наблюдаемых обьектах создается вне обьекта и не обращает внимание есть ли подписчики или нету.
Для примера fromEvent() создает горячий наблюдаемый обьект в качестве поставщика событий выступает DOM и он существует независимо от количества подписчиков.
Иногда нам нужн о сделать холодный observable горячим, возьмем для примера http-запросы. Рассмотрим следующий пример http-запроса из Angular
и мы отображаем имя и возвраст пользователя в шаблне используя пайп async (расположеных в разных местах что бы не возможно было обработать в одном пайпе async)
Во вкладке браузера network мы увидим 2 запроса. Причина этого заключается в том что Angular’s Http создает холодный observable так что каждый новый подписчик эквивалентен новому запросу. Определенно мы не хотим делать несколько похожих запросов. Для начала решим эту проблему и посмотрим как это работает.
Решение довольно тривиально. Все что нам нужно сделать это добавить share() или publish(), refCount() как то так
И теперь вкладка network отображает один запрос потому что информация была опубликована(shared) среди всех подписчиков. Так как share() или publish() решают данную проблему и в чем разница между ними если они делают одно и то же?
TL;DR share(), publish() и другие multicasting-операторы заставляют вести холодные observable как горячие
Для того что бы понять как sharing-операторы работают нам нужно понять что такое multicasting.
Горячие observable являются multicast так как все подписчики получают информацию из одного и того же источника событий (producer).
Холодные observable являются unicast так как каждый подписчик получает информацию из разных источников событий(producer) (от пер. при создании подписки producer создается для каждой подписки отдельно).
multicast()
Просто так это не заработает потому что нам нужно вызвать метод connect() вручную.
После этого мы получим похожее поведение, будет только один запрос вместо двух. Подключение (Connecting) и отключение(disconnecting) вручную может быть сложным для реализации, поэтому есть оператор — refCount() который будет автоматический подключаться( connect() ) при первой подписке, сохраняя количество подписчиков и Subject подключенных к источнику событий до тех пор пока не останется подписчиком. Когда не останется подписчиков, Subject будет отключен от источника событий( Source).
Источником событий(Source observable) в нашем примере выступает observable возвращаемый методом this.http.get()
Subject — внутренний subject переданный как аргумент multicast()
Subscriptions or observers — this.name$ и this.age$ и другие observable которые подписаны на Subject.
Вкратце все подписчики будут подписаны на subject X (переданные в multicast) и subject X сам подпишится на вызов http. Когда observable вернет результат запроса, наш subject X возмет результат и поделится им среди подпсчиков.
Основная идея мультикастинга(multicasting) заключается в том что Subject подписывается на источник событий(Source) и множество наблюдателей(Observers) подписываются на него(Subject).
Теперь нам не надо вручную вызывать connect() и беспокоиться об отписки(disconnecting). refCount() будет присоединять(connect) Subject к источнику событий(Source) при первой подписки на него и отпишется когда небудет наблюдателей.
Ранее мы использовали комбинацию publish(), refCount() она будет эквивалентна такой записи multicast(new Subject()), refCount()
publish()
В RxJs есть оператор publish() и если посмотрим на его исходный код то увидим что он использует multicast с Subject()
publish() опционально принимает selector function и это фактически меняет поведение оператора и это заслуживает отдельной статьи. Мы пропустим эту часть и рассмотрим использование publish() без selector function.
Так что когда мы используем publish() мы на самом деле используем уже известный метод multicast() с Subject() и нам так же нужно в ручную позаботиться о соединении и отсоединении, или использовать refCount() для автоматизации этого процесса.
Из за того что мы обычно используем publish() вместе с refCount() в rxjs есть схожий оператор который использует refCount() внутри себя и ведет себя схоже(с комбинацией publish и refCount). Это share()
share()
В первом примере мы видим что share() делает тоже самое что publish(), refCount() и в большенстве случаев они одинаковы. share() — это оператор который использует refCount() внутри себя, так что нам не нужно вызывать его. share() как и publish() использует multicast() но вся разница в аргументах переданных в multicast().
publish() использует экземпляр Subject — multicast(new Subject())
share() использует функцию-фабрику которая возвращает экземпляр Subject — multicast(() => new Subject()).refCount()
Различия между share() и publish() + refCount()
Они оба используют refCount() для управления подписчиками, однако
publish() + refCount() — пока есть хотябы один подписчик на источник(Subject), он будет выдавать значения. После того как не останется подписчиков, источник(Subject) будет отключен от исходного источника(Source). Если источник(Source) был завершен то все новые подписчики будут получать “completed”, но если источник(Source) не был завершен он будет переподключен к исходному источнику (Source)
share() — пока есть хотя бы один подписчик на источник(Subject), он будет выдавать значения. После того как не останется подписчиков, источник(Subject) будет отключен от исходного источника(Source). Для любого нового подписчика, неважно был ли завершен поток(Source) или нет, он будет подписан к источику(Source) снова используя new Subject
Разница очень тонкая но очень важная. Давайте модифицируем наш код, добавив кнопку которая будет обновлять информацию пользователя. При нажатии на нее данные будут повторно загружены с сервера.
Сначало давайте попробуем share()
Теперь рассмотрим тот же пример с publish(), refCount()
Снова мы увидим что refCount() равен 2 и как только Source запускает событие счетчик станет 0. Но когда мы выполним метод update() то ничего не произойдет(не будут повторно выполнены запросы к серверу). Как было написанно ранее новые подписчики будут получать уведомления о завершении источника если тот завершится.
share() использует фабричный метод который возвращает экземпляр субъекта. когда источник завершается, Субъект будет тоже завершен, НО для новых подписчиков будет создан новый экземпляр Субъекта который будет подписан на основной источник событий.
multicast() с различными типами subject
До сих пор мы говорили про multicast использующий Subject. Есть еще несколько типов Subject — ReplaySubject, BehaviorSubject и AsyncSubject. Передача мультикасту(multicast) различных Subject вернет ConnectableObservable, НО их поведение будет отличаться.
Если мы передадим ReplaySubject(n) в multicast() каждый новый подписчик будет получает n проигранных значений.
publishReplay()
publishReplay() === multicast(new ReplaySubject())
publishReplay() возвращает ConnectableObservable так что нам нужно использовать connect() или refCount() для управления соединениями (connections). Давайте модифицируем наш пример, так что бы каждый новый подписчик получал значения из буффера. Когда мы кликаем по update() мы не хотим получать новые значения но хотим получить закэшированные значения.
Все подписчики ReplaySubject-а перед завершением потока получат значение (в нашем случае только 1 значение так как http-емитит (emits) значение однажды). Для всех новых подписчиков ReplaySubject будет повторять N буфферизированных значений.
shareReplay()
shareReplay() — это очень интересный оператор. Он может вести себя схожим с publishReplay() + refCount() образом, но он зависит от того как мы его используем.
До версии RxJs 6.4.0 механизм подчета ссылок в shareReplay() работал немного по другому. Начиная с 6.4.0 мы можем явно передать аргумент для shareReplay() что бы тот использовал “нормальный” механизм подчета ссылок. Давайте разберемся.
shareReplay() (RXJS 6.4.0 или новей)
Как вы можете видет, нам не нужно больше использовать refCount() потому что shareReplay(
Различия между shareReplay() и publishReplay() + refCount()
publishReplay(n) + refCount() — пока существует хотя бы один подписчик на источник, ReplaySubject будет выдавать значения, если подписчиков нет, ReplaySubject будет отключен от источника. Любой новый подписчик получит последние N значений из ReplaySubject и повторно подпишется на источник, используя тот же ReplaySubject, если источник еще не завершен.
shareReplay(
Для того что бы увидеть разницу, давайте использовать interval() для того что бы новые подписчики не были завершены
Предположим что мы создали новую подписку на ReplaySubject, кликнув на кнопку.(после того как refCount выдал ноль)
Внутренний ReplaySubject повторяет буфферизированные значения для новых наблюдателей(observers) и завершает их, или переподписывается на Source в зависимости от статуса Source.
Теперь повторим пример с interval() используя shareReplay(
shareReplay() не использует multicast но он использует внутреннюю фабричную функцию и если мы используем его с refCount: true и refСount возвращает ноль, для любого нового подписчика, при условии завершения Source он будет возвращать буфферизованные значения и эмитить(emit) сообщение о завершении(completed). Если Source не был завершен(completed) для каждого нового подписчика будет создан новый ReplaySubject и подписка на Source.
В результате, после запуска кода выше и выполнния newSub() мы увидим.
Как вы заметили для sub3 значение не повторяется. Причина в том что при отписке (unsubscribe) sub1 и sub2, refCount вернет ноль и если Source завершен все новые подписчики как sub3 будут получать все буфферизированные значения и и статус completed, но так как мы используем интервал, Source не будет завершен, ReplaySubject будет уничтожен и новый подписчик будет создавать новый экземпляр ReplaySubject и подписываться на Source заново.
shareReplay() без refCount
Мы можем использовать shareReplay с refCount установленым в false или вообще не установленным(not set at all) и определить только размер буффера — например shareReplay(
По умолчанию refCount установлен как false в shareReplay()
refCount: false означает что при первой подписке на ReplaySubject он будет подписан на Source. Но он не отключит ReplaySubject от Source в котором не осталось подписчиков на ReplaySubject. Давай те изменим наш пример снова выставив refCount в значение false
sub1 и sub2 подписаны на ReplaySubject и ReplaySubject подписывается на внутреннему Source. После 2 секунд sub1 и sub2 будут отписаны, но в этом случае ReplaySubject не будет отписан от Source. Source будет продолжать публиковать значения даже если нет подписчиков, что бы обработать его значения. После 4 секунд новый подписчик подпишется на ReplaySubject и получит последние 2 значения из буффера и продолжит получать значения из Source. Результат будет следующим.
sub1 и sub2 подписаны, печатают значения и после двух секунд отписываются, но из за того что Source еще не завершен(completed), ReplaySubject будет получать данные из Source и поэтому после 4 секунд sub3 подписывается на ReplaySubject он получит не 0 и 1 как значения из буффера, но 2 и 3, потому то в то время ReplaySubject-у удалось получить новые данные из Source и одновить буффер. Единственый случай когда ReplaySubject отпишется от Source это когда Source завершен или завершен ошибкой. Каждый новый подписчик в таком случае будет получать воспроизводимые значения и завершится.
publishBehavior()
publishBehavior() использует multicast с другим subject. BehaviorSubject
publishBehavior() === multicast(new BehaviorSubject())
publishBehavior() вернет ConnectableObservable который нам нужно использовать для refCount() или подключаться вручную.
publishBehavior() принимает значения по умолчанию как параметр и будет публиковать его для всех подписчиков если Sourceне публиковал. Рассмотрим этот пример
Результат будет слудующий
Из за того что interval асинхронный, когда sub1 и sub2 подписываются на BehaviorSubject, в это время Source еще не опубликовал значение, так что sub1 и sub2 будет доставать значение по умолчанию из BehaviorSubject. Через 2 секунды sub1 и sub2 будут отписаны от BehaviorSubject и BehaviorSubject отпишется от Source. Через 4 секунды sub3 будет подписан на BehaviorSubject из за того что Source еще не был завершен, sub3 будет получать последнее опубликованное значение и переподписываться на Source используя тот же(same) BehaviorSubject.
publishBehavior(default_value) — когда подписка на BehaviorSubject создана до того как Source опубликовал значение, BehaviorSubject передаст значение по умолчанию подпсчику. Пока есть хотя бы один подписчик на Source BehaviorSubject будет публиковать значения. Когда не останется подписчиков BehaviorSubject будет отключен(disconnected) от Source. Если Source еще не был завершен(completed), новые подписчики будут получать последнее значение от BehaviorSubject и переподписываться на source пользуясь тем же BehaviorSubject. Если Source был завершен все новые подписчики будут получать статус завершения.
publishLast()
publishLast() uses multicast with AsyncSubject
publishLast() === multicast(new AsyncSubject())
Из за того что interval бесконечный observable мы использовали take(2) будет излучать 2 значения и завершаться. Ниже продемонстрирован результат.
когда sub1 и sub2 подписываются на AsyncSubject они не будут получать значения пока Source не завершится. Когда Source завершается AsyncSubject будет передавать последнее значение всем наблюдателям(observers) и завершится. После 7 секунд sub3 подписывается на AsyncSubject и из за того что он завершен sub3 получит последнее значение и тоже завершится.
Много чего есть в Rxjs и multicasting одна из самых важных вещей в библиотеке. Надеюсь эта статья помогла вам понять как работают sharing-операторы и в чем их отличия.
Спасибо за внимание.




