PySpark

PySpark is a Python wrapper for Spark that allows you to invoke native Spark methods from Python code. At the core of PySpark is the Py4J library, which allows a Python interpreter to manipulate Spark objects in a running JVM.
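
To illustrate the Py4J bridge described above, here is a minimal sketch (not part of the scenarios below; it assumes any working PySpark installation) that calls a JVM method from Python through the gateway exposed by the Spark context. Note that _jvm is an internal PySpark attribute, used here only for demonstration.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").appName("py4j-demo").getOrCreate()

    # sparkContext._jvm is the Py4J gateway to the driver JVM; the call below
    # runs java.lang.System.currentTimeMillis() inside the JVM from Python.
    jvm_time_ms = spark.sparkContext._jvm.java.lang.System.currentTimeMillis()
    print(jvm_time_ms)

    spark.stop()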

This section describes the use of PySpark via spark3-submit and the PySpark shell. All the examples refer to the Spark3 service; however, the steps are applicable to the Spark service as well.

Cluster requirements

The examples in this section assume the following environment:

  • ADH cluster nodes are CentOS 7-powered machines. For other OS types, the commands and paths may differ slightly.

  • ADH cluster 3.1.2.1.b1 or later is installed. The cluster has the Spark3 service installed.

    During the cluster installation, ADCM also installs a Python3 interpreter to /opt/pyspark3-python/ on the hosts with the Spark3 Client component. This Python is used in the scenarios below; however, you can use a custom Python 3.10+ instance to run PySpark (a quick version check is sketched after this list).

  • A separate HDFS directory for loading test data is created, and the user who runs PySpark is the owner of this directory:

    $ sudo -u hdfs hdfs dfs -mkdir /user/admin
    $ sudo -u hdfs hdfs dfs -chown admin:admin /user/admin
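
If you plan to use a custom Python instead of the bundled one, you can first make sure that it meets the Python 3.10+ requirement. The snippet below is a hedged sketch (not part of the requirements above); run it with the interpreter in question, for example /opt/pyspark3-python/bin/python3:

    import sys

    # Fail early if the interpreter is older than Python 3.10.
    assert sys.version_info >= (3, 10), f"Python 3.10+ is required, found {sys.version}"
    print(sys.executable, sys.version)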

Use PySpark and spark3-submit

The following scenario shows how to submit a Spark job written in Python using spark3-submit in cluster mode.

  1. Set the PYSPARK_PYTHON environment variable on all cluster hosts so that Spark uses the specific Python executable.

    For this, in ADCM, go to Clusters → <your_cluster_name> → Services → Spark3 → Primary configuration and add the line export PYSPARK_PYTHON=/opt/pyspark3-python/bin/python3 to the spark-env.sh field (the field appears after selecting Show advanced). Then, save the configuration and restart the Spark3 service. A sketch for verifying the effective interpreter is provided after this list.

  2. Create a test script named test.py.

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession

    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('spark-yarn')
    sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()

    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data) (1)
    df.write.csv("hdfs:///user/admin/tmp/test") (2)
    sparkSession.stop() (3)
    1 Creates a sample DataFrame.
    2 Writes the DataFrame contents to HDFS as CSV files.
    3 Stops the Spark session once the job is done.
  3. Run spark3-submit and provide test.py as an argument. In this scenario, spark3-submit must run under the admin user.

    $ /bin/spark3-submit \
        --deploy-mode cluster \ (1)
        --master yarn \ (2)
        test.py
    1 Runs Spark in cluster mode.
    2 The master URL to run Spark on a YARN cluster. The cluster location is resolved based on the HADOOP_CONF_DIR or YARN_CONF_DIR environment variables.
  4. Verify HDFS writes (a PySpark read-back sketch is also provided after this list):

    $ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test/

    Observe the DataFrame parts persisted in HDFS:

    Found 3 items
    -rw-r--r--   3 admin admin          0 2023-07-10 12:51 /user/admin/tmp/test/_SUCCESS
    -rw-r--r--   3 admin admin         17 2023-07-10 12:51 /user/admin/tmp/test/part-00000-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
    -rw-r--r--   3 admin admin         25 2023-07-10 12:51 /user/admin/tmp/test/part-00001-1a966f98-6c1a-467b-9564-dbbd65dd32a2-c000.csv
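
As an additional check for steps 1 and 4, the following hedged sketch (not part of the original scenario) reads the written data back and prints the Python interpreter path used on the executors. The HDFS path matches the one written by test.py; run the snippet in any PySpark session, for example in the PySpark shell described below.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("yarn").appName("verify-write").getOrCreate()

    # Read back the CSV part files written by test.py and display the rows.
    spark.read.csv("hdfs:///user/admin/tmp/test").show()

    # Print the Python executable used on an executor; it should match the
    # PYSPARK_PYTHON value set in step 1 (/opt/pyspark3-python/bin/python3).
    print(spark.sparkContext.parallelize([0], 1).map(
        lambda _: __import__("sys").executable).collect())

    spark.stop()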

Use PySpark shell

PySpark shell allows you to run Spark jobs in interactive mode. The following example shows how to set up a dedicated Python environment, start the PySpark shell, and run a Spark job interactively.

  1. Create a new virtual environment using Python from /opt/pyspark3-python/:

    $ cd ~
    $ mkdir pyspark-demo
    $ /opt/pyspark3-python/bin/python3 -m venv pyspark-demo/venv
  2. Activate the virtual environment:

    $ source pyspark-demo/venv/bin/activate
  3. Run PySpark:

    $ pyspark3

    You should see output similar to the following:

    Python 3.10.4 (main, Sep  7 2023, 08:17:33) [GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/var/tmp
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/lib/spark3/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    23/09/28 08:37:05 WARN HiveConf: HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
          /_/
    
    Using Python version 3.10.4 (main, Sep  7 2023 08:17:33)
    Spark context Web UI available at http://ka-adh-1.ru-central1.internal:4040
    Spark context available as 'sc' (master = yarn, app id = application_1695647776253_0020).
    SparkSession available as 'spark'.
  4. Run the following Python code in the PySpark shell. This Spark job creates a sample DataFrame and writes it to HDFS through a NameNode endpoint.

    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    
    conf = SparkConf()
    conf.setMaster('yarn')
    conf.setAppName('spark-yarn')
    sparkSession = SparkSession.builder.appName("spark-yarn").config(conf=conf).getOrCreate()
    
    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = sparkSession.createDataFrame(data)
    df.write.csv("hdfs://<active-nn-host>:8020/user/admin/tmp/test-nn") (1)
    quit()
    1 <active-nn-host> must point to the active NameNode. To identify the active NameNode, use the command below (for a variant that avoids pinning a NameNode host, see the sketch after this list):
    $ sudo -u hdfs hdfs haadmin -getAllServiceState

    A sample output:

    ka-adh-1.ru-central1.internal:8020                 standby
    ka-adh-3.ru-central1.internal:8020                 active
  5. Verify HDFS writes:

    $ sudo -u hdfs hdfs dfs -ls /user/admin/tmp/test-nn/

    Observe the DataFrame persisted in HDFS:

    -rw-r--r--   3 admin hadoop          9 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00001-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
    -rw-r--r--   3 admin hadoop          8 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00002-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
    -rw-r--r--   3 admin hadoop         17 2023-03-10 16:53 /user/admin/tmp/test-nn/part-00003-dedba941-113e-4fd6-871d-e87dd3291e57-c000.csv
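
If HDFS high availability is configured, you do not have to pin a specific NameNode host: a path without an explicit host (as in the spark3-submit example above) is resolved through fs.defaultFS, which normally points to the HA nameservice. The following hedged sketch (not part of the original scenario; the test-defaultfs directory name is arbitrary) shows this variant in the PySpark shell, where the session is already available as spark:

    data = [('First', 1), ('Second', 2), ('Third', 3), ('Fourth', 4), ('Fifth', 5)]
    df = spark.createDataFrame(data)

    # No host in the URI: HDFS resolves the path via fs.defaultFS
    # (typically the HA nameservice), so no manual NameNode lookup is needed.
    df.write.csv("hdfs:///user/admin/tmp/test-defaultfs")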

Steps for a secured cluster

If you have enabled security for your cluster, that is, you have:

  • installed Apache Ranger within ADPS;

  • kerberized both ADPS and ADH;

  • joined all Hadoop nodes to a domain and configured SSSD;

  • activated the HDFS/YARN plugins;

  • configured Ranger policies for HDFS/YARN plugins.

For such a cluster, the steps for using PySpark are identical; the main requirements are as follows:

  1. Your user/domain group must have access to the specific HDFS paths defined in the Ranger ACLs (see also the access-check sketch after this list).

  2. You have to get a Kerberos ticket using kinit as shown below:

    $ kinit myuser@EXAMPLE_REALM.COM
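
Once the ticket is obtained and the Ranger policies are in place, the PySpark steps themselves do not change. As a quick access check, the following hedged sketch (not part of the original steps; it assumes the test data written earlier and a Ranger HDFS policy that grants your user/group access to the path) reads a protected HDFS path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("yarn").appName("ranger-access-check").getOrCreate()

    # The read fails with an authorization error if no Ranger HDFS policy
    # allows your user/group to access this path.
    spark.read.csv("hdfs:///user/admin/tmp/test").show()

    spark.stop()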