Пример запуска задач c DataFrame
Spark DataFrame (DF) — это структура данных, организованная в именованные столбцы и распределенная по кластеру. Концептуально DF похож на таблицу в реляционной базе данных, но с определенными оптимизациями. Технически Spark DataFrame представляет собой DataSet, хранящий данные типа Row.
В данной статье показаны базовые примеры использования DF на языке Scala, которые можно запускать в spark-shell. Также Spark поддерживает другие языки, например Java, Python и R.
Операции с DataFrame
Создание
Объект DataFrame может быть создан из различных источников, наиболее распространенные способы представлены ниже.
Ниже показано создание DF из JSON-файла, хранящегося в HDFS.
val df = spark
.read
.json("hdfs://adh/user/admin/testload/people.json")
// read multiline JSON
val multiline_df = spark
.read
.option("multiline","true")
.json("hdfs://adh/user/admin/testload/people_multiline.json")
В данном примере схема для DF определяется автоматически на основе структуры JSON-файла. Для получения дополнительной информации о создании DF из других источников, таких как Avro, Parquet, ORC и многих других, смотрите доступные источники данных.
Вы можете преобразовать RDD в DF с помощью метода rdd.toDF()
.
По умолчанию Spark генерирует схему автоматически на основе структуры RDD, однако вы также можете указать схему явно.
val rdd = sc.textFile("hdfs://adh/user/admin/testload/test_data_rdd.txt")
val df = rdd.toDF()
Пример ниже демонстрирует получение DF из таблицы MySQL. Чтобы код примера заработал, в classpath должен быть доступен JAR MySQL-драйвера.
val df_mysql = spark.read.format("jdbc")
.option("url", "jdbc:mysql://<host>:<port>/db_name")
.option("driver", "com.mysql.jdbc.Driver")
.option("dbtable", "<table_name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Получение DF из других СУБД выглядит аналогично.
Помимо вышеупомянутых, существует множество других способов получения объекта DataFrame, например, с помощью методов SparkSession.sql()
/SparkSession.createDataFrame()
или запроса к таблице Hive/HBase и так далее.
Более подробная информация об этих методах доступна в документации Spark.
Модификация
Все операции DataFrame делятся на два типа:
-
Трансформации (transformations), которые возвращают новый DF. Например,
map()
,filter()
,agg()
,col()
,join()
и так далее. -
Действия (actions), которые возвращают некое значение программе-драйверу после выполнения вычислений над DF. Например,
collect()
,count()
и так далее.
Все операции с DF являются "ленивыми" (lazy), то есть они не выполняются сразу.
Вычисления производятся только тогда, когда какой-то результат должен быть возвращен программе-драйверу, например, вызов df.show()
.
Полная справочная информация о поддерживаемых операциях доступна на странице Spark DataSet API.
Просмотр содержимого
Для просмотра содержимого DF используйте метод df.show(x)
, который возвращает первые x
строк DF.
ПРИМЕЧАНИЕ
Используйте с осторожностью конструкции типа df.collect().foreach(println) , так как это может привести к нехватке памяти на машине-драйвере Spark, поскольку collect() загружает данные DF со всех узлов кластера на одну машину.
|
Для получения информации о DF-схеме, используйте df.printSchema()
как показано ниже.
root |-- age: long (nullable = true) |-- dept: string (nullable = true) |-- id: long (nullable = true) |-- lastname: string (nullable = true) |-- name: string (nullable = true)
Выполнение SQL-запросов
Модуль Spark SQL позволяет выполнять традиционные SQL-запросы к DF.
Для этого необходимо создать временное представление (View) из DataFrame с помощью метода createOrReplaceTempView()
.
Затем вы можете отправлять SQL-запросы к этому представлению, используя метод SparkSession.sql()
.
Пример приведен ниже.
val df = spark
.read
.csv("hdfs://adh/user/admin/testload/employees.json")
df.createOrReplaceTempView("employees")
val df_sql = spark.sql("SELECT name, lastname, email FROM employees")
df_sql.show()
После создания временное представление ведет себя как обычная таблица SQL, однако оно нигде не сохраняется в виде файла. Представление привязано к сессии SparkSession, в которой оно было создано, и остается доступным до тех пор, пока существует объект SparkSession. Больше информации о выполнении SQL-запросов доступно в разделе SparkSession.sql() API.
Пример
Следующий пример демонстрирует использование основных операций с DF. Чтобы пример отработал без ошибок, соответствующий файл с данными (employees.json) должен быть доступен в HDFS. Создание тестового файла показано ниже.
-
В локальной файловой системе создайте файл employees.json:
echo -e '{"id":1,"name":"Sarah","lastname":"Connor","dept":"hr","age":34}\n{"id":2,"name":"Michael","lastname":"Scott","dept":"sales","age":47}\n{"id":3,"name":"Luke","lastname":"Skywalker","dept":"it","age":45}\n{"id":4,"name":"James","lastname":"Hetfield","dept":"sales","age":50}' > employees.json
-
Создайте директорию в HDFS, установите права доступа и скопируйте тестовый файл в HDFS:
$ sudo -u hdfs hdfs dfs -mkdir /user/admin $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin $ sudo -u hdfs hdfs dfs -put /home/admin/spark-demo/employees.json /user/admin/employees.json
Необходимо убедиться, что пользователь
hdfs
имеет доступ к файлу employees.json.
Когда тестовые данные загружены в HDFS, вы можете выполнить следующий Scala-код непосредственно в spark-shell (/bin/spark-shell).
val df_employees = spark.read.json("hdfs://adh/user/admin/employees.json") (1)
println("DataFrame created:")
df_employees.show()
println("DataFrame schema:")
df_employees.printSchema()
println("Show name-age selection from all employees:")
df_employees.select("name","age").show()
println("Get all employees older than 40")
df_employees.filter("age > 40").show()
val depts_data = Seq((1,"it"),(2,"sales"),(3,"hr"))
val rdd = sc.parallelize(depts_data)
val df_department = rdd.toDF("id","department") (2)
println("DataFrame created:")
df_department.show()
val joined = df_employees.filter("age > 30").join(df_department, df_employees("dept") === df_department("department")) (3)
println("Get number of employees per department")
joined.groupBy("department").count().show()
println("Get average age per department")
joined.groupBy("department").mean("age").show() (4)
1 | Создание DF из JSON-файла в HDFS. |
2 | Создание DF из RDD. |
3 | Фильтрация по столбцу age и выполнение JOIN для двух DF. |
4 | Использование GROUP BY с агрегацией по столбцу. |
В результате выполнения примера в консоль выводится следующий результат:
DataFrame created: +---+-----+---+---------+-------+ |age| dept| id| lastname| name| +---+-----+---+---------+-------+ | 34| hr| 1| Connor| Sarah| | 47|sales| 2| Scott|Michael| | 45| it| 3|Skywalker| Luke| | 50|sales| 4| Hetfield| James| +---+-----+---+---------+-------+ DataFrame schema: root |-- age: long (nullable = true) |-- dept: string (nullable = true) |-- id: long (nullable = true) |-- lastname: string (nullable = true) |-- name: string (nullable = true) Show name-age selection from all employees: +-------+---+ | name|age| +-------+---+ | Sarah| 34| |Michael| 47| | Luke| 45| | James| 50| +-------+---+ Get all employees older than 40: +---+-----+---+---------+-------+ |age| dept| id| lastname| name| +---+-----+---+---------+-------+ | 47|sales| 2| Scott|Michael| | 45| it| 3|Skywalker| Luke| | 50|sales| 4| Hetfield| James| +---+-----+---+---------+-------+ DataFrame created: +---+----------+ | id|department| +---+----------+ | 1| it| | 2| sales| | 3| hr| +---+----------+ Get number of employees per department: +----------+-----+ |department|count| +----------+-----+ | sales| 2| | hr| 1| | it| 1| +----------+-----+ Get average age per department: +----------+--------+ |department|avg(age)| +----------+--------+ | sales| 48.5| | hr| 34.0| | it| 45.0| +----------+--------+