Example of launching jobs with a DataFrame

A Spark DataFrame (DF) is a data structure organized into named columns and distributed across a cluster. It is conceptually similar to a table in a relational database, but with smart optimizations under the hood. In Spark terms, a DataFrame is technically a Dataset of Row objects (Dataset[Row]).

This section provides basic DataFrame usage examples that can be run in spark-shell; Spark also supports other languages, such as Java, Python, and R.

DataFrame operations

Create

A DataFrame object can be constructed from a variety of sources; the most common ways are presented below.

  • From structured data files

  • From an RDD

  • From an external database via JDBC

The following snippet shows how to create a DF from a JSON file stored on HDFS.

val df = spark
    .read
    .json("hdfs://adh/user/admin/testload/people.json")

// read multiline JSON
val multiline_df = spark
    .read
    .option("multiline","true")
    .json("hdfs://adh/user/admin/testload/people_multiline.json")

In this case the DF schema is inferred automatically based on the structure of the JSON file. For more information on creating DFs from other sources, like Avro, Parquet, ORC, and many others, see available data sources.
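
For instance, Parquet and ORC sources are built into Spark and follow the same read pattern. A minimal sketch is below; the file paths are hypothetical:

val df_parquet = spark.read.parquet("hdfs://adh/user/admin/testload/people.parquet")
val df_orc = spark.read.orc("hdfs://adh/user/admin/testload/people.orc")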

You can convert an RDD to a DF using the rdd.toDF() method. By default, Spark generates the schema automatically based on the RDD structure, but you can also specify the schema explicitly (see the sketch after the snippet below).

val rdd = sc.textFile("hdfs://adh/user/admin/testload/test_data_rdd.txt")
// with no column names given, toDF() produces a single column named "value"
val df = rdd.toDF()
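
If automatic inference does not fit, you can define the schema yourself. The following is a minimal sketch, assuming each line of the text file holds a comma-separated id and name; the column names and line format are illustrative, not part of the original example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

// schema defined explicitly instead of being inferred
val schema = StructType(Seq(
    StructField("id", LongType, nullable = true),
    StructField("name", StringType, nullable = true)
))

// assumption: each line looks like "<id>,<name>"
val rowRdd = rdd.map(_.split(",")).map(a => Row(a(0).trim.toLong, a(1).trim))
val df_explicit = spark.createDataFrame(rowRdd, schema)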

The example below shows how to load a DF from a MySQL table. The MySQL JDBC driver JAR must be available in the classpath for this to work.

val df_mysql = spark.read.format("jdbc")
   .option("url", "jdbc:mysql://<host>:<port>/db_name")
   .option("driver", "com.mysql.jdbc.Driver")
   .option("dbtable", "<table_name>")
   .option("user", "<username>")
   .option("password", "<password>")
   .load()

Getting a DF from other DB engines is conceptually similar.
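
As a sketch, reading from PostgreSQL differs only in the JDBC URL and driver class; the host, database, and credentials below are placeholders, and the PostgreSQL JDBC driver JAR must likewise be in the classpath:

val df_pg = spark.read.format("jdbc")
   .option("url", "jdbc:postgresql://<host>:<port>/db_name")
   .option("driver", "org.postgresql.Driver")
   .option("dbtable", "<table_name>")
   .option("user", "<username>")
   .option("password", "<password>")
   .load()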

Apart from these, there are many other ways to get a DataFrame object, like using SparkSession.sql()/SparkSession.createDataFrame(), querying a Hive/HBase table, and so on. More details on using these methods are available in Spark documentation.
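
For instance, a quick way to get a DF in spark-shell is to build it from an in-memory collection; the sample data below is made up for illustration:

import spark.implicits._

// build a DF directly from a local sequence of tuples
val df_local = Seq((1, "Sarah"), (2, "Michael")).toDF("id", "name")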

Modify

All DataFrame operations are divided into two types:

  • Transformations that return a new, modified DF. For example, map(), filter(), agg(), join(), etc.

  • Actions that return a value to the driver program. For example, collect(), count(), etc.

All DF transformations are lazy, i.e. they are not computed right away. The computation runs only when a result must be returned to the driver program, for example, by df.show().
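
The sketch below illustrates this, reusing df from the JSON example above: filter() only records an execution plan, and nothing is actually read or computed until the count() action runs:

// transformation: no computation happens here, Spark only builds the plan
val adults = df.filter("age > 18")

// action: triggers the computation and returns a value to the driver
val n = adults.count()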

For a complete reference on supported operations, see the Spark Dataset API.

View contents

To view DF contents, use df.show(x), which prints the first x rows of the DF.

NOTE
Be careful when using constructs like df.collect().foreach(println), as this may cause the Spark driver machine to run out of memory: collect() fetches the entire DF from all the cluster nodes to a single machine.
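
A safer way to inspect a few rows on the driver is take(), which fetches only a bounded number of rows. A minimal sketch:

// fetch at most 20 rows to the driver instead of the entire DF
df.take(20).foreach(println)

// show() also accepts a truncate flag to print column values in full
df.show(10, truncate = false)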

Use df.printSchema() to print the DF schema as shown below.

root
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)

Run SQL queries

The Spark SQL module allows you to run traditional SQL queries on DataFrames. To do so, create a temporary view from a DataFrame using createOrReplaceTempView(). Then you can submit SQL queries against this view using the SparkSession.sql() method. An example is below.

val df = spark
    .read
    .json("hdfs://adh/user/admin/testload/employees.json")

df.createOrReplaceTempView("employees")

val df_sql = spark.sql("SELECT name, lastname, age FROM employees")
df_sql.show()

Once created, the temporary view behaves like an ordinary SQL table, but it is not persisted anywhere as a file. The view is attached to the SparkSession where it was created and remains accessible until the session ends. For more details on running SQL queries, see the SparkSession.sql() API.
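
If a view needs to be shared across SparkSessions of the same application, Spark also provides global temporary views, which live in the reserved global_temp database until the application ends. A brief sketch:

df.createGlobalTempView("employees_global")

// global temporary views are queried via the global_temp database
spark.sql("SELECT name FROM global_temp.employees_global").show()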

Example

The snippet below summarizes basic DF operations. To run the example without errors, the corresponding test file (employees.json) must be present in HDFS. To create the test file:

  1. On your local file system, create the employees.json file with dummy contents:

    echo -e '{"id":1,"name":"Sarah","lastname":"Connor","dept":"hr","age":34}\n{"id":2,"name":"Michael","lastname":"Scott","dept":"sales","age":47}\n{"id":3,"name":"Luke","lastname":"Skywalker","dept":"it","age":45}\n{"id":4,"name":"James","lastname":"Hetfield","dept":"sales","age":50}' > employees.json
  2. Create a test directory in HDFS, provide access permissions, and copy the test file to HDFS:

    $ sudo -u hdfs hdfs dfs -mkdir /user/admin
    $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
    $ sudo -u hdfs hdfs dfs -put /home/admin/spark-demo/employees.json /user/admin/employees.json

    Please ensure that the hdfs user has access to the employees.json file.

Once the test data is loaded to HDFS, you can run the following Scala code in spark-shell (/bin/spark-shell).

val df_employees = spark.read.json("hdfs://adh/user/admin/employees.json") (1)

println("DataFrame created:")
df_employees.show()

println("DataFrame schema:")
df_employees.printSchema()

println("Show name-age selection from all employees:")
df_employees.select("name","age").show()

println("Get all employees older than 40")
df_employees.filter("age > 40").show()


val depts_data = Seq((1,"it"),(2,"sales"),(3,"hr"))
val rdd = sc.parallelize(depts_data)
val df_department = rdd.toDF("id","department") (2)

println("DataFrame created:")
df_department.show()

val joined = df_employees.filter("age > 30").join(df_department, df_employees("dept") === df_department("department")) (3)

println("Get number of employees per department")
joined.groupBy("department").count().show()

println("Get average age per department")
joined.groupBy("department").mean("age").show() (4)
1 Create a DF from a JSON file in HDFS.
2 Create a DF from an RDD.
3 Filter DF data by a column and join two DFs.
4 Use GROUP BY with aggregation by a column.


Running the example outputs the following:

DataFrame created:
+---+-----+---+---------+-------+
|age| dept| id| lastname|   name|
+---+-----+---+---------+-------+
| 34|   hr|  1|   Connor|  Sarah|
| 47|sales|  2|    Scott|Michael|
| 45|   it|  3|Skywalker|   Luke|
| 50|sales|  4| Hetfield|  James|
+---+-----+---+---------+-------+

DataFrame schema:
root
 |-- age: long (nullable = true)
 |-- dept: string (nullable = true)
 |-- id: long (nullable = true)
 |-- lastname: string (nullable = true)
 |-- name: string (nullable = true)

Show name-age selection from all employees:
+-------+---+
|   name|age|
+-------+---+
|  Sarah| 34|
|Michael| 47|
|   Luke| 45|
|  James| 50|
+-------+---+

Get all employees older than 40:
+---+-----+---+---------+-------+
|age| dept| id| lastname|   name|
+---+-----+---+---------+-------+
| 47|sales|  2|    Scott|Michael|
| 45|   it|  3|Skywalker|   Luke|
| 50|sales|  4| Hetfield|  James|
+---+-----+---+---------+-------+

DataFrame created:
+---+----------+
| id|department|
+---+----------+
|  1|        it|
|  2|     sales|
|  3|        hr|
+---+----------+

Get number of employees per department:
+----------+-----+
|department|count|
+----------+-----+
|     sales|    2|
|        hr|    1|
|        it|    1|
+----------+-----+

Get average age per department:
+----------+--------+
|department|avg(age)|
+----------+--------+
|     sales|    48.5|
|        hr|    34.0|
|        it|    45.0|
+----------+--------+