19 сентября 2012

Эластичный MapReduce. Распределенная реализация

Распределенное введение в эластичные проблемы Hadoop

Симбиоз облачных технологий и платформы Apache Hadoop уже не первый год рассматривается как источник интересных решений, связанных с анализом Big Data.
И основной момент, почему именно «симбиоз», а не «чистый» Hadoop – это, конечно, снижение уровня входа для разработчиков MPP-приложений (и не только) как с точки зрения квалификации (администратора), так и первоначальных финансовых вложений в аппаратную часть, на которой приложение будет исполняться.
Второй момент – это то, что облачные провайдеры смогут обойти некоторые ограничения Hadoop*, навязанные архитектурой master/slave (master всегда единичная точка отказа и с этим надо что-то делать) и, возможно (на Microsoft, в связи с параллельно развивавшимся проектом Dryad, была особая надежда), даже сильным сцеплением хранилища данных (HDFS) и компонентами выполнения распределенных вычислений (Hadoop MapReduce).
Надежды, относящиеся к первому пункту - снижение стоимости владения Hadoop-кластером - оправдались более чем: крупнейшая тройка облачных провайдеров, с разностью степенью близости к release-mode, начали предоставлять «Hadoop-кластер as a Service» (терминология моя и условная) за цены, вполне «подъемные» для стартапов и/или исследовательских групп.
Надежды же, связные с обходом ограничений платформы Hadoop, не сбылись вовсе.
Amazon Web Services, как и IaaS-платформа, никогда и не стремилась предоставлять услуги как сервис (хотя и тут есть исключение – Amazon S3, Amazon DynamoDB). И в далеком 2009 году компания Amazon предоставила разработчикам сервис Amazon Elastic MapReduce как инфраструктуру, а не как сервис.
Вслед за Amazon в середине 2010 года компания Google анонсировала экспериментальную версию программного интерфейса App Engine MapReduce, в рамках своей облачной платформы Google App Engine.
App Engine MapReduce API предоставил разработчикам «Hadoop MapReduce»-подобные интерфейсы к своим, уже работающим по парадигме map/reduce, службам. Но это никак не убрало ограничений сильной связанности хранилища данных и компонентов вычислений. Более того, сам Google добавил туда ограничений - возможности переопределения только map-фазы**, да и сама платформа GAE, со свойственными ей квотами, наложила (как я подозреваю) еще пару ограничений на App Engine MapReduce API.
В 2011 года очередь дошла до Microsoft. В октябре 2011 года Microsoft объявила об открытии сервиса Hadoop on Azure. На текущий момент времени он находится в CTP-версии. Попробовать у меня этот сервис из-за отсутствия приглашения (и наличия лени) не получилось. Но, по отсутствию статей о преодоленных ограничениях Hadoop, понятно, что «проблемы» платформы Hadoop и в этом случае оставили решать самой Hadoop.
Описанные выше ограничения решений на основе «облачных платформ + Hadoop» позволяют понять круг проблем, решаемых проектом Cloud MapReduce, речь о котором и пойдет далее.

1. Cloud MapReduce. Основные концепции

Cloud MapReduce (CMR) – это open source проект, реализующий программную парадигму map/reduce на основе (on top) облачных сервисов Amazon Web Services.
В основе CMR лежит концепция облачной операционной системы. Если проводить аналогию с традиционными ОС, то в облачных ОС:
  • вычислительные ресурсы представлены не CPU, а инстансами Amazon EC2 / Windows Azure Workers / Google Compute Engine;
  • хранилище данных представлено не жестким диском (SD-, флэш-накопители, etc.), а сервисами Amazon S3 / Windows Azure Blob / Google Cloud Storage;
  • хранилище состояний (которое не теряется после перезагрузки OC) представлено не реестром (или локальной структурой с подобной функцией), а службами Amazon SimpleDB / Windows Azure Table / Google BigQuery;
  • механизм межпроцессового взаимодействия реализован с помощью сервисов Amazon SQS / Windows Azure Queue / Google App Engine Task Queue API.
Заложив принципы облачной ОС в архитектуру Cloud MapReduce разработчики получили впечатлившиий меня результат. В своем блоге они приводят следующие факты из сравнения своей платформы с платформой Hadoop:
  • отсутствие единичной точки отказа;
  • отсутствие необходимости копировать данные из сервисов хранения (таких как Amazon S3) в HDFS перед запуском MapReduce-задания;
  • ускорение в некоторых случаях более, чем в 60 раз;
  • проект занимает всего 3000 строчек кода на Java, в то время как Hadoop «расположился» аж на 280K кода.
Кроме того, Cloud MapReduce, в отличие от Apache Hadoop, спроектирован не на основе master/slave-архитектуры. Кроме очевидных плюсов peer-подобных архитектур (отсутствии single point of failure), разработчики CMR приводят в плюсы их реализации MapReduce более простое, чем в Hadoop, конфигурирование, резервирование, восстановление после сбоев.
В достоинства CMR ставят также инкрементальную масштабируемость: при добавлении новых вычислительных инстансов в кластер они «на горячую» подключаются к выполнению map/reduce-задания. Также CMR не требует (рекомендует) иметь гомогенный кластер (т.е. из машин с одинаковой вычислительной мощностью). В кластере из гетерогенных машин наиболее быстрая машина выполнит большее число заданий, чем более «медленная» машина.
Добавлю, что инкрементальной масштабируемости действительно очень не хватало платформе Hadoop. А вот отсутствие требования (рекомендации) к гомогенности кластера вряд ли актуально для облачных сред.

2. Cloud MapReduce. Архитектура

Архитектура Cloud MapReduce делится на следующие логические слои:
  • слой хранения данных (Storage Layer);
  • слой обработки и вычисления (Computing Layer);
  • слой взаимодействия (Messaging).
Отношения этих слоев, информационные потоки и сервисы, которыми он представлены в AWS показаны на рисунке ниже.
Cloud MapReduce Design
Ниже разберем подробнее функцию каждого из представленных выше слоев.

2.1. Взаимодействие между узлами

Взаимодействие между узлами Map Workers и Reduce Workers построено на основе очередей. Очереди в Cloud MapReduce представлены сервисом Amazon SQS.
В CMR существуют следующие типы очередей:
  • Input / Map Queue – очередь map-заданий;
  • Multiple Reduce Queue – очереди промежуточных результатов выполнения map-функций;
  • Master Reduce Queue – очередь reduce-заданий;
  • Output Queue – очередь выходных данных.
У сообщений в очередях Amazon SQS / Azure Queue есть «invisibility timeout»-механизм. Логика механизма такая: сообщение берется из очереди, после чего сообщение на некоторое время становится невидимым в очереди. При успешной обработки сообщения, последнее из очереди удаляется, в противном случае, по истечению таймаута невидимости сообщение снова появляется в очереди.
Благодаря «invisibility timeout»-механизму, предоставляемому сервисами очередей, реализуется очень простая поддержка обработки отказов Map и Reduce Worker’ов и повышается общая отказоустойчивость кластера.

2.2. Хранение данных

Хранилище данных хранит входные данные приложения и представлено сервисом Amazon S3.
Amazon S3 также представляет более чистую абстракцию слоя хранения данных, благодаря тому, что доступ предоставляется к данным как к ресурсам (что свойственно REST-сервисам), а не как к файлам (что характерно для файловых систем). Следует отметить, что подход хранения данных в облачном хранилище имеет и обратную сторону – меньшую управляемость.
В Amazon S3 храниться анализируемые на этапе map данные. В Input Queue содержатся пару <k, v>, где k, в общем случае, идентификатор map-задания, а v - ссылка файл в S3 и опционально указатель на часть внутри файла.
Такой подход снимает неудобство/проблему (для кого как) с копированием данных из Amazon S3 в HDFS на первой стадии запуска MapReduce-задания в сервисе Amazon Elastic MapReduce.
Разработчик также упомянули, что выходные данные также возможно сохранить напрямую в Amazon S3:
We store our input and possibly output data in S3
Из документации точно следует, что все результаты этапа reduce сохраняются в Reduce Queue в виде пар <k',v'>.

2.3. Вычислительные узлы

На вычислительных узлах (Compute Nodes) выполняются определенные пользователем map- и reduce-задания. Compute Nodes представлены EC2-инстансами и делятся на 2 типа: Map Workers и Reduce Workers. На Map Workers происходит выполнение map-функций, на Reduce Workers – reduce-функций.
На один и тот же EC2-интстанс может последовательно выполнять роль и Map Worker, и Reduce Worker.
Cloud MapReduce Workflow
Потоки работ (workflow) map- и reduce-операций приведены ниже.
Mapper workflow:
  1. Получение из очереди Map Queue ссылок на данные для map-заданий;
  2. Извлечение данных из сервиса Amazon S3;
  3. Выполнение определенной пользователем map-функции;
  4. Добавление результата выполнения <k',v'> в некоторую очередь, определяемую на основе хэша k’ (если это не переопределено явно), из множества очередей Multiple Reduce Queues;
  5. Удаление map-задания из очереди Map Queue.
Reducer workflow:
  1. Получает из очереди Master Reduce Queue ссылку на Reduce Queue, к которой нужно применить функцию свертки;
  2. Извлекает <k',v'>-пары из соответствующей очереди множества очередей Multiple Reduce Queues;
  3. Выполняет определенную пользователем reduce-функцию и добавляет выходные пары <k'', v''> в очередь Output Queue;
  4. Удаляет reduce-задание из очереди Master Reduce Queue.

2.4. Клиент

Клиент (Job Client) – программный клиент, управляющий выполнением map/reduce-заданий.
Про клиента из документации CMR понятно меньше всего. Но, учитывая, что мы знаем о потоке работ Map и reduce Worker’ов и принципах построения подобных систем, позволю себе высказать пару околонаучных предположений о Job Client workflow.
Поток работ Job Client делится на следующие стадии:
  1. Сохранение входных данных в сервисе Amazon S3;
  2. Создание map-задание для каждого сплита данных и добавление созданного задания в очередь Map Queue;
  3. Создание множества очередей Multiple Reduce Queues;
  4. Создание очереди Master Reduce Queue и добавление созданную очередь reduce-задания для каждой очереди Partition Queue;
  5. Создание очереди Output Queue;
  6. Создание запроса Job Request и добавление созданного запроса в SimpleDB;
  7. Запуск EC2-инстансов для Map Workers и Reduce Workers;
  8. Опрос Map Workers и Reduce Workers для получения статуса выполнения заданий;
  9. По окончанию выполнения всех заданий, загрузка результатов из Output Queue.

2.5. Вспомогательные операции

Операции сохранения/обновления статуса выполнения map-/reduce-заданий реализованы на основе нереляционных баз данных. Нереляционные БД в AWS представлены сервисами Amazon SimpleDB (с 2007 года) и Amazon DynamoDB (с 2012 года). Т.к. архитектура CMR предполагает равнозначность всех нодов, входящих в вычислительный кластер, то центром координации узлов является сервис Amazon SimpleDB, предоставляющий распределенное нереляционное хранилище данных.

Заключение

У Cloud MapReduce есть недостатки, которые делают бизнес-риски от его использования существенными (маленькая команда, редкие обновления, отсутствие такой экосистемы как у того же Hadoop), а перспективы туманными. Но идеи, почерпнутые из архитектуры проекта Cloud MapReduce, позволяют еще более распределенно взглянуть на уже устоявшееся среди ИТ-специалистов Hadoop-ориентированное представление на Data Intensive Computing.

Первоисточник

[1] Huan Liu, Dan Orban // Cloud MapReduce: a MapReduce Implementation on top of a Cloud Operating System // Accenture Technology Labs, 2010.
* Я сейчас не беру во внимание alpha-версию Apache Hadoop 2.0, которая «лишена» (точнее к release-версии «собирается быть лишенной») описанных архитектурных ограничений.
** Вспоминается (или может приснилось?), что на конференции Google I/O 2011, кроме смягчения существующих лимитов платформы App Engine, Mike Aizatsky сказал, что инженеры Google работают над предоставлением возможности переопределения и других этапов алгоритма map/reduce в App Engine MapReduce API.

Автор статьи

,
Machine Learning Preacher, Microsoft AI MVP && Coffee Addicted