Usage example
Before you start using ADQM Spark 3 Connector, you need to find out the following information:
- ADQM host name;
- port for interacting with ADQM;
- user credentials for connection;
- name of the database to connect to;
- name of the table to be accessed.
Examples in this article use the following data for testing:
- ADQM host — 10.92.17.146;
- port — 9000;
- ADQM user — default (no password);
- database — default;
- table to be read from ADQM — users:

  ```
  ┌─user_id─┬─name───┬─role──────┐
  │       1 │ john   │ admin     │
  │       2 │ mary   │ author    │
  │       3 │ andrew │ author    │
  │       4 │ harry  │ reviewer  │
  │       5 │ ann    │ view only │
  └─────────┴────────┴───────────┘
  ```
ADQM Spark 3 Connector communicates with ADQM using a JDBC connection. The ClickHouse Native JDBC driver comes with the connector.
Run the commands from the examples below in spark3-shell, the interactive shell for Scala.
Load data from ADQM to Spark 3
- Specify the connector settings for reading data from the ADQM default.users table:

  ```scala
  val options_read = Map(
    "spark.adqm.url" -> "jdbc:clickhouse://10.92.17.146:9000",
    "spark.adqm.user" -> "default",
    "spark.adqm.dbschema" -> "default",
    "spark.adqm.dbtable" -> "users")
  ```
- Create a DataFrame:

  ```scala
  val adqm_users = spark.read.format("adqm").options(options_read).load()
  ```

  Result of the command execution:

  ```
  adqm_users: org.apache.spark.sql.DataFrame = [user_id: int, name: string ... 1 more field]
  ```
- Check the DataFrame’s content:

  ```scala
  adqm_users.show()
  ```

  ```
  +-------+------+---------+
  |user_id|  name|     role|
  +-------+------+---------+
  |      1|  john|    admin|
  |      2|  mary|   author|
  |      3|andrew|   author|
  |      4| harry| reviewer|
  |      5|   ann|view only|
  +-------+------+---------+
  ```
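Once loaded, the DataFrame can be transformed with the regular Spark API like any other data source. The following sketch assumes the adqm_users DataFrame from the step above is available in the same spark3-shell session:

```scala
// Standard Spark DataFrame operations applied to data read from ADQM:
// keep only rows where role = 'author' and project two columns.
val authors = adqm_users
  .filter(adqm_users("role") === "author")
  .select("user_id", "name")
authors.show()
```

With the test data above, this leaves the mary and andrew rows.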
Run any SQL query on an ADQM node
- Apply additional settings to the Spark session and import the auxiliary functions:

  ```scala
  spark.stop
  val spark1 = org.apache.spark.sql.SparkSession.
    builder().
    master("local[*]").
    appName("spark_example").
    config("spark.adqm.url", "jdbc:clickhouse://10.92.17.146:9000").
    config("spark.adqm.user", "default").
    getOrCreate()
  import io.arenadata.spark.adqm.implicits._
  ```
- Create an arbitrary table in ADQM and fill it with test data:

  ```scala
  spark1.executeAdqmQueryOnShard("create table default.test_spark(id integer) ENGINE = MergeTree() ORDER BY id;")
  spark1.executeAdqmQueryOnShard("insert into default.test_spark values(1);")
  spark1.executeAdqmQueryOnShard("insert into default.test_spark values(2);")
  spark1.executeAdqmQueryOnShard("insert into default.test_spark values(3);")
  ```
- Create a DataFrame based on the new ADQM table:

  ```scala
  val test = spark1.executeAdqmSelectQueryOnShard("select * from default.test_spark;")
  ```

  Result of the command execution:

  ```
  test: org.apache.spark.sql.DataFrame = [id: int]
  ```
- Check the DataFrame’s content:

  ```scala
  test.show()
  ```

  ```
  +---+
  | id|
  +---+
  |  1|
  |  2|
  |  3|
  +---+
  ```
- Check in ADQM (for example, in the clickhouse-client console client) that the table has been created and populated with data:

  ```sql
  SELECT * FROM test_spark;
  ```

  ```
  ┌─id─┐
  │  1 │
  │  2 │
  │  3 │
  └────┘
  ```
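Since executeAdqmQueryOnShard accepts arbitrary SQL, the same helper can also run DDL cleanup. A sketch assuming the spark1 session from the steps above:

```scala
// Drop the test table created above when it is no longer needed.
spark1.executeAdqmQueryOnShard("drop table if exists default.test_spark;")
```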
Write data from Spark 3 to ADQM
- Specify the connector settings:

  ```scala
  val options_write = Map(
    "spark.adqm.url" -> "jdbc:clickhouse://10.92.17.146:9000",
    "spark.adqm.user" -> "default",
    "spark.adqm.dbschema" -> "default",
    "spark.adqm.dbtable" -> "test_spark_write",
    "spark.adqm.create.table.engine" -> "MergeTree")
  ```
- Run the following command to create a test_spark_write table in ADQM and write data from the DataFrame created in the previous example into it:

  ```scala
  test.write.format("adqm").options(options_write).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
  ```
- Check in ADQM that data has been added to the new table:

  ```sql
  SELECT * FROM test_spark_write;
  ```

  ```
  ┌─id─┐
  │  1 │
  │  2 │
  │  3 │
  └────┘
  ```
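Spark's other standard save modes can be passed to the writer in the same way. For example, SaveMode.Append would add rows to the existing table instead of recreating it. This is a sketch assuming the test DataFrame and options_write map defined above; whether a particular save mode is supported should be verified against the connector documentation:

```scala
// Append the DataFrame's rows to the existing ADQM table
// rather than dropping and recreating it.
test.write.format("adqm")
  .options(options_write)
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save()
```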