ADB Spark 3 Connector usage examples
Requirements
This article describes how to transfer data between ADB and Spark 3 via ADB Spark 3 Connector. The following prerequisites are met:

- The ADB cluster is installed according to the Online installation guide.

- The ADH cluster is installed according to the Online installation guide. The minimal ADH version is 3.1.2.1.b1.

- The Spark 3 service is added to the ADH cluster.

- The IP address of the ADB master host is 10.92.40.38. The default port number for incoming PostgreSQL connections is 5432.

- The adb database exists in the ADB cluster.

- The adb_to_spark user with the CREATEEXTTABLE privileges and the 123 password exists in the ADB cluster. To add such a user, you can run the following query:

  ```sql
  CREATE ROLE adb_to_spark
  WITH CREATEEXTTABLE(protocol='gpfdist',type='readable')
       CREATEEXTTABLE(protocol='gpfdist',type='writable')
  LOGIN PASSWORD '123';
  ```

- The adb_to_spark user can access the adb database from the ADH cluster. For this purpose, you can add the following line to the pg_hba.conf file:

  ```
  host all adb_to_spark 0.0.0.0/0 md5
  ```

  Note that this is not a production-ready configuration. Use it only for test purposes. Instead of 0.0.0.0/0, you can specify the IP addresses (with subnet masks) of the ADH cluster hosts where Spark 3 is installed.

  TIP: You can modify the pg_hba.conf file via the ADCM web interface. To do this, fill in the Custom pg_hba section parameter on the Configuration tab of the ADB service in the ADB cluster. To apply the changes, click Save and run the Reconfigure service action.

- The author table exists in the adb database. To create and fill the table, use the following queries:

  ```sql
  CREATE TABLE author(id INT NOT NULL, name TEXT NOT NULL)
  WITH (appendoptimized=true)
  DISTRIBUTED BY(id);

  INSERT INTO author(id, name) VALUES
  (1,'Virginia Woolf'), (2,'J.R.R. Tolkien'), (3,'Harper Lee'),
  (4,'J.D. Salinger'), (5,'George Orwell'), (6,'John Steinbeck'),
  (7,'Margaret Mitchell'), (8,'Alan Moore'), (9,'Jack Kerouac'),
  (10,'Ernest Hemingway');
  ```

- The adb_to_spark user has the necessary permissions on the author table:

  ```sql
  GRANT SELECT, INSERT ON public.author TO adb_to_spark;
  ```
ADB Spark 3 Connector communicates with ADB via a JDBC connection. The PostgreSQL JDBC driver comes with the connector.

The commands in the examples below should be executed in spark3-shell, the interactive Spark shell for Scala.
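Before launching spark3-shell, it can be useful to verify that the ADB master accepts connections from the ADH host under the connector credentials. A minimal sketch, assuming the psql client is installed on the ADH host (the host, port, user, and database are the ones listed in the requirements above):

```shell
# Connect to the adb database on the ADB master as adb_to_spark;
# the password (123 in this example) is requested interactively
$ psql -h 10.92.40.38 -p 5432 -U adb_to_spark -d adb -c "SELECT version();"
```

If pg_hba.conf is configured correctly, the command prints the ADB version string; otherwise it fails with an authentication or connection error, which is easier to diagnose here than inside a Spark job.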
Load data from ADB to Spark
- Open spark3-shell on the host where Spark 3 is installed:

  ```shell
  $ sudo -u hdfs spark3-shell
  ```
- Specify the connector settings for reading data from the author table in ADB:

  ```scala
  val options = Map(
    "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
    "spark.adb.user" -> "adb_to_spark",
    "spark.adb.password" -> "123",
    "spark.adb.dbschema" -> "public",
    "spark.adb.dbtable" -> "author"
  )
  ```

  The result:

  ```
  val options: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> author, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  ```
- Register a DataFrame:

  ```scala
  val adb_author = spark.read.format("adb").options(options).load()
  ```

  The result:

  ```
  val adb_author: org.apache.spark.sql.DataFrame = [id: int, name: string]
  ```
- Check the DataFrame content:

  ```scala
  adb_author.show()
  ```

  The result:

  ```
  +---+-----------------+
  | id|             name|
  +---+-----------------+
  |  4|    J.D. Salinger|
  |  8|       Alan Moore|
  |  5|    George Orwell|
  | 10| Ernest Hemingway|
  |  2|   J.R.R. Tolkien|
  |  1|   Virginia Woolf|
  |  7|Margaret Mitchell|
  |  6|   John Steinbeck|
  |  3|       Harper Lee|
  |  9|     Jack Kerouac|
  +---+-----------------+
  ```
- Get the table schema:

  ```scala
  adb_author.printSchema()
  ```

  The result:

  ```
  root
   |-- id: integer (nullable = false)
   |-- name: string (nullable = false)
  ```
- Calculate the number of rows in the table:

  ```scala
  adb_author.count()
  ```

  The result:

  ```
  val res9: Long = 10
  ```
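The loaded DataFrame behaves like any other Spark DataFrame, so standard transformations apply to it. A small sketch (run inside the same spark3-shell session; the `import` is only needed for the `$"col"` column syntax):

```scala
// Enable the $"col" syntax for referencing columns
import spark.implicits._

// Filter and sort the authors on the Spark side
adb_author.filter($"id" > 5).orderBy($"name").show()
```

This prints the five rows of the author table whose id exceeds 5, ordered alphabetically by name.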
Run any SQL query in ADB
You can execute any SQL query through the connector as well. To do this, run the following commands:
- Set additional Spark session settings and enable auxiliary functions:

  ```scala
  spark.stop

  val spark1 = org.apache.spark.sql.SparkSession.
    builder().
    master("local[*]").
    appName("spark_example").
    config("spark.adb.url","jdbc:postgresql://10.92.40.38:5432/adb").
    config("spark.adb.driver","org.postgresql.Driver").
    config("spark.adb.user","adb_to_spark").
    config("spark.adb.password","123").
    getOrCreate()

  import io.arenadata.spark.adb.implicits._
  ```

  The result:

  ```
  val spark1: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@20e87aa3
  ```
- Create an arbitrary table (test_spark) in ADB and fill it with some test data:

  ```scala
  spark1.executeAdbQueryOnMaster("CREATE TABLE public.test_spark(id INT);")
  spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(1);")
  spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(2);")
  spark1.executeAdbQueryOnMaster("INSERT INTO public.test_spark VALUES(3);")
  ```
- Create a DataFrame based on the new ADB table (test_spark):

  ```scala
  val res = spark1.executeAdbSelectQueryOnMaster("SELECT * FROM public.test_spark;")
  ```

  The result:

  ```
  val res: org.apache.spark.sql.DataFrame = [id: int]
  ```
- Check the DataFrame content:

  ```scala
  res.show()
  ```

  The result:

  ```
  +---+
  | id|
  +---+
  |  2|
  |  3|
  |  1|
  +---+
  ```
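Because executeAdbSelectQueryOnMaster returns a regular DataFrame, aggregations can also be pushed to the ADB master so that only the result is transferred to Spark. A sketch, assuming the same spark1 session and imports as in the previous steps:

```scala
// Run the aggregation in ADB; only the single result row reaches Spark
val cnt = spark1.executeAdbSelectQueryOnMaster("SELECT count(*) AS cnt FROM public.test_spark;")
cnt.show()
```

For the test_spark table filled above, the printed DataFrame contains one row with the value 3.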
Write data from Spark to ADB
- Specify the connector settings for writing data to ADB:

  ```scala
  val options_write = Map(
    "spark.adb.url" -> "jdbc:postgresql://10.92.40.38:5432/adb",
    "spark.adb.user" -> "adb_to_spark",
    "spark.adb.password" -> "123",
    "spark.adb.dbschema" -> "public",
    "spark.adb.dbtable" -> "test_spark_write"
  )
  ```

  The result:

  ```
  val options_write: scala.collection.immutable.Map[String,String] = HashMap(spark.adb.password -> 123, spark.adb.dbtable -> test_spark_write, spark.adb.user -> adb_to_spark, spark.adb.url -> jdbc:postgresql://10.92.40.38:5432/adb, spark.adb.dbschema -> public)
  ```
- Run the following command to create a test_spark_write table in ADB and write data into it from the res DataFrame created in the previous example:

  ```scala
  res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  ```
- Check the data availability in ADB (for example, via psql):

  - Table structure:

    ```
    \d+ public.test_spark_write
    ```

    The result:

    ```
                         Table "public.test_spark_write"
     Column |  Type   | Modifiers | Storage | Stats target | Description
    --------+---------+-----------+---------+--------------+-------------
     id     | integer |           | plain   |              |
    Distributed by: (id)
    ```
  - Table data:

    ```sql
    SELECT * FROM public.test_spark_write;
    ```

    The result:

    ```
     id
    ----
      2
      1
      3
    (3 rows)
    ```
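To add rows to an existing ADB table instead of recreating it, the overwrite mode can be replaced with an append mode. A sketch, assuming the connector supports the standard Spark save modes and reusing the res DataFrame and options_write map from the steps above:

```scala
// Append the rows of res to public.test_spark_write;
// unlike SaveMode.Overwrite, the existing table and its data are kept
res.write.format("adb").options(options_write).mode(org.apache.spark.sql.SaveMode.Append).save()
```

After this call the test_spark_write table should contain six rows, each original value appearing twice.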