Важность понимания парадигмы. RX для работы с API
Каждый день в нашей работе мы сталкиваемся с различными парадигмами. Не смотря на то, что большинство парадигм стары как мир (ООП, ФП и т.д.), часто всплывает что-то новое для нас. Возможно, что раньше мы не обращали внимание на них, или просто отсутствовала необходимость. Но теперь, когда она появилась, важно открыть свой разум, и освободить его от оков старых устоев.
Когда мы изучаем что-то новое, бывает сложно перестроиться. Что такое парадигма? Это философия, образ мышления. Если его не понять, не придерживаться ему, то использование инструментов парадигмы становится бессмысленным.
С этой проблемой столкнулись С++ программисты, когда популярность Си стала угасать, и многие Сишные программисты ринулись покорять новый, более популярный, более сложный язык. Проблема была в том, что С++ использовали как Си с классами, не пытаясь постичь ООП, соответствующие паттерны, ну вы поняли мысль.
Хотя люди и получили новый инструмент, они просто не хотели менять свой образ мышления. Ведь для этого нужно многое переосмыслить, поменять образ мышления. Не все этого хотят, не все на это способны.
Это я все к чему. Парадигма реактивного программирования становится все более популярной. Если вы еще не знакомы с ней, я очень советую познакомиться.
Я вижу, как библиотеку RX (для Unity это UniRx) пользуют в разных проектах. В частности, один из вариантов использования – это обертка над API. Оно и понятно, благодаря RX можно удобно комбинировать запросы, реагировать на ошибки и тому подобное.
Давайте посмотрим на пример. Я буду писать на .Net Core и Rx.Net, но его можно легко транслировать на Unity и UniRx. Если вы знакомы с RX, то можете пропустить следующую секцию, и перейти к следующей части
Пример работы с API через RX.Net
Итак, предположим, у нас есть некий интерфейс коммуникации с сервером.
Здесь есть две простейшие операции: получить токен и получить сейв. Как правило, при старте игры мы получаем токен (если он протух или его не было), затем получаем сейв. В принципе это может быть объединено и в одну операцию. Но для демонстрации идеи я их разделил.
Методы возвращают IObservable, что означает, что операция асинхронна, и может занять какое-то время.
Сделаем простейшую заглушку для интерфейса.
Здесь оба метода возвращают фиксированное значение с небольшой задержкой.
Обратите внимание, что здесь использован SingleAsync
, который сразу закрывает стрим, после первого же события. Казалось бы, вполне логичное решение, ведь один запрос - один ответ.
UserState
пример класса сейва, просто для демонстрации.
Ну и теперь попробуем это завести.
Программа покажет следующий вывод:
Started the program
Got token: stub_user_token
Finished getting the token
Пока что все ок. Теперь попробуем сэмулировать ошибку сервиса. Привожу только измененные части.
При первом запросе к GetToken возвращаю ошибку сервера. При втором, меняю имплементацию и возвращаю токен.
Запускаем программу, и видим, что она висит. Третье сообщение не выводится никогда.
Started the program
Got exception while getting the token: System.Exception: Failed to get token
Одна из частых ошибок – люди не предусматривают обработку завершения стрима с ошибкой. В данном случае ошибка обработана (выведена в консоль), но выход из программы не осуществлен, так как onComplete не вызвался. В данном случае в exit = false
достаточно перенести в Finally.
Теперь, обычная практика, добавить retry и таймаут, мало ли, может плохое соединение, и можно повторить запрос.
Чтобы поддержать задержку ошибки, нужно немного изменить метод ReturnError
:
Обратите внимание! Мы не можем сделать просто
Observable.Throw<string>(new Exception("error")).Delay(TimeSpan.FromSeconds(10)))
. В таком случае Delay не будет работать, так как ошибка прерывает стрим моментально. Поэтому здесь я комбинирую стрим через SelectMany. Так же мы не можем воспользоваться Observable.Empty, так как он тоже сразу закроет стрим.
Оператор Retry организован таким образом, что при возникновении ошибки, он переподписывается на стрим. В текущей реализации логика в методе GetLogin()
не будет вызвана при переподписке. Поэтому необходимо его обновить.
Здесь используется фабрика Observable.Create, которая будет вызывать функтор при каждой подписке. Таким образом мы можем быть уверены, что возвращаем разные результаты при Retry.
Запускаем программу:
Started the program
GetToken called
GetToken called
GetToken called
Got exception while getting the token: System.TimeoutException: The operation has timed out.
Finished getting the token
GetToken вызвался трижды, при этом весь стрим завершился с ошибкой таймаута. Если мы изменим задержку на приемлемые значения, то увидим следующий вывод:
Started the program
GetToken called
GetToken called
Got token: user_stub_token
Прилеплять логику по ретраю и таймауту извне не красиво, поэтому мы можем перенести это в репозиторий.
Ну и в конце, после авторизации, нам нужно получить UserState.
Таким образом, Retry и Timeout будут инкапсулированы в репозитории, и пользователю не нужно о них думать.
Финальный стрим будет выглядеть следующим образом:
Соблюдаем философию RX
Итак, давайте подведем краткие итоги. На данном этапе ясно, что RX позволяет легко и прозрачно внедрять логику вроде ретрая или таймаутов. В целом, логика работы с асинхрорнными операциями выглядит более стройно и понятно.
Пытливый ум читателя может заметить, что то же самое можно было бы сделать с помощью async/await, но не во всех версиях Unity/С# оно доступно, да и обработка ошибок, на мой взгляд, при таком подходе, не так прозрачна. В любом случае – решать вам.
Давайте вернемся к началу статьи. Я сказал, что важно понимать парадигму, чтобы максимально извлекать из нее выгоду. Что в приведенном мною примере не так?
Реактивное программирование потому и называется реактивным, что весь код должен реагировать на события. Мы должны создавать все стримы событий заранее. Тогда мы будем уверены, что, когда прилетит событие, все обновится как надо.
В описанном же примере стрим “пассивный”. То есть посылка запроса происходит во время подписки на событие. Мы одновременно запрашиваем данные, и их читаем. Более того, такая подписка действую всего лишь один раз, из-за SingleAsync
.
Это очень ограничивает варианты использования кода. Например, мы можем либо привязать код обновления к загрузке какого-либо вью, либо к кнопке “refresh”. Если нам нужны обновления в реальном времени, то мы делаем периодический refresh, что совсем не красиво.
Не смотря на то, что это естественно для REST API, это убивает всю гибкость и выгоду от RX.
В идеологии RX, запрос данных и реакция на событие об обновлении данных — две разные задачи, которые должны обрабатываться отдельно. Если вы знакомы с паттерном MVVM, то можете заметить, что в нем изменение модели и обновление вьюхи разделено. Обновление вью производится с помощью байндингов, которые реагируют на события. Изменение модели производится с помощью команд.
RX по сути требует такого же подхода. Как же это воплотить при работе с API?
Reactive API
Представим, что запрос данных — это команда в терминах MVVM. А ответы от API – это событие, которое может прикатить в любое время. Если мы их разделим, то все вьюхи при старте сразу могут подписаться на событие обновления, а мы можем быть уверены, что данные будут всегда акутальны.
Запросить же обновление данных мы можем из любого места программы. При этом, в отличие от предыдущего подхода, нам не нужно будет менять код обновления view. Да и вообще мы можем быть совершенно отвязаны от view.
Итак, интерфейс меняется следующим образом:
По интерфейсу сразу понятно как данные запросить, и как подписаться на обновления.
Теперь к реализации.
Что я меняю первым делом – создаю отдельные Observable:
BehaviorSubject — классная штука. При подписке на него он сразу эммитит последнее известное ему onNext значение. Таким образом, если токен уже был получен, то подписчик будет обладать актуальным значением. Это такой своеобразный кэш.
Если в какой-то момент мы поймем, что токен надо обновить, то достаточно просто вызывать fetchToken() и все заинтересованные его получат.
Даже если нам необходимо периодическое обновление, то его легко сделать в одном месте по таймеру. Подписчиков может быть сколь угодно много.
Но это еще не все. Часто в приложениях нужен прямо таки настоящий реалтайм, когда сервер уведомляет клиент об изменениях. Например, с использованием сокетов. Если это ваш случай, то изменить код с REST API на сокеты элементарно. Все подписки остаются прежними. Меняется только транспорт: присоединяемся к сокету, и прокидываем событие в BehaviorSubject.
Важный момент: такой стрим не завершается никогда, при нормальных обстоятельствах. Если стрим закрылся, то это либо программа завершается, либо произошла ошибка. То есть, например, http ошибки прокидывать в этот стрим не нужно. Вся обработка ошибок должна уйти на другой слой логики.
Итак, давайте посмотрим как изменилась реализация с разделением кода.
Здесь я сделал просто заглушку, которая присылает события с задержкой. Но на деле, в этом месте должен посылаться запрос и обрабатываться ошибки. Если пользователю нужно знать об ошибке, то нужно вывесить отдельный Observable с человекопонятным типом ошибки.
Теперь посмотрим на сам стрим.
Интересные моменты:
- Теперь две отдельные подписки
- Подписки учитывают, что может прийти null, поэтому фильтруют ивенты
- Команды на запрос данных могут находиться в любом месте программы, как до подписки, так и после. Завязка на порядок вызова отсутствует.
Подводим итоги
Фух, получилась довольно большая статья. Но основная мысль такова: недостаточно использовать парадигму, нужно полностью понять и принять ее философию. Если мы это не делаем, то сильно себя ограничиваем, тем самым теряя всю пользу от подхода.
Сделаю удобную работу с API можно и нужно. Если утилизировать возможность реактивности по максимуму, разделяя запросы и события, то код будет понятен и предсказуем.
Тем не менее, я хочу предостеречь о сложностях отладки RX стримов. когда что-то не работает, приходится повозится, зарываясь в дебрях колстека. Это один из негативных моментов работы с RX, да и ФП в целом.
Как всегда, если у вас есть мысли по теме — буду рад услышать. Комменты, или ПМ приветствуются.
Исходный код по теме здесь.