ADQM Spark Connector overview
Arenadata QuickMarts (ADQM) is a column-oriented database management system for online analytical query processing, based on the open-source ClickHouse DBMS.
Apache Spark is an open-source framework for distributed processing of unstructured and semi-structured data, part of the Hadoop project ecosystem.
ADQM Spark Connector provides high-speed parallel data exchange between Apache Spark and Arenadata QuickMarts.
Architecture
Each Spark application consists of a controlling process (driver) and a number of distributed worker processes (executors). The interaction scheme is provided below.
Read data
To load an ADQM table into Spark, ADQM Spark Connector initializes a driver via the connection string specified in the spark.adqm.url option. If the spark.adqm.cluster.name option is specified, the driver, connecting to spark.adqm.url via JDBC, also reads metadata about all shards and replicas of the specified ADQM cluster. This allows Spark executors to read data from shard tables effectively in parallel.
Data in ADQM is stored on shards. A Spark application that uses the connector can receive data from each shard into one or more Spark partitions. In the case of reading from Distributed tables, data is read from the corresponding MergeTree table on each shard in parallel.
There are currently three partitioning modes:
NOTE
The information is valid for cases when the
- According to shard_id. Data from ADQM tables is read and distributed across Spark partitions depending on the shard number. The number of Spark partitions corresponds to the number of active ADQM shards. This partitioning mode does not require additional parameters and is used by default.
- According to the specified column and number of partitions. Data is divided into Spark partitions by value ranges of the specified column, using the specified number of partitions. The number of Spark partitions corresponds to the number of ranges. It is necessary to set the spark.adqm.partition.column and spark.adqm.partition.count parameters (see ADQM Spark Connector options). This mode has a limitation: only an integer or date/time table field can be used as the partitioning column.
- According to the specified column. Data is divided into Spark partitions depending on unique values of the specified column. The number of Spark partitions corresponds to the number of unique values in that column. You should set the spark.adqm.partition.column option (see ADQM Spark Connector options). This mode is recommended for columns with small, limited sets of values.
With this architecture, each Spark executor is assigned a data processing task and one of the previously created partitions. Data exchange is performed in parallel for each partition with each shard (with each shard of the cluster if the spark.adqm.cluster.name option is specified, otherwise with the shards specified in the spark.adqm.url option).
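For illustration, a read in the column-and-partition-count mode could be configured as follows. This is a sketch only: the "adqm" format name, the JDBC URL, the cluster name, and the table and column names are placeholder assumptions, not values from this document; check the ADQM Spark Connector options reference for exact spellings.

```python
# Sketch: options for reading an ADQM table into Spark, partitioned by
# value ranges of a column. All concrete values below are illustrative
# placeholders (assumptions), not taken from the connector documentation.
read_options = {
    "spark.adqm.url": "jdbc:clickhouse://adqm-host:8123/default",  # connection string
    "spark.adqm.cluster.name": "default_cluster",  # enables shard/replica metadata lookup
    "spark.adqm.partition.column": "event_date",   # must be an integer or date/time column
    "spark.adqm.partition.count": "8",             # number of ranges = number of Spark partitions
}

# With a live SparkSession, the options would be applied roughly like this:
# df = spark.read.format("adqm").options(**read_options).load("events")
```

Setting spark.adqm.partition.count close to the number of executor cores keeps all executors busy without creating many tiny range scans.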
Write data
To load a table from Spark into ADQM, ADQM Spark Connector initializes a driver via the connection string specified in the spark.adqm.url option. If the spark.adqm.cluster.name option is specified, the driver, when connecting to spark.adqm.url via JDBC, also reads metadata about all shards and replicas of the specified ADQM cluster. Depending on the write mode, each shard is prepared for data loading (tables are created or cleared).
The following write modes are currently supported:
- append. Data is appended to the target table. If the spark.adqm.cluster.name option is specified, the connector attempts to write data to each shard of the cluster, otherwise only to the shards specified in spark.adqm.url. If any of the shards does not have the target table, the loading process fails, so in this mode the target table must be pre-created on all shards.
- errorIfExists. If the target table already exists, an error occurs. This mode assumes that the target table does not exist; it is created during the loading process. For this mode, use the spark.adqm.create.table.engine option to specify the engine of the target table to be created. If the spark.adqm.cluster.name option is specified, the target table with the specified engine is created on each shard of the cluster, otherwise it is created on the shards specified in the spark.adqm.url option.
- overwrite. Table data is overwritten. Depending on spark.adqm.table.truncate, the target table is either dropped completely or only cleared of data before writing. After a complete drop, the writing process is similar to the errorIfExists mode; after a cleanup, it is similar to the append mode.
To specify additional settings for the target tables to be created, you can use other spark.adqm.create.table options (see ADQM Spark Connector options).
A data loading task is created for each Spark partition. Data exchange is performed in parallel with each shard (with each shard of the specified cluster if the spark.adqm.cluster.name option is specified, otherwise only with the shards specified in the spark.adqm.url option).
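As an illustration, an errorIfExists write could be configured as follows. Again a sketch: the "adqm" format name, the URL, the cluster name, the engine value, and the table name are placeholder assumptions, not values confirmed by this document.

```python
# Sketch: options for writing a Spark DataFrame to ADQM, where the connector
# creates the target table. Concrete values are illustrative placeholders.
write_options = {
    "spark.adqm.url": "jdbc:clickhouse://adqm-host:8123/default",
    "spark.adqm.cluster.name": "default_cluster",   # create the table on every shard of the cluster
    "spark.adqm.create.table.engine": "MergeTree",  # engine for the target table to be created
}

# With a live SparkSession:
# (df.write.format("adqm")
#    .options(**write_options)
#    .mode("errorIfExists")  # fail if the target table already exists
#    .save("events_new"))
```

For append mode, the spark.adqm.create.table.engine option would be unnecessary, since the table must already exist on all shards.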
Supported table engines
A table engine is an ADQM entity that determines how data is stored, how it is written, where it is read from, which data access methods are provided, and also specifies query, index, and replication parameters (for more information, see Table engines in the ADQM documentation).
ADQM Spark Connector supports the following ADQM table engines:
- MergeTree engine family. These engines are designed to quickly write huge amounts of data in parts according to the specified rules. The base MergeTree engine is supported, as well as its specialized versions:
  - AggregatingMergeTree;
  - CollapsingMergeTree;
  - VersionedCollapsingMergeTree;
  - ReplacingMergeTree;
  - SummingMergeTree;
  - ReplicatedMergeTree;
  - ReplicatedAggregatingMergeTree;
  - ReplicatedCollapsingMergeTree;
  - ReplicatedVersionedCollapsingMergeTree;
  - ReplicatedReplacingMergeTree;
  - ReplicatedSummingMergeTree.
- Distributed engine. It does not store data itself, but allows queries to be processed in a distributed manner across multiple servers, with automatic parallel reading (read more about creating and using Distributed tables in the Sharding article of the ADQM documentation).
Supported data types
The tables below show how ADQM and Spark data types are matched during data transfers.
ADQM to Spark:

| ADQM | Spark |
|---|---|
| Date | DateType |
| DateTime | TimestampType |
| DateTime64 | TimestampType |
| Decimal | DecimalType |
| Decimal128 | DecimalType |
| Decimal256 | DecimalType |
| Decimal32 | DecimalType |
| Decimal64 | DecimalType |
| Float32 | FloatType |
| Float64 | DoubleType |
| IPv4 | LongType |
| Int16 | IntegerType |
| Int32 | IntegerType |
| Int64 | LongType |
| Int8 | ShortType |
| String | StringType |
| UInt16 | IntegerType |
| UInt32 | LongType |
| UInt64 | LongType |
| UInt8 | ShortType |
| UUID | StringType |
| float8 | DoubleType |
Spark to ADQM:

| Spark | ADQM |
|---|---|
| BinaryType | String |
| BooleanType | UInt8 |
| ByteType | Int8 |
| DateType | Date |
| DecimalType | Decimal |
| DoubleType | Float64 |
| FloatType | Float32 |
| IntegerType | Int32 |
| LongType | Int64 |
| ShortType | Int16 |
| StringType | String |
| TimestampType | DateTime64 |
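For use in application code, the ADQM-to-Spark mapping above can be transcribed into a plain lookup. This is a convenience sketch built from the table, not part of the connector API.

```python
# ADQM -> Spark type names, transcribed from the mapping table above.
# The dict itself is a documentation aid (an assumption), not a connector API.
ADQM_TO_SPARK = {
    "Date": "DateType",
    "DateTime": "TimestampType",
    "DateTime64": "TimestampType",
    "Decimal": "DecimalType",
    "Decimal128": "DecimalType",
    "Decimal256": "DecimalType",
    "Decimal32": "DecimalType",
    "Decimal64": "DecimalType",
    "Float32": "FloatType",
    "Float64": "DoubleType",
    "IPv4": "LongType",
    "Int16": "IntegerType",
    "Int32": "IntegerType",
    "Int64": "LongType",
    "Int8": "ShortType",
    "String": "StringType",
    "UInt16": "IntegerType",
    "UInt32": "LongType",
    "UInt64": "LongType",
    "UInt8": "ShortType",
    "UUID": "StringType",
    "float8": "DoubleType",
}

# Note that the mapping widens in places: UInt32 maps to LongType, while
# UInt64 also maps to LongType, which cannot represent its full range.
print(ADQM_TO_SPARK["UInt32"])  # LongType
```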