Загрузка...

Ruby & Rails: веб-разработка с удовольствием

Ruby on Rails — фреймворк для создания веб-приложений. Является открытым программным обеспечением (лицензия MIT). Здесь мы обсуждаем новости RoR, делимся учебными материалами и интересными находками С RoR даже сложные веб-приложения могут быть написаны за считанные дни. Это действительно разработка с удовольствием!
     

11. Metaprogramming patterns. 15 кю. MapReduce

14.11.09, 23:19
Автор artem.voroztsov

Шаблон функционального программирования MapReduce с одной стороны идейно очень прост, а с другой - реализация распределенной эффективной системы MapReduce довольно сложная задача, с можеством интересных подзадач и расширений. На основе этого шаблона

  • придумывают эффективные распределённые хранилища данных
    • забавно, что идеи, которые пришли из функционального программирования (ФП), стали основой для архитектуры хранилищ данных; у новичков, изучающих ФП, может сложится мнение, что хранение данных неестественно для ФП, так как там нет состояния, но это мнение ошибочно; ФП является источником множества полезных идей для структур данных
  • масштабируют поточные преобразования данных
  • решают повседневные задачи обработки логов и вычисления разнообразных статистических величин

На язык Ruby "map and reduсe" переводится как "map and inject", но, конечно, за MapReduce стоит больше, чем просто комбинирование двух методов.

Мы начнём изучение MapReduce с несколько искусственной библиотеки, которая отображает суть и даже может использоваться для поточной обработки данных, но, конечно, отличается от того, что обычно имеют в виду под промышленной системой MapReduce.

Начнем с метода reduce.

Давайте думать про reduce как про метод-итератор для контейнера, в котором хранятся пары (key, value) (будем их называть записями), причём допускаются пары с одинаковым ключом.

Cуть reduce - это, собственно, группировка всех записей  с одинаковым ключом и применение метода inject ко множеству всех значений записей одной группы:

records.reduce{|a,b| a + b}
# эквивалентно
records.group_by(&:key).map{|key, key_records| 
  Record.new(key, key_records.map(&:value).inject{|a,b| a + b})
}
# и эквивалентно
records.inject( Hash.new{|h,k| h[k]=[]} ) {|groups,record|
  groups[record.key] < < record.value }.map{|key, key_values|
  Record.new(key, key_values.inject{|a,b| a + b}) 
}


(см. описание group_by)
Но такая реализация reduce нам не очень интересна. Она подразумевает закачку всех записей в память, а нам хочется осуществлять поточную обработку данных.
Давайте напишем метод reduce, который не будет заниматься группировкой записей по ключу, а будет предполагать, что записи выдаются контейнером группами, то есть записи с одим ключом идут подряд друг за другом.
Будем считать, что записями являются массивы вида [key, value1, value2, ...].
Значением будем считать все элементы массива кроме первого.
Алгоритм заключается в том, что мы выполняем reduce для группы последовательных записей с одним ключем. Как только приходит запись с ключом, отличным от ключа последней записи, выводим результат [key, reduced_value] в массив output (который может быть и не массивом вовсе, а некоторым объектом с определённым оператором < < , например выходным потоком или блоком - объеком класса Proc, для которого выполнена команда alias < < call) и устанавливаем reduced_value в nil. Нужно не забыть вывести последнюю группу:
Пишем:

class Array
  alias :key :first
  def value
    if self.size == 2
      self[1]
    else
      self[1..-1]
    end
  end
end
module Enumerable
  def reduce(output=nil,&reduce_block)
    output ||= []
    last_record = nil
    reduced_value = nil
    each do |record|
      if last_record && record.key != last_record.key
        output < < [last_record.key, reduced_value]
        reduced_value = nil
      end
      last_record = record
      reduced_value = reduce_block[reduced_value, record.value, record]
    end
    output < < [last_record.key, reduced_value]
  end
end
puts [['a', 1], ['b', 100], ['b', 10], ['a', 5]].
  sort.
  reduce{|a,b| (a||0)+b}.map{|r| r.join("\t")}


Если не вызвать функцию сортировки, reduce не сработает (в данном случае ошибка будет для для записей с ключом 'a', которые не идут подряд).

Давайте поместим этот код в файл reduce.rb. Теперь мы можем писать так

cat data.txt | sort | ruby -r reduce -e 'STDIN.map{|line| line.split("\t")}.reduce{...'

 

STDIN является контейнером, то есть имеет метод each, который по умолчанию итерирует строки, и включает в себя примесь Enumerable. Это значит, что у STDIN есть методы inject, map, sort, и др. стандартные методы контейнеров.
Количество данных после reduce обычно бывает существенно меньше данных до reduce.
Последовательно считывая строчку за строчкой, мы не загружаем все данные в память, а по ходу дела их редьюсим ... Стоп!! Эти рассуждения были бы верны, если бы не было вызова метода map. Он нам все портит и создает в памяти массив из всех записей (которые были получены из строчек делением на части по символу "\t").
Тут нам приходит на помощь ленивый контейнер. Включаем в reduce.rb строку

require 'lazy_enumerable' 


и можем писать так:

cat data.txt | sort | ruby -r reduce -e 'STDIN.to_lazy.map{|line| line.split("\t")}.reduce{...'


Удобно определить метод records и еще пару helper'ов:

def records
  STDIN.to_lazy.map{|line| line.to_record}
end
class Array
  def to_line
    self.join("\t")
  end
end
class String
  def to_record
    self.split("\t")
  end
end
module Enumerable
  def put_lines
    each{|e| puts e.to_line}
  end
end


После чего командная строка выглядит покороче:

cat data.txt | sort | ruby -r reduce -e 'records.reduce{...}.put_lines'

 

Частоты слов

Пусть у нас есть данные о частотах поисковых запросов, и нам нужно получить данные о частотах слов.
Решение задачи представляет собой поточную обработку данных вида  map -> sort -> reduce.
Вход состоит из строчек вида "ФРАЗА\tЧАСТОТА". Выход должен состоять из строчек вида
"СЛОВО\tЧАСТОТА".

 

cat data.txt | sort | ruby -r reduce -e \    
'records.each{|phrase,f| phrase.split.map{|word| puts [word,f].to_line} }' |  \  
ruby -r reduce -e 'records.reduce{|a,b| (a||0) + b.to_i}.put_lines'

 

 

Почему записи не сортируются средствами Ruby? Внешние утилиты

Собственно ответ Вы получите сами, если попробуйте отсортировать миллион строк двумя способами -- с помощью команды sort из GNU utils и с помощью метода Enumerable#sort.

Команда sort сортирует очень эффективно. Для сортировки больших файлов используются алгоритмы внешней сортировки. Особенно хорошо она заточена под сортировку строк (видимо используется алгоритм radix-sort). Также внешняя команда сортировки (вы можете написать её сами или выбрать из существующих) может использовать многоядерность и многопроцессорность вашей машины или использовать кластер машин. А в Ruby сортировка идет в одном потоке и не очень резва из-за интерпретируемости языка.

Это простая идея - научимся круто масштабировать задачу сортировки строк (распределять ее по машинам кластера), а остальные задачи будем сводить к ней.

Сама концепция MapReduce заключается в том, что идёт параллельная работа над тремя типами задач: map, sort, reduce, которые потенциально нужно уметь распределять по нескольким машинам.
Можно представить себе узлы разветвляющейся вычислительной сети, которые параллельно занимаются этими задачами и передают друг другу данные по pipe. Еще одна важнейшая идея конструирования масштабируемых систем MapReduce - это split.Расщепление может быть двух типов

  • split потока записей (как бы режем таблицы на кусочки по горизонтали) - потоки записей разветвляются и идут в разные узлы. Маршрутизация осуществляется, например, на основе вычисления остатков от деления хеш-функции от ключа.
  • split полей (режем по вертикали) - каждая запись вида (key, value) расщепляется на несколько (key, f1(value)), (key, f2(value)), ..., (key, fM(value)) которые идут в свой узел вычислительной сети.

Есть также два типа операции join, которые дуальны упомянутым двум зоперациям split:

  • (дуальная операция к split по записям) Слияние нескольких уже отсортированных файлов в один делается опять же с помощью команды sort (см. опцию -m).
  • (дуальная операция к split полей) Командная утилита join в двух отсортированных потоках записей находит записи  с одинаковыми ключами ((key,value2), (key, value2)), и выводит объединённые записи - записи вида (key, value1, value2). Записи, с ключами, которые есть только в одном из двух файлов будут отбброшены. Кроме перечечения по ключам командная утилита join может выводить разности по ключам - записи с ключами, которых есть в первом файле, но нет во втором, и наоборот (см. опцию -м)

Наконец, я просто должен упомянуть утилиту grep, которую тоже часто с успехом используют в потоковой обработке данных - это фильтр строк по регулярному выражению или просто слову.

Собственно, представленный здесь код можно рассматривать как еще один инструмент потоковой обработки данных дополняющий набор Unix утилит: sort, grep, join, head, tail.

  • Примечание: Перед командой reduce вместо задачи sort на самом деле  нам нужно решать лишь задачу group_by - нам не нужна упорядоченность данных по ключу в потоке, нам нужна лишь из группировка по ключу. Интереснаяя задача -  написать на Си программу group_by, которая в нескольких потоках группирует входные строчки по ключу, и выводит их группами на выход. Алгоритм можно реализовать на основе хеш таблицы.
    Вполне возможно, что для программы group_by удасться добиться большей производительности для многоядерных машин нежели для программы sort.

Код ленивого контейнера

Приведу обновлённый код ленивого контейнера. На этот раз он содержит метод flatten и метод pipe (|) который позволяет перенаправлять данные в командную строку и получать в итоге снова ленивый контейнер.

 

 cat records.txt | ruby -r reduce -e \ 
  'records.each{|p,f| p.split.each{|w| puts [w,f].to_line}}.|("sort"). \ 
   reduce{|a,b| (a||0) + b.to_i}}).put_lines'

 

Вот исходный код:

 

class LazyEnumerable
  # Creates LazyEnumerable from elements of container +enum+. 
  def initialize(enum)
    @enum = enum
  end
  
  delegate :each, :to => '@enum'
  
  include Enumerable
  
  def map!(&map_block)
    @enum = self.clone
    @map_block = map_block
    self.singleton_class.class_eval do
      def each(&output_block)
        @enum.each do |e| 
          output_block[@map_block[e]]
        end
      end
    end
    self
  end
  
  def select!(&select_block)
    @enum = self.clone
    @select_block = select_block
    self.singleton_class.class_eval do
      def each(&output_block)
        @enum.each do |e| 
          output_block[e] if @select_block[e]
        end
      end
    end
    self
  end
  
  def uniq!
    @enum = self.clone
    @done = {}
    self.singleton_class.class_eval do
      def each(&output_block)
        @enum.each do |e| 
          (output_block[e]; @done[e]=true) unless @done[e]
        end
      end
    end
    self
  end
  
  def flatten!
    @enum = self.clone
    self.singleton_class.class_eval do
      def each(&output_block)
        @enum.each do |e| 
          e.flatten.each{|e2| output_block[e2]} 
        end
      end
    end
    self
  end
  
  def sort!(&sort_block)
    @enum = self.clone
          output_block[e]
        end
      end
    end
    self
  end
  
  def pipe!(cmd, &map_block)
    @enum = self.clone
    @map_block = map_block 
    self.singleton_class.class_eval do
      def each(&output_block)
        IO.popen(cmd, 'r+') do |pipe|
          Thread.new {
            @enum.each{|e| pipe.puts(e)}
            pipe.close_write
          }
          if @map_block
            pipe.each do |line|
              output_block[@map_block[line]]
            end
          else
            pipe.each do |line|
              output_block[line]
            end
          end
        end
      end
    end
  end
  
  make_nobang :map, :select, :flatten, :sort, :pipe
end
Enumerable.module_eval do
  def to_lazy
    LazyEnumerable.new(self)
  end
end

 

Проект на Github

Завел проект на github: http://github.com/avorozhtsov/mr_shell

Желающие поучаствовать в разработке - пишите.

Кроме описанныех здесь вещей, в репозитории есть много других вкусностей.

Задачи для самостоятельного решения

1. Используя reduce, проанализируйте log apache

  • напишите таблицу с объемом трафика по разделам сайта (раздел сайта - первые латинские буквы в префиксе script_path)
  • напишите таблицу с объемом трафика по странам
  • напишите таблицу вида {число запросов GET => сколько IP адресов имеют столько запросов}

2. Представьте себе, что у нас есть журнал, строчки которого представляют собой записи вида

   time user_id referer_url url

Каждая строка представляет собой событие перехода со страницы referer_url на страницу url. Иногда referer_url = nil.
  • Выведите таблицу вида {url => число переходов }
  • Выведите 1000 топовых записей таблицы вида {url => referer_url => частота этого перехода}
  • Выведите 1000 топовых записей таблицы вида {чужой referer_url => частота заходов на наш сайт}
  • Выведите таблицу вида {referer_url нашего сайта => энтропия частотного распределения переходов на другие страницы }
  • Выведите 24 частоты посещаемости сайта по часам дня
  • Напишите скрипт, который для каждого из последних N недель пишет топовые поисковые запросы google, которые приводили посетителей на наш сайт
  • Напишите скрипт, который для каждого из последних N недель пишет изменение топовые поисковые запросы google, которые приводили посетителей на наш сайт, по сравнению с предыдущей неделей

3. Создайте библиотеку готовых редьюсеров, которые можно комбинировать и выполнять описанные выше задачи.

Ссылки

Предыдущие посты в теме "Metaprogramming patterns":

Комментарии

> придумываются эффективные распределенные хранилища данных (и это притом, что в функциональном программировании нет понятия состояния!!!)

1) MapReduce сам по себе никак не связан с концепцией хранилища и не налагает никаких ограничений на его организацию. Утверждение, что эффективные хранилища придумываются на основе MapReduce – абсолютно неверно; хоть какое-то право на существование имеет только обратное утверждение – распределенный MapReduce обычно эффективно реализуется (сюрприз!) поверх эффективного распределенного хранилища (например, GFS, HDFS, CouchDB…).
2) MapReduce и подавно не налагает на хранилище требования изменяемости. На входе данные, на выходе данные. Где состояние-то?
3) Само замечание (“и это притом…”) не совсем верно. В-нулевых, понятие “функциональное программирование” весьма размыто; я бы попросил пояснить, что именно имеется в виду, и не запутывать неискушенных читателей. Во-первых, ссылочная прозрачность действительно соблюдается только в весьма малом количестве функциональных языков. Из применяющихся на практике мне известны только Haskell и Clean. В них отдельные виды понятия состояния (которое само по себе также очень размыто) моделируются некоторыми монадами (например, State, IO, ST, Writer итп).

По поводу основного контента: В моей АФТУ-шной лекции про свертки http://docs.google.com/Doc?docid=dv682st_12drcf56c7 в конце написано и про MapReduce. Приведены значительно менее тривиальные примеры, которые уже нельзя реализовать тупо с помощью одной аргегатной функции SQL, а также ссылки на еще менее тривиальные статьи, и на значительно более полезные, чем сам MapReduce, инструменты (Sawzall, Pig).
Вот еще ссылки:
http://www.cs.stanford.edu/people/ang/papers/nips06-mapreducemulticore.pdf – офигенная статья, сверхкратко описывается дюжина применений MapReduce в машинном обучении.
http://www.cs.indiana.edu/~jekanaya/cglmr.html
http://www.cs.vu.nl/~ralf/MapReduce/paper.pdf
http://research.yahoo.com/files/Lin-cloud.pdf
http://en.wikipedia.org/wiki/MapReduce , в конце концов (там тоже куча ссылок)

1. Сюрприз – может быть связан. Эффективные хранилища придумываются на основе MapReduce
2. Не налагает. В том то и дело что MapReduce может стать основой для хранилища данных, в котором данные все время как бы “в пути от одной функции к другой”. об этом я напишу в слудуюший раз.
3. Термины Map и Reduce пришли из функционального программирования, собственно это названия функций, который в аргументе получают функции. То, что функции становятся объектами первого порядка и есть один из признаков функционального программирования. Чего шумим то? У нас здесь не шумят. Сам для себя я так определяю функциональное программирование – это программирование на Haskel и то, что похоже на программирование на Haskel.
4. Я пользуюсь кластером MapReduce на работе. Хотел в статье показать некоторый код, после которого можно уже вести разговор о распределенных MapReduce системах.

1,2 – насчет существования такого хранилища не знал. Можно хотя бы ссылку?
3 – я знаю, что такое map и reduce :) (кстати: Haskell) Но откуда взялось утверждение, что в ФП отсутствует понятие состояния – я так и не понял. Оно никак не следует из формулировки “то, что похоже на программирование на Haskell”; возможности моделировать состояние у хаскела не отнимешь, это понятие там присутствует в форме некоторых монад (это, конечно, не значит, что все монады моделируют состояние). Я не шумлю, я просто терминологический наци :)
4 – ок. Дело, собственно, в том, что туториалов по mapreduce на уровне wordcount существуют тысячи, и они все одинаковые – а вот найти что-то серьезное про него очень нелегко; я был несколько разочарован, увидев еще один базовый туториал. Надеюсь на продолжение.

О, вот это круто!

Ба, Валерий Воротынцев! Мир ФП в рунете таки тесен.

Войдите, чтобы оставить комментарий