Пример запуска задач c RDD
Spark resilient distributed dataset (RDD) — это отказоустойчивая коллекция, элементы которой распределены по узлам кластера и могут обрабатываться параллельно.
В данной статье показаны базовые примеры использования RDD на языке Scala, которые можно запускать в spark-shell. Также Spark поддерживает другие языки, например Java, Python и R.
Операции с RDD
Создание
Существует два способа создания нового RDD, примеры представлены ниже.
Для создания RDD из коллекции можно использовать метод sparkContext.parallelize()
в коде программы драйвера.
val dataSeq = Seq(("foo", 0), ("bar", 1), ("buzz", 2))
val rdd=sc.parallelize(dataSeq)
val dataArr = Array(0, 1, 2, 3)
val rdd1=sc.parallelize(dataArr)
val emptyRDD = sc.parallelize(Seq.empty[String])
Данные для RDD можно получить из внешнего хранилища, например, HDFS, HBase или любого другого источника данных, реализующего Hadoop InputFormat. Ниже показан способ чтения данных из HDFS.
val rdd2 = sc.textFile("hdfs://adh/user/admin/testload/file_test.csv")
После создания объекта RDD вы можете выполнять основные операции Spark над RDD.
Модификация
Все операции RDD делятся на два типа:
-
Трансформации (transformations), которые возвращают новый RDD. Например,
map()
,flatMap()
,union()
и так далее. -
Действия (actions), которые возвращают некое значение программе-драйверу после выполнения вычислений над RDD. Например,
collect()
,count()
и так далее.
Все операции с RDD, включая создание и модификацию, являются "ленивыми" (lazy), то есть они не выполняются сразу.
Вычисления производятся только тогда, когда какой-то результат должен быть возвращен программе-драйверу, например, вызов rdd.take(10)
.
Полная справочная информация о поддерживаемых операциях доступна на странице Spark RDD API.
Просмотр содержимого
Для просмотра содержимого RDD используйте метод rdd.take(x)
, который возвращает первые x
элементов RDD.
ПРИМЕЧАНИЕ
Используйте с осторожностью конструкции типа rdd.collect().foreach(println) , так как это может привести к нехватке памяти на машине-драйвере Spark, поскольку collect() загружает данные RDD со всех узлов кластера на одну машину.
|
Пример
Следующий пример демонстрирует использование основных операций с RDD. Чтобы пример отработал без ошибок, соответствующий файл с данными (test_lorem.txt) должен быть доступен в HDFS. Создание тестового файла показано ниже.
-
В локальной файловой системе создайте файл test_lorem.txt:
echo "Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus." > test_lorem.txt
-
Создайте директорию в HDFS, установите права доступа и скопируйте тестовый файл в HDFS:
$ sudo -u hdfs hdfs dfs -mkdir /user/admin $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin $ sudo -u hdfs hdfs dfs -put /home/admin/spark-demo/test_lorem.txt /user/admin/test_lorem.txt
Необходимо убедиться, что пользователь
hdfs
имеет доступ к файлу test_lorem.txt.
Когда тестовые данные загружены в HDFS, вы можете выполнить следующий Scala-код непосредственно в spark-shell (/bin/spark-shell).
val rdd = sc.textFile("hdfs://adh/user/admin/test_lorem.txt")
println("Created RDD:")
rdd.take(10)
val resultRDD = rdd.flatMap(line => line.split(" ")) (1)
.map(word => (word,1)) (2)
.reduceByKey(_+_) (3)
.sortBy(_._2, false) (4)
resultRDD.take(100).foreach(println)
resultRDD.persist() (5)
println("Writing RDD to a file: /user/admin/lorem_demo_result")
resultRDD.saveAsTextFile("hdfs://adh/user/admin/lorem_demo_result") (6)
1 | Разбиение каждой строки текста исходного файла на отдельные слова. |
2 | Создание кортежа (<word>,1) для каждого слова. |
3 | Редуцирование коллекции таким образом, чтобы остались лишь уникальные слова с количеством повторений. |
4 | Сортировка слов по количеству повторений. |
5 | Кеширование RDD в памяти. В этот момент выполняется синхронизация RDD по всем узлам кластера. |
6 | Запись RDD в HDFS. В HDFS создается директория с частями (parts) данных RDD. |
В результате выполнения примера в консоль выводится следующий результат.
Created RDD: val res0: Array[String] = Array(Lorem ipsum dolor sit amet, ...) val resultRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at sortBy at <console>:4 (Aenean,3) (vulputate,2) (pede,2) (nec,,2) (Donec,2) (justo,,2) (montes,,1) ... Writing RDD to a file: /user/admin/lorem_demo_result
Запись RDD в HDFS в примере выше можно проверить с помощью команды:
$ sudo -u hdfs hdfs dfs -ls /user/admin/lorem_demo_result
Пример вывода:
Found 3 items -rw-r--r-- 3 admin admin 0 2023-07-25 20:55 /user/admin/lorem_demo_result/_SUCCESS -rw-r--r-- 3 admin admin 64 2023-07-25 20:55 /user/admin/lorem_demo_result/part-00000 -rw-r--r-- 3 admin admin 753 2023-07-25 20:55 /user/admin/lorem_demo_result/part-00001