на каком потоке будут вызываться onnext onerror oncomplete если в цепочке нет subscribeon observeon
Разбираемся с многопоточностью в RxJava
Когда описывают преимущества RxJava, всегда упоминают об удобстве организации работы многопоточного приложения средствами RxJava. То, как использовать операторы subscribeOn и observeOn, можно прочитать практически в каждой статье, посвященной основам RxJava. Например, здесь хорошо описаны случаи, когда использовать методы subscribeOn и когда observeOn. Однако, на практике часто приходится сталкиваться с проблемами, для которых нужно более глубокое понимание того, что именно делают методы subscribeOn и observeOn. В этой статье я хотел бы рассмотреть ряд вопросов, которые иногда возникают при использовании этих операторов.
Изучать нюансы RxJava можно разными способами: по документации (которая весьма подробна), по исходникам или же на практике. Я выбрал последний способ. Для этого я набросал пару тестов, по работе которых я смог лучше разобраться с асинхронным реактивным программированием.
Сначала для проверки работы смены потоков я использовал следующий код:
Проверим как работает этот код без всяких преобразований:
Результат:
Inside observable: main
Before transform: main
After transform: main
Inside doOnNext: main
In onNext: main
In onComplete: main
Как и ожидалось, никакой смены потоков.
1. ObserveOn и SubscribeOn
SubscribeOn
Как можно понять из документации reactivex.io/documentation/operators/subscribeon.html
с помощью этого оператора можно указать Scheduler, в котором будет выполняться процесс Observable.
Проверяем:
Результат:
Inside observable: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Начиная с выполнения содержимого Observable и до получения результата, все методы выполнялись в потоке, созданном Schedulers.io().
ObserveOn
В документации по этому методу сказано, что применение этого оператора приводит к тому, что последующие операции над “излученными” данными будут выполняться с помощью Scheduler, переданным в этот метод.
Результат:
Inside observable: main
Before transform: main
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Как и ожидалось, с момента применения метода observeOn поток, в котором производится обработка данных, будет изменен на тот, который ему выделит указанный Scheduler.
Объединим использование subscribeOn и observeOn:
Результат:
Inside observable: RxComputationThreadPool-3
Before transform: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Методы, выполняемые до применения оператора observeOn выполнились в Scheduler, указанном в subscribeOn, а после – в scheduler, указанном в observeOn.
Комбинируя эти два метода, можно добиться асинхронной загрузки данных из интернета и отображения их на экране в главном потоке приложения.
Но что будет, если применить эти методы несколько раз?
Для начала вызовем observeOn несколько раз:
Inside observable: main
Before transform: main
Between two observeOn: RxComputationThreadPool-3
After transform: RxCachedThreadScheduler-1
Inside doOnNext: RxCachedThreadScheduler-1
In onNext: RxCachedThreadScheduler-1
In onComplete: RxCachedThreadScheduler-1
Никаких сюрпризов. После применение observeOn обработка элементов производится с помощью указанного Scheduler.
Теперь вызовем subscribeOn несколько раз.
Результат:
Inside observable: RxComputationThreadPool-1
Before transform: RxComputationThreadPool-1
Between two observeOn: RxComputationThreadPool-1
After transform: RxComputationThreadPool-1
Inside doOnNext: RxComputationThreadPool-1
In onNext: RxComputationThreadPool-1
In onComplete: RxComputationThreadPool-1
Как видим, применение второго subscribeOn не привело ни каким изменениям. Но совсем ли он бесполезен?
Добавим между вызовами subscribeOn оператор:
Получим первое сообщение в логе:
Inside lift: RxCachedThreadScheduler-1
RxCachedThreadScheduler-1 — это именно тот поток, который был получен из Schedulers.io(), указанного во втором вызове subscribeOn.
lift() — это оператор, с помощью которого можно трансформировать subscription.
Можно схематично описать процесс выполнения подписки следующим образом:
Пользователь подписывается на observable, передавая subscription.
Этот subscription доставляется до корневого observable, при этом он может быть преобразован с помощью операторов.
Subscription передается в observable, отправляются onNext, onComplete, onError.
Над произведенными элементами выполняются преобразования
Преобразованные элементы попадают в onNext изначального subscriber.
Таким образом, когда subscription доставляется до observable, изменить поток можно с помощью subscribeOn. А когда элементы доставляются из observable в subscription – влияет observeOn.
Для того, что бы это проиллюстрировать рассмотрим код:
Подписчик, созданный в последней строчке, передается в Observable, созданный с помощью Observable.create(). Внутри оператора map вызывается оператор lift, куда передается Operation, который во время подписки декорирует Subscriber. Когда Observable излучает данные, они попадают в декорированный Subscriber. Декорированный Subscriber изменяет данные и отправляет их в оригинальный Subscriber.
Без изменения Scheduler весь процесс будет выполняться в потоке, в котором вызывается метод subscribe. Далее, пока Subscriber декорируется, с помощью subscribeOn можно изменить поток, в котором будет выполняться следующая декорация. В методе call() интерфейса OnSubscribe будет использоваться последний Scheduler, указанный в SubscribeOn. После излучения данных, Scheduler меняется уже с помощью onserveOn.
2. Выполняем задачи параллельно.
Рассмотрим следующий кейс:
Необходимо загрузить с сервера различную информацию, после этого скомпоновать ее и отобразить на экране. При этом, чтобы ускорить процесс, загружать данные стоит параллельно (если есть такая возможность). Если бы у нас не было RxJava, то эта задача требовала бы значительных усилий. Но с реактивным программированием эта задача тривиальна.
Мы будем выполнять три задачи, каждая из которых ждет 1 секунду, а потом отправляет сообщение в subscription. Далее с помощью оператора combineLatest все сообщения будут объединены и переданы в подписку.
Для проверки будем использовать следующий код:
Для начала запустим тест без всяких преобразований:
Результат:
Inside Observable1: main
Inside Observable2: main
Inside Observable3: main
Inside combining result: main
Before transform: main
After tranform: main
In onNext: main
In onComplete: main
Как видим, все выполняется в одном потоке. Наши три задачи выполняются последовательно.
Добавим subscribeOn и observeOn для observable, полученного с помощью функции zip.
Результат:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-1
Inside Observable3: RxCachedThreadScheduler-1
Inside combining result: RxCachedThreadScheduler-1
Before transform: RxCachedThreadScheduler-1
After tranform: RxNewThreadScheduler-1
In onNext: RxNewThreadScheduler-1
In onComplete: RxNewThreadScheduler-1
Все так, как и описывалось в предыдущей части статьи про subscribeOn и observeOn.
Теперь каждую из задач будем выполнять в своем потоке. Для этого достаточно указать Scheduler.io(), т.к. внутри него содержится пулл потоков, оптимальный для загрузки данных.
Результат:
Inside Observable1: RxCachedThreadScheduler-1
Inside Observable2: RxCachedThreadScheduler-2
Inside Observable3: RxCachedThreadScheduler-3
Inside combining result: RxCachedThreadScheduler-3
Before transform: RxCachedThreadScheduler-3
After tranform: RxComputationThreadPool-3
In onNext: RxComputationThreadPool-3
In onComplete: RxComputationThreadPool-3
Мы добились того, чего и хотели — три наши задачи выполнились параллельно.
3. Операторы с Schedulers.
В предыдущей главе для эмулирования долгих задач отлично подошел бы оператор delay(), но проблема в том, что этот оператор не так прост, как может показаться на первый взгляд.
Существует ряд операторов, которые требуют указания Scheduler для свой работы. При этом есть их перегруженные версии, которые в качестве Scheduler используют computation(). delay() является примером такого оператора:
Несмотря на то, что мы не указывали никакой Scheduler, результат будет следующим:
LastSeenThread: RxComputationThreadPool-1
Для того, что бы избежать использования computation scheduler, достаточно третьим параметром передать требуемый scheduler:
.delay(1, TimeUnit.SECONDS, Schedulers.immediate())
Примечание: Schedulers.immediate() — выполняет задачу в том же потоке, в котором выполнялась предыдущая задача.
Результат:
LastSeenThread: main
Кроме delay() существуют и другие операторы, которые могут сами менять Scheduler: interval(), timer(), некоторые перегрузки buffer(), debounce(), skip(), take(), timeout() и некоторые другие.
4. Subjects.
При использовании Subjects стоит учесть то, что по умолчанию цепочка изменений данных, отправленных в onNext subject, будет выполняться в том же потоке, в котором был вызван метод onNext(). До тех пор, пока не встретится в цепочке преобразований оператор observeOn.
А вот применить subscribeOn так просто не получится.
Рассмотрим следующий код:
Тут указаны и observeOn и subscribeOn, но результат будет следующим:
doOnNext: RxCachedThreadScheduler-1
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1
doOnNext: main
onNext: RxNewThreadScheduler-1
Т.е. когда мы подписываемся на subject, он сразу возвращает значение и оно обрабатывается потоке из Shedulers.io(), а вот когда приходит следующее сообщение в subject, то используется поток, в котором был вызван onNext().
Поэтому, если вы после получения объекта из subject запускаете какую-то долгую операцию, то необходимо явно проставить observeOn между ними.
5. Backpressure
В этой статье невозможно не упомянуть о таком понятии как backpressure. MissingBackpressureException — ошибка, которая довольно много нервов мне подпортила. Я не стану тут пересказывать то, что можно прочитать в официальной wiki RxJava: github.com/ReactiveX/RxJava/wiki/Backpressure. Но если вы активно используете RxJava, то вам обязательно надо прочитать о backpressure.
Когда у вас в приложении имеется некоторый производитель данных в одном потоке и какой-то потребитель в другом, то стоит учитывать ситуацию, когда потребитель будет не успевать обрабатывать данные. В такой ситуации вам помогут операторы, описанные по приведенной ссылке.
Заключение.
“it makes the most sense for Subscribers to always assume that values are delivered asynchronously, even though on some occasions they may be delivered synchronously.”
“Для подписчика имеет смысл считать, что данные доставляются асинхронно, даже в тех случаях, когда они могут доставляться синхронно”.
Параллелизм в RxJava 2
Многопоточное приложение состоит из двух или более частей, которые могут работать параллельно. Это позволяет приложению лучше использовать ядра внутри процессора устройства. Это позволяет выполнять задачи быстрее и обеспечивает более плавный и отзывчивый опыт для пользователя.
Кодирование для параллелизма в Java может быть болезненным, но благодаря RxJava теперь это сделать намного проще. С RxJava вам просто нужно объявить поток, в котором вы хотите, чтобы задача была выполнена (декларативно) вместо создания и управления потоками (обязательно).
Предпосылки
Чтобы следовать этому уроку, вы должны быть знакомы с:
Ознакомьтесь с другими нашими статьями, чтобы освоить основы RxJava и лямбда-выражений.
Планировщики в RxJava 2
Schedulers в RxJava используются для выполнения единицы работы над потоком. Scheduler предоставляет абстракцию для механизма потоков Android и Java. Если вы хотите запустить задачу и используете Scheduler для ее выполнения, Scheduler переходит в свой пул потоков (набор потоков, готовых к использованию), а затем запускает задачу в доступном потоке.
В следующем разделе мы собираемся исследовать различные виды Schedulers и их использование.
Типы планировщиков
Вот некоторые из типов Schedulers доступных в RxJava и RxAndroid чтобы указать тип потока для выполнения задач.
Оператор subscribeOn()
Многопоточное программирование в Android с использованием RxJava 2
Если вы новичок в общении с RxJava или пытались разобраться в этом, но не довели дело до конца, то ниже вы найдете для себя кое-что новое.
Оригинал статьи написан 29 ноября 2017. Перевод вольный.
Нам в GO-JEK требуется выполнять большое количество асинхронных операций в приложениях и мы не можем позволить себе идти на компромиссы в ущерб скорости работы и плавности пользовательского интерфейса.
Написание сложных многопоточных Android приложений может быть достаточно трудоемким процессом, который время от времени будет вас сильно перегружать из-за необходимости заботиться о большом количестве связанных друг с другом вещей. Это и многие другие причины убедили нас использовать RxJava в разрабатываемых Android приложениях.
В этой статье мы поговорим о том как мы использовали реальные возможности работы с многопоточностью в RxJava для того, чтобы сделать процесс разработки приложения максимально простым, легким и веселым. Во всех примерах кода ниже будет использоваться RxJava 2, но описанные концепции можно будет применять и в других реактивных расширениях.
Почему реактивное программирование?
Каждая статья о реактивном программировании начинается с такого обязательного блока и мы не нарушим эту традицию. Существует несколько преимуществ использования реактивного подхода к построению Android приложений, обратим внимание на те, которые вам действительно нужны.
Никаких больше обратных вызовов
Если вы давно разрабатываете под Android, то, должно быть, заметили, как быстро вещи становятся чересчур сложными и неподконтрольными с использованием вложенных обратных вызовов.
Это происходит, когда вы выполняете несколько асинхронных операций последовательно и хотите, чтобы дальнейшие действия зависели от результата предыдущих операций. Почти сразу код становится слишком перегруженным и сложным для поддержки.
Простой контроль ошибок
В императивном мире, в ситуации когда выполняется множество сложных асинхронных операций, ошибки могут возникать в большом количестве мест. И в каждом месте вы должны обрабатывать эти ошибки, в результате появляется много повторяющегося шаблонного кода, методы становятся громоздкими.
Очень простое использование многопоточности
Все мы знаем (и тайно признаем) насколько иногда сложной может быть работа с многопоточностью в Java. Например, выполнение части кода в фоновом потоке и возврат результата обратно в главный поток. Это только звучит просто, но на практике появляется много подводных камней, которые нужно обходить.
RxJava делает безумно легким выполнение нескольких сложных операций в любом потоке на ваш выбор, заботясь о корректной синхронизации и позволяя без проблем переключаться между потоками.
Преимущества RxJava бесконечны. Мы можем говорить об этом часами и адски утомить вас, но вместо этого давайте копнем глубже и начнем изучать реальную работу с многопоточностью в RxJava.
RxJava НЕ многопоточна по умолчанию
Да, вы прочли всё верно. RxJava по умолчанию не многопоточна в любом случае. Определение, данное для RxJava на официальном сайте, выглядит примерно следующим образом:
«Библиотека для составления асинхронных и основанных на событиях программ с использованием последовательностей (observable sequences) для виртуальной Java машины».
Увидев слово «асинхронных», многие люди ошибочно полагают, что RxJava многопоточна по умолчанию. Да, RxJava поддерживает многопоточность, предлагает множество мощных возможностей для легкой работы с асинхронными операциями, но это не значит что поведение RxJava по умолчанию многопоточно.
Если вы уже немного работали с RxJava, то её знаете базовые конструкции:
Если вы запустите данный пример кода, то ясно увидите, что все действия выполняются в основном потоке приложения (проследите за именами потоков в логе в консоли). Этот пример показывает, что по умолчанию поведение RxJava блокирующее. Всё выполняется в том же потоке, в котором вызван код.
Простой пример
Для того, чтобы начать работать с многопоточностью с применением RxJava необходимо познакомиться с базовыми классами и методами, такими как Schedulers, observeOn/subscribeOn.
Давайте рассмотрим один из самых простых примеров. Допустим, мы хотим получить список объектов Book сетевым запросом и показать его в основном потоке приложения. Довольно общий и понятный пример для начала.
Также мы используем оператор observeOn() вместе с планировщиком AndroidSchedulers.mainThread() для того, чтобы обрабатывать результат в основном потоке и показать список книг в пользовательском интерфейсе приложения.
Не волнуйтесь, скоро мы перейдем к более продвинутым вещам. Этот пример был предназначен только для того, чтобы вспомнить базовые понятия, прежде чем погрузиться глубже.
Подружимся с планировщиками (Schedulers)
RxJava предоставляет мощный набор планировщиков. Вы не можете получить прямой доступ к потокам или управлять ими. Если вам нужно работать с потоками, то необходимо воспользоваться встроенными планировщиками.
Можете представлять планировщики как потоки или пулы потоков (коллекции потоков) для выполнения разного рода задач.
Говоря проще, если вам нужно выполнить задачу в отдельном потоке — необходимо использовать верный планировщик, который возьмёт поток из своего пула доступных потоков и выполнит в нём задачу.
В RxJava доступны несколько типов планировщиков. Самая сложная часть — выбрать верный планировщик для вашей задачи. Задача никогда не будет выполняться оптимально, если вы не выберете верный планировщик. Давайте разберем каждый планировщик.
Schedulers.io()
Этот планировщик основывается на неограниченном пуле потоков и используется для интенсивной работы с вводом-выводом без использования ЦП, например, доступ к файловой системе, выполнение сетевых вызовов, доступ к базе данных и так далее. Количество потоков в этом планировщике неограничено и может расти по мере необходимости.
Schedulers.computation()
Этот планировщик используется для выполнения работы, высоко нагружающей ЦП, такой как обработка больших объемов данных, изображений и так далее. Планировщик основывается на ограниченном пуле потоков с размером в количество доступных процессоров.
Так как этот планировщик подходит только для интенсивной работы с ЦП — количество его потоков ограничено. Сделано это для того, чтобы потоки не конкурировали за процессорное время и не простаивали.
Schedulers.newThread()
Этот планировщик создает совершенно новый поток при каждом вызове. В данном случае использование пула потоков не принесет никакой выгоды. Потоки очень затратно создавать и уничтожать. Вы должны быть осторожны и не злоупотреблять чрезмерным созданием потоков, так как это может привести в замедлению работы системы и переполнению памяти. Новый поток будет создаваться для обработки каждого элемента, полученного из observable-источника.
В идеале вы должны использовать этот планировщик довольно редко, в основном для выведения в отдельный поток долго работающих частей программы.
Schedulers.single()
Этот планировщик основывается на единственном потоке, который используется для последовательного выполнения задач. Он может быть очень полезен, когда у вас есть набор фоновых заданий в разных местах вашего приложения, но нельзя допустить одновременного выполнения более чем одного из этих заданий.
Schedulers.from(Executor executor)
Допустим, вы хотите ограничить число параллельных сетевых вызовов, которые делает ваше приложение. Можно создать собственный планировщик, который будет работать на базе ограниченного в размерах пула потоков ( Scheduler.from(Executors.newFixedThreadPool(n)) ) и использовать его во всех местах, связанных с сетевыми вызовами.
AndroidSchedulers.mainThread()
Понимание subscribeOn() и observeOn()
Теперь, когда у вас есть представление о типах планировщиков, разберем subscribeOn() и observeOn() в деталях.
Вы должны глубоко разбираться в том, как эти два оператора работают по отдельности и вместе, чтобы профессионально работать с многопоточностью в RxJava.
subscribeOn()
Простыми словами, этот оператор говорит в какой поток наблюдаемый источник (source observable) будет передавать элементы. Вы должны уяснить важность слова «источник». Когда у вас цепь наблюдаемых элементов (observables), источник (source observable) — это всегда корневой элемент или верхняя часть цепи, откуда происходит создание событий.
Также важно понять, что нельзя использовать subscribeOn() несколько раз в одной цепочке вызовов. Можно, конечно, написать ещё раз, но никаких изменений это не повлечет. В примере ниже мы последовательно вызываем три различных планировщика, можете ли вы догадаться, какой планировщик сработает при запуске?
Под капотом
Теперь для вас должна быть понятна концепция работы подписок в RxJava. К настоящему времени у вас должно появиться понимание того, как формируются цепочки наблюдаемых (observable) объектов и как события распространяются, начиная с observable-источника.
observeOn()
Как мы уже видели, subscribeOn() указывает observable-источнику передавать элементы в определенный поток и этот поток будет отвечать за продвижение элементов вплоть до подписчика (Subscriber). Поэтому, по умолчанию, подписчик получает обработанные элементы в этом же потоке.
Но это может быть не то поведение, которого вы ожидаете. Предположим, вы хотите получить некие данные из сети и отобразить их в пользовательском интерфейсе.
Нужно выполнить две вещи:
В этом придуманном примере мы наблюдаем получение целых чисел из сети и их дальнейшую передачу из observable-источника. В реальных примерах это может быть любая другая асинхронная операция, например, чтение большого файла, выборка данных из базы данных и т.д. Вы можете запустить данный пример и посмотреть на результаты, просто следите за логами в консоли.
Теперь рассмотрим более сложный пример, в котором observeOn() будет вызываться несколько раз для переключения потоков в процессе обработки данных.
Но что произойдет, если использовать observeOn() несколько раз последовательно? В примере ниже в каком потоке подписчик получит результат?
Под капотом
Резюме
Сейчас у вас должно быть достаточно хорошее представление о том, как правильно использовать RxJava для написания многопоточных приложений, обеспечивающих быструю и плавную работу пользовательского интерфейса.
Если понимание не пришло сразу, ничего страшного. Прочитайте статью ещё раз, поэкспериментируйте с примерами кода. Здесь достаточно много нюансов для понимания, не торопитесь.
Nurlandroid
Nurlan’s personal website
Show menu Hide menu
My name is Nurlan Nabiyev. I am passionate Android developer. This is my personal website with my portfolio of apps, posts and etc.
Contacts
Phone: +7 705 575 4828
Вопросы по RxJava на собеседованиях
Ответы на самые частые вопросы по RxJava
1.Какому «паттерну поведения» следует RxJava? Push или Pull?
В RxJava новые данные «заталкиваются»(Push) к наблюдателям
2.Какова разница между колбэками onNext(), onComplete() and onError()?
Эти колбэки которые получают Observable / Flowable. onNext() отрабатывает для каждой эмиссии данных. onComplete() и onError() взаимоисключаемы. Первый отработает когда эмиссия данных завершилась, а второй если произошла ошибка.
3.Сколько раз колбэки onNext(), onComplete() and onError() могут быть вызваны?
onNext() – от 0 до бесконечного кол-ва раз
onComplete() – максимум один раз за поток
onError() – максимум один раз за поток
4.Когда Observable начинает эмиссию частей данных?
Есть 2 вида Observable — «холодные» и «горячие». Холодные начинают эмиссию данных только тогда когда на них кто-нибудь подпишется. Горячие же эмитят данные вне зависимости от того есть ли подписант или нет.
5. Какая разница между «холодными» и «горячими» Observables
В том что они эмитят данные по разному. Холодные создаются множество раз и каждого инстанса могут подключены разные слушатели(подписанты), с разной логикой. Горячие же похоже на «поток» событий — разные слушатели могут подписываться на горячий observable, но такой observable создается один раз.
6. Можно ли трансформировать «холодный» Observable в «горячий»?
Другой метод трансформации это обернуть Observable Subject-ом. В данном случае Subject подписывается на «холодный» Observable незамедлительно и раскрывает себя как Observable для будущих подписчиков. Опять же, работа выполняется независимо от того, есть ли какие-либо подписчики или нет, с другой стороны, несколько подписчиков Subject-а не будут инициировать начальную эмиссию данных несколько раз.
7. Можно ли трансформировать «горячий» observable в «холодный»?
Можно несколькими путями. Первый подход — использование оператора defer(). Этот оператор откладывает создание «горячего» Observable, поэтому каждый новый подписчик снова запускает работу.
8. Что такое Планировщик (Scheduler)? Как RxJava использует их?
По умолчанию RxJava использует один поток — все операции выполняются на одном потоке. Планировщик помогает переключить выполнение определенного блока кода в иной поток.
9. Что такое цепочка-Observable?
Список операций / преобразований, выполненных между источником и конечным подписчиком. Простой пример — создание объекта User, фильтрация пользователей-администраторов оператором filter(), проверка их подлинности оператором filter() и, наконец, полное имя
оператором map().
10. Какая разница между операторами observeOn() и subscribeOn()?
subscribeOn() — используется для того, чтобы указать планировщику на каком потоке будет проходить основная работа. Например, очень тяжелые вычисления, которые могут затормозить UI-поток, можно перенести в рабочий поток используя данный оператор. Пример: subscribeOn(Schedulers.newThread())
observeOn() — указывает планировщику поток, на котором будут выполняться все последующие операции. Другими словами, он меняет поток для всех операторов после него. Например, после тяжелых вычислений на рабочем потоке, нужно показать результат на UI-потоке. Здесь мы можем написать observeOn(AndroidSchedulers.mainThread())
11. Что будет если много раз выполнить оператор subscribeOn() в цепочке?
Только первый оператор даст желаемый эффект. Остальные же эффекта не дадут, кроме траты ресурсов.
12. Что будет если много раз выполнить оператор observeOn() в цепочке?
Каждый observeOn() включает планировщик (поток), в котором будут выполняться все последующие операторы. Сложные потоки RxJava могут выиграть от нескольких операторов observeOn().
13. Какая разница между операторами map() и flatMap()?
Оператор map() просто превращает(мэппит) ЗначениеА в ЗначениеБ. Например: объекты в списке типа Int превратить в объект типа String Обрабатываются уже готовые/полученные значения.
14. Какая разница между операторами flatMap(), concatMap() и switchMap()?
Оператор flatMap() — как мы говорили выше — может разделить цепочку выполнения на несколько промежуточных потоков, а результаты будут получены Observer-ом. Следует отметить, что порядок получения результатов будет в соответствии с тем какой результат был получен первым.
Оператор concatMap() работает также как flatMap(), но только сохраняет порядок выполнения потоков. Из-за этого выполнение этого оператора может занять больше времени.
Оператор switchMap() тоже чем-то похож на flatMap(), но с единственным исключением — при получении нового элемента из цепочки предыдущие потоки созданные из предыдущих элементов уничтожаются. Проще говоря, используя данный оператор активным будет только последний Observable из последнего полученного элемента. Стало быть результат выполнения получим только самого последнего Observable.