18 мая 2014

.NET-реализация map/reduce

.NET-реализация модели map/reduce (на одном вычислительном узле)

Hadoop + .NET Framework

При торговле на рынках ценных бумаг / валютных рынках одной из нередких задач является расчет величины (ширины) спреда. Спред (от англ. spread «размах») — разность между лучшими ценами заявок на продажу (ask) и на покупку (bid) в один и тот же момент времени на какой-либо актив (акцию, товар, валюту, фьючерс, опцион) [wikipedia].

Спред важен, так как отражает ликвидность актива. Так чем меньше спред, тем ликвиднее актив, и наоборот.

Проблема в том, что на финансовых биржах количество выставленных игроками заявок на покупку/продажу активов – это, в общем случае, цифра с 6-ю нулями. Количество финансовых инструментов, торгуемых, даже одной бирже также исчисляется тысячами.

Поэтому расчет спреда финансового инструмента – не самая тривиальная задача. Кроме того, эта задача является параллельной по данным и к ней вполне применены те практики, которые используются для решения задач, связанных с Большими Данными.

Ниже мы рассмотрим пример расчета спреда наиболее ликвидных акций биржи ММВБ-РТС, а также in-memory-реализацию программной модели map/reduce на C#, как эффективный подход к расчету спреда акций.

Почему map/reduce?

Модель map/reduce – крайне простая и крайне изящная абстракция для MPP-задач, параллельных по данным.

В общем случае, для map/reduce выделяют 2 фазы:

  • map(ƒ, c): принимает функцию ƒ и список c. Возвращает выходной список, являющийся результатом применения функции ƒ к каждому элементу входного списка c.
    map(f, c)
  • reduce(ƒ, c): принимает функцию ƒ и список c. Возвращает объект, образованный через свертку коллекции c через функцию ƒ.
    reduce(f, c)

Я уже неоднократно упоминал об это модели в своем блоге. Наиболее полное описание программной модели map/reduce можно найти в статье «Hadoop MapReduce. Основные концепции и архитектура».

Почему не Hadoop?

Основные причины: инфраструктура и время ожидание результата работы программы. Обо все по порядку.

В любой компании есть экономические реалии, которые нельзя не учитывать в разработке ПО. У меня сейчас реалии такие - нет ни денег ни желания «содержать» Hadoop-кластер. Зато есть довольно мощные вычислительные узлы (>= 16 CPU), арендуемые по модели IaaS. За счет чего нет расходов на аппаратное обеспечение, на лицензии ОС, на администрирование, за электричество. Как и свойственно таким сценариям – ресурсы используются «по требованию», т.е. только в течении торговой сессии (на ~16 часов в сутки сервера «тухнут»).

Второе требование – время ожидание результата. Работа «калькулятора спредов» должна выполняться в режиме «итеративный + интерактивный» (т.е. есть циклы работы, результаты циклов хранятся в памяти, цикл длятся как можно более короткий период времени (миллисекунды)).

Не возьмусь утверждать, что с Hadoop в облаках проблема с инфраструктурой бы стояла сильно острее, или в Hadoop 2.0 интерактивный режим реализован менее эффективно. И все же выбор был сделан в сторону:

  • compute-intensive узла в облаках Microsoft Azure;
  • .NET Framework 4.5.1 с использованием TPL (Task Parallel Library);
  • программной модели map/reduce;
  • работе с биржевыми котировками в RAM вместо обмена через временные файлы или по TCP- каналам.

Реализация

Условно поделена на 3 библиотеки:

  • ядро MapReduce.Core.dll;
  • вспомогательные библиотеки, такие как MapReduce.Storage.dll;
  • и клиентские библиотеки, непосредственно использующие обобщенную реализацию MapReduce, представленную в ядре.

Ниже будет приведен только код ядра и основного файла клиентской библиотеки. Полный код решения находится в открытом доступе и доступен в GitHub (ссылка в конце статьи).

Листинг 1. MapReduce.Core. Обобщенная реализация программной модели map/reduce

    
    /// <summary>
/// Интерфейс пары ключ-значение
/// </summary>
/// <typeparam name="TKey">Тип ключа</typeparam>
/// <typeparam name="TValue">Tип значения</typeparam>
public interface IKeyValuePair<TKey, TValue>
{
/// <summary>
/// Ключ
/// </summary>
TKey Key { get; set; }

/// <summary>
/// Значение
/// </summary>
TValue Value { get; set; }
}

/// <summary>
/// Обобщенная реализация MapReduce
/// </summary>
/// <typeparam name="TSource">Тип элементов исходной коллекции</typeparam>
/// <typeparam name="TKey">Типа ключа, используемого в функциях map() и reduce()</typeparam>
/// <typeparam name="TValue">Тип значения, ассоциированного с ключом и используемого в функциях map() и reduce()</typeparam>
/// <typeparam name="TResult">Тип элементов результирующей коллекции</typeparam>
public abstract class GenericMapReduce<TSource, TKey, TValue, TResult>
{
/// <summary>
/// Возвращает промежутной список пар, являющийся результатом обработки переданной коллекции
/// </summary>
/// <param name="values">Элементы исходной коллекции</param>
/// <returns>Промежуточный список</returns>
public abstract IEnumerable<MapReduce.Core.KeyValuePair<TKey, TValue>> Map(TSource values);

/// <summary>
/// Возращает коллекцию, являющуюся результатом свёртки переданного списка
/// </summary>
/// <param name="value">Список промежуточных значений</param>
/// <returns>Результирующая коллекция</returns>
public abstract TResult Reduce(MapReduce.Core.KeyValuePair<TKey, IEnumerable<TValue>> value);

/// <summary>
/// Запуск выполнения расчетов MapReduce
/// </summary>
/// <param name="source">Элементы исходной коллекции</param>
/// <param name="map">Функция Map</param>
/// <param name="reduce">Функция Reduce</param>
public IEnumerable<TResult> MapReduce(IEnumerable<TSource> source,
Func<TSource, IEnumerable<MapReduce.Core.KeyValuePair<TKey, TValue>>> map,
Func<KeyValuePair<TKey, IEnumerable<TValue>>, TResult> reduce)
{
Contract.Requires<ArgumentNullException>(source != null);
Contract.Requires<ArgumentNullException>(map != null);
Contract.Requires<ArgumentNullException>(reduce != null);
// map phase
var mapResults = new ConcurrentBag<KeyValuePair<TKey, TValue>>();
Parallel.ForEach(
source,
item =>
{
foreach (var result in map(item))
mapResults.Add(result);
});
// reduce phase
var reduceSources = mapResults
.GroupBy(
item =>
item.Key,
(key, values) => new KeyValuePair<TKey, IEnumerable<TValue>>(key, values.Select(value => value.Value)));
var reduceResult = new ConcurrentBag<TResult>();
Parallel.ForEach(
reduceSources,
item => reduceResult.Add(reduce(item)));
// return result
return reduceResult;
}
}

Листинг 2. Клиентская библиотека (логика расчета спреда)

 
    /// <summary>
/// Задача расчета спреда актива
/// </summary>
public class CalculateQuotesSpreadJob :
MapReduce.Core.GenericMapReduce<MapReduce.Core.KeyValuePair<String, String>, String, Tuple<decimal, decimal>, MapReduce.Core.KeyValuePair<String, Decimal>>
{
public CalculateQuotesSpreadJob(IMappingStrategy<Quote> mappingStrategy) : base()
{
Contract.Requires<ArgumentNullException>(mappingStrategy != null);
_mappingStrategy = mappingStrategy;
}
private readonly IMappingStrategy<Quote> _mappingStrategy;
public override IEnumerable<MapReduce.Core.KeyValuePair<String, Tuple<decimal, decimal>>> Map(MapReduce.Core.KeyValuePair<String, String> values)
{
// получаем список строк в файле данных
IEnumerable<string> rows = values.Value.Split(SeparatorType.RowSeparator.ToChar());
// получаем список котировок
IEnumerable<Quote> quotes = rows.Select(r => _mappingStrategy.Map(r));
return quotes.Select(q => new Core.KeyValuePair<string, Tuple<decimal, decimal>>(q.Ticker, Tuple.Create(q.High, q.Low)));
}
public override MapReduce.Core.KeyValuePair<String, Decimal> Reduce(MapReduce.Core.KeyValuePair<String, IEnumerable<Tuple<decimal, decimal>>> value)
{
return new MapReduce.Core.KeyValuePair<string, decimal>(
value.Key,
value.Value
.Select(v => v.Item1 - v.Item2) /* считаем спред */
.Max()); /* ищем максимальное значение спреда */
}
}

Листинг 3. Запуск map/reduce-задачи

    class Program
{
/// <summary>
/// Main function
/// </summary>
static void Main(string[] args)
{
IEnumerable<MapReduce.Core.KeyValuePair<String, String>> source = null;
ReadInput(out source);
var mapReduce = new CalculateQuotesSpreadJob(new QuoteMappingStrategy());
var results = mapReduce.MapReduce(source, mapReduce.Map, mapReduce.Reduce).ToList();
WriteOutput(results);
Console.ReadKey();
}

/// <summary>
/// Чтение входных параметров
/// </summary>
/// <typeparam name="TSource">Тип элементов исходной коллекции</typeparam>
private static void ReadInput<TSource>(out IEnumerable<TSource> input)
where TSource : MapReduce.Core.IKeyValuePair<String, String>, new()
{
input = new QuotesRepository(new JobConfiguration() { StorageConfiguration = new StorageConfiguration() { InputFolder = @"\App_Data\1m", OutputFolder = "" }})
.Get()
.Select(d => new TSource { Key = d.Id, Value = d.Content });
}

/// <summary>
/// Запись выходных данных в выходной поток
/// </summary>
/// <typeparam name="TResult">Тип элементов результирующей коллекции</typeparam>
private static void WriteOutput<TResult>(IEnumerable<TResult> results)
where TResult : MapReduce.Core.IKeyValuePair<String, Decimal>
{
foreach (var result in results.OrderByDescending(r => r.Value))
Console.WriteLine("{0}: {1}", result.Key, result.Value);
}
}

Результат:

ROSN: 1,1200000
GAZP: 0,8500000
SBER: 0,7300000
VTBR: 0,0003000
(прочие результаты)
<unknown>: 0

Elapsed time (ms): 48

Полный код решения на github.

Автор статьи

,
Machine Learning Preacher, Microsoft AI MVP && Coffee Addicted