22 декабря 2013

Google Photon. Обработка данных со скоростью света

Google Photon. Обработка данных со скоростью света*

Статья из цикла «Google Platform»

Photon – масштабируемая, отказоустойчивая и географически распределенная система обработки потоковых данных в режиме реального времени. Система является внутренним продуктом Google и используется в Google Advertising System. Research paper [5], описывающие базовые принципы и архитектуру Photon, был представлен на научной конференции ACM SIGMOD в 2013 году.

В research paper [5] заявлено, что пиковая нагрузка на систему может составлять миллионы событий в минуту со средней end-to-end задержкой менее 10 секунд.

Photon решает вполне конкретную задачу: необходимо соединить (выполнить операцию join) два непрерывных потока данных в режиме реального времени. Так в упоминаемой уже Google Advertising System один из этих потоков – поток поисковых запросов, другой – поток переходов по рекламным объявлениям.

Photon является географически распределенной системой и автоматически способен обрабатывать случаи деградации инфраструктуры, в т.ч. отказа дата-центра. В геораспределенных системах крайне непросто гарантировать время доставки сообщений (в первую очередь, из-за сетевых задержек), поэтому Photon допускает, что обрабатываемые потоковые данные могут быть не упорядочены по времени.

Используемые сервисы: GFS, PaxosDB, TrueTime.

Базовые принципы

В [5] объяснение принципов работы Photon идет в следующем контексте: пользователь ввел поисковый запрос (query) в момент времени t1 и перешел по рекламному объявлению (click) в момент времени t2. В этом же контексте, если не задано иного, в этой статье будут объяснены принципы работы Photon.

Принцип объединения потоков (join) взят из мира РСУБД: поток query имеет уникальный идентификатор query_id (условно Primary Key), поток click имеет уникальный идентификатор click_id и включает в себя некоторый query_id (условно Foreign Key). Объединение потоков происходит по query_id.

Следующий важный момент: ситуация, когда один click event посчитан дважды, либо, наоборот, не посчитан, будет вести, соответственно, либо к судебным искам со стороны рекламодателей, либо к упущенным выгодам со стороны Google. Отсюда, крайне важно обеспечить at-most-once семантику обработки событий.

Другое требование – обеспечить near-exact семантику, т.е. чтобы большая часть событий была посчитана в режиме близкому real-time. События, не посчитанные в real-time, все равно должны быть посчитаны - exactly-once семантика.

Кроме того, для экземпляров Photon, работающих в разных дата-центрах, необходимо синхронизированное состояние (точнее только critical state, так как между ДЦ весь state слишком «дорого» реплицировать). Таким синхронизируемым critical state выбрали event_id (по сути, click_id). Critical state храниться в структуре IdRegistry – in-memory key-value хранилище, построенное на основе PaxosDB.

Последний – PaxosDB – реализует алгоритм Paxos для поддержки отказоустойчивости и согласованности данных.

Взаимодействие с клиентами

Worker-узлы взаимодействуют с IdRegistry по клиент-серверной модели. Архитектурно взаимодействие Worker-узлов с IdRegistry – это сетевое взаимодействие с очередью асинхронных сообщений.

Так клиенты – Worker-узлы - отправляют к IdRegistry только 1) запрос на поиск event_id (если event_id найден, значит он уже был обработан) и 2) запрос на вставку event_id (для случая, если на шаге 1 event_id не был найден). На стороне сервера запросы принимают RPC-обработчики, целью которых поставить запрос в очередь. Из очереди запросы забирает специальный процесс Registry Thread (синглтон), который и выполнит запись в PaxosDB и инициализирует обратный вызов (callback) клиенту.

Источник иллюстрации [5, Figure 3]

Масштабируемость

Т.к. реплика IdRegistry происходит между географическим регионами, сетевые задержки между которыми могут достигать 100 мс [5], то это автоматически ограничивает пропускную способность IdRegistry до десяти последовательных транзакций (event_id commits) в секунду, в то время как требование к IdRegistry было равно 10K транзакций в секунду. Но и отказаться от геораспределенности и/или от синхронной репликации critical state с поддержкой решений конфликтов в кворуме также нельзя.

Тогда инженеры Google внедрили еще 2 практики, знакомые многим из мира СУБД:

  • пакетная отправка запросов (batching) – «полезная» информация по event_id занимает менее 100 байт; запросы отправляются пакетами на IdRegistry Client. Там они попадают в очередь, которую разбирает процесс Registry Thread, в обязанности которого входит решение конфликтов, связанные с тем, что в очереди может быть более одного элемента с одинаковым event_id.
  • sharding (+ динамический resharding) – все event_id делятся по диапазонам; транзакции по каждому из диапазонов отправляются на определенный IdRegistry.

Пакетная отправка запросов имеет и обратную сторону: кроме смешения семантики (Photon обрабатывает данные real-time, а некоторые его части работают в batching-режиме), batching-сценарий не подойдет для систем c небольшим количеством событий – время сбора полного пакета может занимать существенный интервал времени.

Компоненты

В рамках одного ДЦ выделают следующие компоненты:

  • EventStore – обеспечивает эффективный поиск по queries (поток поисковых запросов в поисковой системе);
  • Dispatcher – чтение потока кликов по рекламным объявлениям (clicks) и передача (feed) прочитанного Joiner;
  • Joiner – stateless RPC-сервер, принимающий запросы от Dispatcher, обрабатывающий их и соединяющий (join) потоки queries и clicks.

Алгоритм добавления записи представлен ниже:

Источник иллюстрации [5, Figure 5]

Взаимодействие между ДЦ:

Источник иллюстрации [5, Figure 6]

Алгоритм добавления записи в Joined Click Logs опустим, отметив, что в работы систем с частым сетевым взаимодействием применение retry-политик и асинхронных вызовов является крайне эффективным способом увеличения надежности и масштабируемости системы, соответственно, без усложнения общего алгоритма работы.

Этими же приемами – retry-политик и асинхронных вызов – и воспользовались создатели Photon.

Логика повтора запросов (щелкните, чтобы открыть)

Как уже ранее упоминалось, ситуация, когда click_id поступил на обработку, а ассоциированный с ним query_id нет – не исключение. Все из-за того, что не обязательно поток поисковых запросов обработается к тому моменту, кода начнет обрабатываться поток кликов по контекстной рекламе.

Для надежного обеспечения at-least-once семантики обработки всех click_id была введена логика, по которой для случая, описанного выше, применяется логика повторения. Для избегания троллинга (throttling) системы самой собой время между неудачными запросами увеличивается по экспоненте – exponential backoff algorithm. После некоторого количества неудачных запросов или по прошествии определенного времени click помечается как «unjoinable».

Dispatcher

Dispatcher – процесс, ответственный за чтение логов кликов - clicks. Эти логи растут во времени непрерывно.

Для того, чтобы эффективно их читать, Dispatcher периодически сканирует директорию с логами и идентифицирует новые файлы и/или измененные, сохраняет состояние каждого файла в локальной GFS ячейке. Это состояние содержит список файлов и сдвиг от начала файла для данных, которые уже были обработаны. Таким образом при изменении файла, последний вычитывается не с начала, а с того момента, на котором обработка закончилась в прошлое чтение.

Обработка новых данных осуществляется параллельно несколькими процессами, каждый из которых расшаривает свое состояние, что позволяет различным процессам бесконфликтно работать на разными частями одного и того же файла.

Joiner

Joiner – реализация stateless RPC-сервера, принимающего запросы от Dispatcher. Приняв запрос от Dispatcher, Joiner извлекает из него click_id и query_id. После чего по query_id пытается получить информацию из EventStore.

В случае успеха, EventStore возвращает поисковый запрос соответствующий обрабатываемому click.

Далее Joiner удаляет дубликаты (с помощью IdRegistry) и генерирует выходной лог, содержащий объединенные (joined) значения – Joined Click Logs.

Если Dispatcher для обработки отказов использовал retry-логику, то в Joiner инженеры Google добавили еще один прием. Прием работает в случаях, когда Joiner отправил запрос к IdRegistry; последний успешно зарегистрировал click_id, но из-за сетевых проблем, либо по таймауту Joiner так и не получил ответ об успехе от IdRegistry.

Для этого с каждым «commit click_id»-запросом, который Joiner отправляет на IdRegistry, ассоциируется специальный токен. Токен сохраняется в IdRegistry. В случае, если ответ от IdRegistry не был получен, Joiner повторяет запрос с тем же токеном, что и в прошлом запросе, и IdRegistry без труда «понимает», что пришедший запрос уже обрабатывался.

Генерация уникальных токенов / Event_Id (щелкните, чтобы открыть)

Еще одним интересным приемом, который следует отметить, является способом генерации уникальных event_id.

Ясно, что гарантированная уникальность для event_id крайне важное требование для работы Photon. В то же время, алгоритм генерации уникального в рамках нескольких ДЦ значения может занять крайне значительное время и количество CPU-ресурсов.

Инженеры Google нашли элегантное решение: event_id можно уникально идентифицировать используя IP узла (ServerIP), Id процесса (ProcessId) и временную метку (Timestamp) узла, на котором данное событие было сгенерировано.

Как и в случае со Spanner, для минимизации несогласованности временных меток на различных узлах, используется TrueTime API.

EventStore

EventStore – это сервис, принимающий на вход query_id и возвращающий соответствующий query (информацию о поисковом запросе).

В Photon для EventStore имеются 2 реализации:

  1. CacheEventStore – распределенное [sharding по hash(query_id)] in-memory хранилище, к котором хранится полная информация по query. Таком образом, для ответа на запрос не требуется чтение с диска.
  2. LogsEventStore - key-value хранилище, где key – query_id, а value – имя log-файла, в котором хранится информацию по соответствующему query, и смещение (byte offset) в этом файле.

Так как Photon работает в режиме близком к реальному времени, то можно с уверенностью гарантировать, что вероятность нахождения query в CacheEventStore (при условии, что в query в него попадают с минимальной задержкой) будет очень высокой, а сам CacheEventStore может хранить события за относительно небольшой промежуток времени.

В researching paper [5] приводится статистика, что только 10% запросов «проходят мимо» in-memory кэша и, соответственно, обрабатываются LogsEventStore.

Результаты

Конфигурация

На момент публикации [5], т.е. в 2013 году, реплики IdRegistry развернуты в 5-ти датацентрах в 3-ех географических регионах, причем сетевые задержки между регионами превышают 100 мс. Другие компоненты Photon – Dispatchers, Joiners, etc. – развернуты в 2-ух географических регионах на западном и восточном побережье США.

В каждом из ДЦ количество IdRegistry-шардов превышает сотню, а количество экземпляров процессов Dispatcher и Joiner превышает тысячи.

Производительность

Photon обрабатывает миллиарды joined событий в день, в том числе, в периоды пиковых нагрузок миллионы событий в минуту. Объем clicks-логов, обрабатываемых за 24 часа, превышает терабайт, а объем суточных query-логов исчисляется десятками терабайт.

90% всех событий обрабатываются (join'ятся в один stream) в первые 7 секунд, после их появления.

Источник иллюстрации [5, Figure 7]

В заключении

В заключении research paper [5], инженеры Google поделились хорошими практиками и своим планами на будущее.

Принципы не новы, но для полноты и законченности статьи, я их перечислю:

  • Используйте RPC-коммуникации вместо записи на диск. Запросы, выходящие за физические границы узла, должны выполняться асинхронно, а клиент всегда должен рассчитывать, что не получит ответ по таймауту или из-за сетевых проблем.
  • Минимизируйте критическое состояние (critical state) системы, т.к. его, в общем случае, приходится синхронно реплицировать. В идеале в критическое critical state системы должен включать в себя только метаданные системы.
  • Sharding – друг масштабируемости :) Но и эту идею инженеры Google улучшили, сделав динамический resharding.

В планах создателей Photon захватить мир уменьшить end-to-end задержки за счет того, что сервера, которые генерируют clicks и queries будут напрямую отправлять RPC-запросы к Joiner'ам (сейчас Dispatcher «ждет» этих событий). Также планируется Photon «научить» объединять несколько потоков данных (в текущей реализации Photon умеет объединять только 2 потока).

Пожелаем создателям Photon удачи в реализации их планов! И ждем новых research paper!

Список источников*

  • [5] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish Gupta, Haifeng Jiang, Tianhao Qiu, et al. Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams, 2013.

* Полный список источников, используемый для подготовки цикла.

Автор статьи

,
Machine Learning Preacher, Microsoft AI MVP && Coffee Addicted