ADB Spark 3 Connector usage examples

Requirements

This article describes how to transfer data between ADB and Spark 3 via the ADB Spark 3 Connector. The examples below assume that the following prerequisites are met:

  • The ADB cluster is installed according to the Online installation guide.

  • The ADH cluster is installed according to the Online installation guide. The minimum ADH version is 3.1.2.1.b1.

  • The Spark 3 service is added to the ADH cluster.

  • The IP address of the ADB master host is 10.92.40.38. The default port number for incoming PostgreSQL connections is 5432.

  • The adb database exists in the ADB cluster.

  • The adb_to_spark user with CREATEEXTTABLE privileges and the password 123 exists in the ADB cluster. To add the user, you can run the following query:

    CREATE ROLE adb_to_spark
    WITH  CREATEEXTTABLE(protocol='gpfdist',type='readable')
          CREATEEXTTABLE(protocol='gpfdist',type='writable')
          LOGIN
          PASSWORD '123';
  • The adb_to_spark user can access the adb database from the ADH cluster. For this purpose, you can change the pg_hba.conf file as follows:

    host    all  adb_to_spark       0.0.0.0/0     md5

    Note that this is not a production-ready configuration; use it only for testing. Instead of 0.0.0.0/0, you can specify the IP addresses of the ADH hosts where Spark 3 is installed, with a subnet mask (for example, 10.92.40.0/24).

    TIP

    You can modify the pg_hba.conf file via the ADCM web interface. To do this, fill in the Custom pg_hba section parameter on the Configuration tab of the ADB service in the ADB cluster. To apply changes, click Save and run the service Reconfigure action.

  • The author table exists in the adb database. To create and fill the table, use the following queries:

    CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL)
    WITH (appendoptimized=true)
    DISTRIBUTED BY(id);
    INSERT INTO author(id, name) VALUES
    (1,'Virginia Woolf'),
    (2,'J.R.R. Tolkien'),
    (3,'Harper Lee'),
    (4,'J.D. Salinger'),
    (5,'George Orwell'),
    (6,'John Steinbeck'),
    (7,'Margaret Mitchell'),
    (8,'Alan Moore'),
    (9,'Jack Kerouac'),
    (10,'Ernest Hemingway');
  • The adb_to_spark user has the necessary permissions on the author table (a verification query is shown after this list):

    GRANT SELECT, INSERT ON public.author TO adb_to_spark;
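
To verify that the grants took effect, you can run a quick check from psql under any user. The has_table_privilege function is standard PostgreSQL and is available in ADB:

    SELECT has_table_privilege('adb_to_spark', 'public.author', 'SELECT');
    SELECT has_table_privilege('adb_to_spark', 'public.author', 'INSERT');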

ADB Spark 3 Connector communicates with ADB via a JDBC connection. The PostgreSQL JDBC driver comes with the connector.

The commands from the examples below should be executed in spark3-shell, the interactive shell for Scala.
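
Before running the examples, you can optionally check that the ADB master accepts JDBC connections from the ADH host. The following sketch uses the plain JDBC API with the connection parameters assumed above; it does not rely on the connector itself:

    import java.sql.DriverManager

    // Open a plain JDBC connection using the PostgreSQL driver bundled with the connector
    val conn = DriverManager.getConnection("jdbc:postgresql://10.92.40.38:5432/adb", "adb_to_spark", "123")
    println(conn.isValid(5))   // prints true if the connection works
    conn.close()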

Load data from ADB to Spark

  1. Open spark3-shell on the host where Spark 3 is installed:

    $ sudo -u hdfs spark3-shell
  2. Specify the connector settings for reading data from the ADB author table:

    val options = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "author"
       )

    The result:

    val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  3. Register a DataFrame:

    val adb_author = spark.read.format("adb").options(options).load()

    The result:

    val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
  4. Check the DataFrame content:

    adb_author.show()

    The result:

    +---+-----------------+
    | id|             name|
    +---+-----------------+
    |  4|    J.D. Salinger|
    |  8|       Alan Moore|
    |  5|    George Orwell|
    | 10| Ernest Hemingway|
    |  2|   J.R.R. Tolkien|
    |  1|   Virginia Woolf|
    |  7|Margaret Mitchell|
    |  6|   John Steinbeck|
    |  3|       Harper Lee|
    |  9|     Jack Kerouac|
    +---+-----------------+
  5. Get the table schema:

    adb_author.printSchema()

    The result:

    root
     |-- id: integer (nullable = false)
     |-- name: string (nullable = false)
  6. Calculate the number of rows in the table:

    adb_author.count()

    The result:

    val res9: Long = 10
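
Since adb_author is a regular Spark DataFrame, the usual transformations apply to it. For example, the following minimal sketch filters and sorts the rows in the same spark3-shell session (whether such filters are pushed down to ADB depends on the connector version):

    // Select authors with id greater than 5 and sort the result
    adb_author.filter("id > 5").orderBy("id").show()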

Run any SQL query in ADB

You can also execute an arbitrary SQL query in ADB through the connector. To do this, run the following commands:

  1. Stop the current Spark session, create a new one with the connector settings, and import the auxiliary functions:

    spark.stop
    val spark1 = org.apache.spark.sql.SparkSession.
       builder().
       master("local[*]").
       appName("spark_example").
       config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb").
       config("spark.adb.driver","org.postgresql.Driver").
       config("spark.adb.user","adb_to_spark").
       config("spark.adb.password","123").
       getOrCreate()
    import io.arenadata.spark.adb.implicits._

    The result:

    val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
  2. Create an arbitrary table (test_spark) in ADB and fill it with some test data:

    spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);")
    spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
  3. Create a DataFrame based on the new ADB table (test_spark):

    val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")

    The result:

    val res: org.apache.spark.sql.DataFrame = [id: int]
  4. Check the DataFrame content:

    res.show()

    The result:

    +---+
    | id|
    +---+
    |  2|
    |  3|
    |  1|
    +---+
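
The same mechanism works for arbitrary SELECT statements, for example an aggregate query. The following sketch runs in the same session; for the test data above, the returned DataFrame contains a single row with the value 3:

    // Run an aggregate query on the ADB master and get the result as a DataFrame
    val cnt = spark1.executeAdbSelectQueryOnMaster("SELECT count(*) AS cnt FROM public.test_spark;")
    cnt.show()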

Write data from Spark to ADB

  1. Specify the connector settings for writing data to ADB:

    val options_write = Map(
       "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
       "spark.adb.user" -> "adb_to_spark",
       "spark.adb.password" -> "123",
       "spark.adb.dbschema" -> "public",
       "spark.adb.dbtable" -> "test_spark_write")

    The result:

    val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  2. Run the following command to create the test_spark_write table in ADB and write to it the data from the res DataFrame created in the previous example:

    res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  3. Check that the data is available in ADB (for example, via psql):

    • Table structure:

      \d+ public.test_spark_write
                         Table "public.test_spark_write"
       Column |  Type   | Modifiers | Storage | Stats target | Description
      --------+---------+-----------+---------+--------------+-------------
       id     | integer |           | plain   |              |
      Distributed by: (id)
    • Table data:

      SELECT * FROM public.test_spark_write;
       id
      ----
        2
        1
        3
      (3 rows)
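
The example above uses SaveMode.Overwrite, which recreates the target table on each write. Spark also defines other save modes, for example SaveMode.Append; assuming your connector version supports it (check the connector documentation), appending the same DataFrame instead of recreating the table might look like this:

    // Hypothetical: append rows to the existing test_spark_write table
    res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Append).save()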