от BAGHOU Marwane: https://github.com/mbaghou/spark-java

Прежде чем описывать наиболее полезные API-интерфейсы Spark, нам нужно поговорить об анонимных функциях и интерфейсах Java.

Начиная с Java 8 мы можем передавать функции в качестве аргументов методам. Следовательно, необходимо предоставить авторам методов способ объявить, что их метод требует, чтобы определенный параметр был функцией. Вместо того, чтобы вводить новый тип, Java решает эту проблему, определяя функциональные интерфейсы. Функциональный интерфейс — это интерфейс с одним абстрактным методом (начиная с Java 8 интерфейсы могут иметь методы с реализацией по умолчанию, просто для дальнейшего усложнения). Компилятор Java создает функции с правильными параметрами и возвращаемыми типами, реализующими функциональный интерфейс, необходимый для данного вызова метода.

Ниже приведен пример функционального интерфейса:

public interface DistanceFunction<T> {

  public double call(T a, T b);

}

Приведенный выше функциональный интерфейс можно использовать для объявления метода, который принимает в качестве параметра функцию расстояния, например:

public static <T> ArrayList<T> kCenter(ArrayList<T> points,
                                       int k,
                                       DistanceFunction<T> distance) {
  // ...
  // use distance.call(points.get(i), points.get(j))
  // to get the distance between the i-th and the j-th points.
}

Вышеописанный метод, в свою очередь, может быть использован следующим образом:

ArrayList<Double> pointsOnTheLine;
kCenter(pointsOnTheLine, 10, (x, y) -> Math.abs(a - b));

Компилятор позаботится о том, чтобы анонимная функция, которую мы передаем методу kCenter, могла реализовать функциональный интерфейс DistanceFunction. Обратите внимание, что мы не сделали ничего особенного, чтобы сделать DistanceFunction функциональным интерфейсом, кроме объявления в нем одного абстрактного метода.

Многие интерфейсы стандартной библиотеки Java соответствуют требованиям функциональных интерфейсов. Среди прочих примечательными примерами являются java.util.Comparator, …, а также недавно введенные java.function.Function и java.function.BiFunction`. Spark introduces some functional interfaces of its own: ``org.apache.spark.Function, org.apache.spark.FlatMapFunction, org.apache.spark.PairFunction, org.apache.spark.PairFlatMapFunction. Суть в том, что всякий раз, когда вы найдете метод, требующий передачи функционального интерфейса, вы можете передать анонимную функцию.

Есть некоторые предостережения при использовании функций и функциональных интерфейсов с Java API Spark. В зависимости от того, где вы запускаете свой код (например, в кластере), Spark может потребоваться сериализовать ваши функции, чтобы отправить их исполнителям. Сделать объект Java сериализуемым так же просто, как наследовать его от java.util.Serializable, что даже не требует реализации какого-либо метода. Обычно анонимные функции потенциально сериализуемы. Анонимная функция не является сериализуемой, если она захватывает в своем теле несериализуемый объект. Однако, даже если функция потенциально сериализуема (поскольку все ее захваты сериализуемы), во время компиляции она «одевается» функциональным интерфейсом, требуемым методом, которому мы ее передаем. Это означает, что после компиляции наша функция будет, по сути, java.util.Comparator или org.apache.spark.FlatMapFunction, в зависимости от контекста. Проблема в том, что некоторые функциональные интерфейсы, такие как java.util.Comparator, не наследуются от Serializable, поэтому наша функция не рассматривается как сериализуемая. Следствием этого является то, что во время выполнения наша программа может аварийно завершать работу с ошибкой TaskNotSerializableException. Эта проблема связана с тем, что компилятор Java недостаточно умен в этом контексте. Обходной путь — явно реализовать класс, реализующий как Serializable, так и Comparator, как показано в примере для методов min и max.

scala.Tuple2‹K, V›::

В MapReduce мы работаем с парами ключ-значение. Spark использует этот тип для представления таких пар. Вы можете получить доступ к ключу и значению с помощью методов _1() и _2() соответственно. Вот некоторые примеры:

// Create a new pair
Tuple2<String, Long> pair = new Tuple2<>("ciao", 1);

// Access the elements
pair._1() // = "ciao"
pair._2() // = 1

org.apache.spark.api.java.JavaRDD<T>::

Этот класс является локальным дескриптором для устойчивого распределенного набора данных, содержащего элементы типа T. Он предоставляет функции для преобразования данных и локального сбора информации.

Преобразования

Следующие функции называются преобразованиями, поскольку они преобразуют одно СДР в другое.

map(Функция‹T, R› функция)

Возвращает JavaRDD<R>, применяя данную функцию к каждому элементу this независимо. Этот метод принимает в качестве параметра функцию, принимающую один параметр типа T и возвращающую другой объект типа R. В следующем примере показано, как преобразовать СДР целых чисел в СДР двойных значений путем деления каждого элемента исходной коллекции пополам:

JavaRDD<Integer> numbers;
JavaRDD<Double> halves = numbers.map((x) -> x / 2.0);

flatMap(FlatMapFunction‹T, R› функция)

Подобно map, но функция, переданная в качестве параметра, может возвращать несколько значений. Это полезно для «взрывания» набора данных. Например, представьте, что у вас есть СДР предложений, и вы хотите преобразовать его в СДР отдельных слов:

JavaRDD<String> sentences;
JavaRDD<String> words = sentences.flatMap((s) -> {
  return s.split(" ").iterator();
});

Вы также можете заставить функцию возвращать один элемент или даже не возвращать ни одного элемента, используя возможности java.util.Collections:

JavaRDD<Integer> numbers;
JavaRDD<Integer> evenNumbers = numbers.flatMap((x) -> {
  if (x % 2 == 0) {
    // if the number is even, wrap it in a singleton
    // set and return an iterator of this set.
    return Collections.singleton(x).iterator();
  } else {
    // otherwise just return an empty iterator.
    return Collections.emptyIterator();
  }
});

filter(Функция‹T, логический› предикат)

Возвращает JavaRDD<T>, содержащий только те элементы this, для которых данный предикат возвращает значение true. Аргументом этого метода является функция, которая принимает один параметр типа T и возвращает логическое значение. Ниже показано, как получить СДР четных чисел из СДР целых чисел:

JavaRDD<Integer> numbers;
JavaRDD<Double> evenNumbers = numbers.filter((x) -> x % 2 == 0);

sample(логическое значение с заменой, двойная дробь)

Эта функция полезна для получения неравномерно распределенной выборки данных. Возвращает RDD типа T.

  • withReplacement: следует ли повторно рассматривать уже отобранные элементы.
  • fraction: доля элементов для выборки.

Вот как выбрать 1000 элементов из rdd без замены:

long size = rdd.count();
double fraction = 1000.0 / size;
JavaRdd<T> sampled = rdd.sample(false, fraction);

mapToPair(PairFunction‹T, K, V› функция)

Аналогично flatMap, но возвращает JavaPairRDD<K, V>. Этот метод принимает функцию в качестве параметра. Эта функция должна принимать одно значение типа T и возвращать итератор пар ключ-значение Tuple2<K, V>. Например, чтобы отобразить СДР предложений в СДР слов вместе с инициализированным количеством слов:

JavaRDD<String> words;
JavaPairRDD<String, Long> counts = words.mapToPair((x) -> {
  return new Tuple2<>(x, 1);
});

keyBy(Функция‹T, K› функция)

Возвращает JavaPairRDD пар ключ-значение, где ключи вычисляются из элементов this. Этот метод принимает в качестве аргумента функцию, которая вычисляет ключ из отдельных элементов. Например, представьте, что у нас есть набор данных слов, и мы хотим использовать в качестве ключа длину слов:

JavaRDD<String> words;
JavaPairRDD<Int, String> withKeys = words.keyBy((w) -> w.length);

So if words is:

"dog"
"cat"
"computer"

СДР, полученный в результате вышеуказанного применения keyBy, будет:

(3, "dog")
(3, "cat")
(8, "computer")

groupBy(Функция‹T, K› функция)

Возвращает JavaPairRDD с ключами, предоставленными func (как для метода keyBy), где значение, связанное с каждым ключом, представляет собой итерацию всех значений, использующих один и тот же ключ. Рассмотрим снова пример, который мы сделали для keyBy, и предположим, что мы хотим сгруппировать слова по длине:

JavaRDD<String> words;
JavaPairRDD<Int, Iterable<String>> grouped = words.groupBy((w) -> w.length);

если this — это RDD строк, подобных следующим:

"dog"
"cat"
"computer"

то результат будет:

(3, ["cat", "dog"])
(8, ["computer"])

где нотация [...] обозначает список элементов.

Действия

count()

Возвращает размер this, то есть количество содержащихся в нем элементов.

collect()

Передает драйверу все данные, содержащиеся в этом RDD (которые могут быть распределены между несколькими исполнителями).

Предупреждение

Для этого действия требуется достаточно памяти в драйвере для хранения всего набора данных. Используйте его только на небольших RDD. Если набор данных слишком велик, будет выдано OutOfMemoryError, и программа вылетит.

Например:

JavaRDD<String> distributedWords;
List<String> localWords = distributedWords.collect();
// localWords is a local copy of all the elements of the distributedWords RDD.

max(java.util.Comparator‹T› comp)min(java.util.Comparator‹T› comp)

Возвращает максимальный и минимальный элемент RDD соответственно. Порядок среди элементов задается Comparator, принятым в качестве аргумента. Компаратор — это интерфейс, определяющий единственный метод, который должен соответствовать следующему контракту. Даны два аргумента одного типа. Если они равны, то методы должны возвращать 0, если первое меньше, должно возвращаться отрицательное число, если первое больше, то должно возвращаться положительное число.

У этих двух методов есть проблема в Java API Spark. Как мы видели, анонимные функции автоматически реализуют так называемые функциональные интерфейсы. Интерфейсы java.util.Comparator — это функциональные интерфейсы, поэтому мы можем передать анонимную функцию в max и min, и компилятор Java успешно завершит компиляцию. Однако Spark требует, чтобы функции были сериализуемыми, потому что он должен отправлять их исполнителям. Теперь java.util.Comparator не Serializable, поэтому анонимные функции, переданные в max и min, не сериализуемы, даже если бы могли. Это ограничение компилятора Java. Следовательно, передача анонимной функции в max или min приведет к сбою кода во время выполнения с ошибкой TaskNotSerializableException.

Существует обходной путь, который заключается в явном определении класса, реализующего как Comparator, так и Serializable. В следующем примере выполняется поиск самого длинного слова в СДР слов:

public class MyClass {
  // It is important to mark this class as `static`.
  public static class LengthComparator implements Serializable, Comparator<String> {
    public int compare(String a, String b) {
      if (a.length < b.length) return -1;
      else if (a.length > b.length) return 1;
      return 0;
    }
  }
  public static void main(String[] args) {
    // ...
    JavaRDD<String> words;
    String longest = words.max(new LengthComparator());
  }
}

reduce(Функция2‹T, T, T› функция)

Получите единственное значение типа T, объединив все значения СДР в соответствии с функцией func. Функция должна быть ассоциативной и коммутативной, поскольку нет гарантии порядка применения к элементам СДР. Например, чтобы получить сумму всех элементов СДР целых чисел:

JavaRDD<Integer> numbers;
int sum = numbers.reduce((x, y) -> x + y);

org.apache.spark.api.java.JavaPairRDD<K, V>

Этот класс, как и его собрат JavaRDD, является локальным дескриптором распределенной коллекции, но на этот раз мы имеем дело с коллекцией пар ключ-значение.

Вы можете переключаться между JavaRDD<Tuple2<K, V>> и JavaPairRDD<K, V>. Чтобы преобразовать JavaRDD объектов Tuple2 в JavaPairRDD, вы можете использовать статический метод JavaPairRDD.fromRDD, как в этом примере:

JavaRDD<Tuple2<String, Integer>> wordCounts;
JavaPairRDD<String, Integer> wordCountsPairRDD = JavaPairRDD.fromRDD(wordCounts);

Обратите внимание, что два RDD представляют одни и те же данные, но JavaPairRDD дает вам доступ к большему количеству методов, в частности к тем, которые работают с парами ключ-значение. Если вам нужно преобразовать обратно в JavaRDD из JavaPairRDD, вы можете использовать метод toRDD для JavaPairRDD следующим образом:

JavaPairRDD<String, Integer> wordCountsPairRDD;
JavaRDD<Tuple2<String, Integer>> wordCounts = wordCountsPairRDD.toRDD();

Как и в случае с классом JavaRDD, мы можем разделить методы на преобразования и действия. Многие методы JavaRDD по-прежнему доступны, а также доступны некоторые новые. В частности, методы с Values в названии работают так же, как их JavaRDD аналоги, изменяя только значения и оставляя ключи нетронутыми.

Для получения дополнительных примеров искрового java, пожалуйста, посетите: https://github.com/mbaghou/spark-java