ADQM Spark 3 Connector overview

Arenadata QuickMarts (ADQM) is a column-based database management system for online query processing, which is based on the open-source ClickHouse DBMS.

Apache Spark 3 is an open-source framework for distributed processing of unstructured and semi-structured data, a part of the ecosystem of Hadoop projects.

ADQM Spark 3 Connector provides high-speed parallel data exchange between Apache Spark 3 and Arenadata QuickMarts.

Architecture

Each Spark 3 application consists of a controlling process (driver) and a number of distributed worker processes (executors). The interaction scheme is provided below.

ADQM Spark 3 Connector architecture

Read data

To load an ADQM table into Spark 3, ADQM Spark 3 Connector initializes a driver via a connection string specified by the spark.adqm.url option. If the spark.adqm.cluster.name option is specified, the driver, connecting to spark.adqm.url via JDBC, also reads metadata that contains information about all shards and replicas of the specified ADQM cluster. This allows Spark 3 executors to read data from shard tables in parallel efficiently.

Data in ADQM is stored on shards. A Spark 3 application that uses the connector can receive data from each shard into one or more Spark 3 partitions. In the case of reading from Distributed tables, data is read from the corresponding MergeTree table on each shard in parallel.
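As an illustration, the read-side options described above can be collected into an option map before being passed to a Spark reader. The option keys come from this document; the URL, host, and cluster name below are hypothetical placeholder values, and the exact JDBC URL format is an assumption.

```python
# Hypothetical read configuration for the ADQM Spark 3 Connector.
# Option keys come from this document; the URL and cluster name
# are placeholder values, not real endpoints.
read_options = {
    "spark.adqm.url": "jdbc:clickhouse://adqm-host:8123/default",  # assumed URL format
    "spark.adqm.cluster.name": "default_cluster",  # optional: enables shard metadata discovery
}

def uses_cluster_metadata(options):
    """The driver reads shard/replica metadata only when the
    cluster name option is present (per the description above)."""
    return "spark.adqm.cluster.name" in options

print(uses_cluster_metadata(read_options))  # True
```

In a Spark application these options would be passed to the reader via `spark.read.format(...).options(**read_options).load()`; the format name is omitted here because it is not given in this document.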

There are currently three partitioning modes:

NOTE

The following information applies both when the spark.adqm.cluster.name option is specified and when it is not.

  • According to shard_id. Data from ADQM tables is read and distributed across Spark 3 partitions depending on the shard number. The number of Spark 3 partitions corresponds to the number of active ADQM shards. This partitioning mode does not require additional parameters and is used by default.

  • According to the specified column and number of partitions. Data is divided into Spark 3 partitions according to value ranges of the specified column by the specified number of partitions. The number of Spark 3 partitions corresponds to the number of ranges. It is necessary to set the spark.adqm.partition.column and spark.adqm.partition.count parameters (see ADQM Spark 3 Connector options). This mode has limitations: only an integer or date/time table field can be used as a column.

  • According to the specified column. Data is divided into Spark 3 partitions depending on unique values of the specified column. The number of Spark 3 partitions corresponds to the number of unique values in that column. You should set the spark.adqm.partition.column option (see ADQM Spark 3 Connector options). This mode is recommended for columns with a small, bounded set of distinct values.
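To make the second mode concrete, here is a minimal sketch of how a numeric column range could be split into the requested number of partitions; the column and count arguments correspond to spark.adqm.partition.column and spark.adqm.partition.count. This is an illustration of the idea, not the connector's actual implementation.

```python
def partition_ranges(min_value, max_value, count):
    """Split [min_value, max_value] into `count` contiguous half-open
    ranges; each range becomes one Spark 3 partition. A sketch of the
    range-based partitioning mode, not the connector's real code."""
    if count < 1:
        raise ValueError("partition count must be positive")
    span = max_value - min_value + 1
    step, remainder = divmod(span, count)
    ranges = []
    lo = min_value
    for i in range(count):
        hi = lo + step + (1 if i < remainder else 0)
        ranges.append((lo, hi))  # rows with lo <= column_value < hi
        lo = hi
    return ranges

print(partition_ranges(0, 99, 4))  # [(0, 25), (25, 50), (50, 75), (75, 100)]
```

The number of returned ranges always equals the requested partition count, matching the statement above that the number of Spark 3 partitions corresponds to the number of ranges.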

With this architecture, each Spark 3 executor is assigned a data processing task and a corresponding partition from previously created ones. Data exchange is performed in parallel for each partition with each shard (if the spark.adqm.cluster.name option is specified — with each shard of this cluster, otherwise — with shards specified in the spark.adqm.url option).

Data reading algorithm

Write data

To load a table from Spark 3 into ADQM, ADQM Spark 3 Connector initializes a driver via a connection string specified by the spark.adqm.url option. If the spark.adqm.cluster.name option is specified, the driver, when connecting to spark.adqm.url via JDBC, also reads metadata that contains information about all shards and replicas of the specified ADQM cluster. Depending on the writing mode, each shard is prepared for data loading (tables are created or cleared).

The following writing modes are currently supported:

  • append. Additional data is written to the target table. If the spark.adqm.cluster.name option is specified, the connector attempts to write data to each shard of the cluster, otherwise — only to shards specified in spark.adqm.url. If any of the shards does not have a table to be appended to, the loading process fails. In this mode, a target table must be pre-created on all shards.

  • errorIfExists. If the target table exists, an error occurs. This mode assumes that the target table does not exist; it is created during the loading process. For this mode, use the spark.adqm.create.table.engine option to specify an engine of the target table to be created. If the spark.adqm.cluster.name option is specified, the target table with the specified engine is created on each shard of the cluster, otherwise — the target table is created on shards specified in the spark.adqm.url option.

  • overwrite. Table data is overwritten. Depending on the spark.adqm.table.truncate option, the target table is either dropped completely or truncated (all data removed) before the writing process. If the table is dropped, the writing process is similar to the errorIfExists mode; if it is truncated, it is similar to the append mode.
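The per-shard preparation logic implied by the three modes can be sketched as follows. This is an illustration of the rules above, not the connector's source code; the truncate flag stands in for spark.adqm.table.truncate, whose exact polarity is an assumption here.

```python
def prepare_shard(mode, table_exists, truncate=False):
    """Return the action taken on one shard before writing, following
    the write-mode descriptions above (a sketch, not the connector's
    actual implementation)."""
    if mode == "append":
        if not table_exists:
            # append requires the target table to be pre-created on all shards
            raise RuntimeError("append: target table missing on a shard")
        return "append to existing table"
    if mode == "errorIfExists":
        if table_exists:
            raise RuntimeError("errorIfExists: target table already exists")
        return "create table, then write"
    if mode == "overwrite":
        if not table_exists:
            return "create table, then write"
        if truncate:
            return "truncate table, then write"  # then behaves like append
        return "drop and recreate table, then write"  # then behaves like errorIfExists
    raise ValueError(f"unknown mode: {mode}")
```

For example, `prepare_shard("overwrite", table_exists=True, truncate=True)` yields the truncate path, which then proceeds like the append mode, as described above.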

To specify additional settings for target tables to be created, you can use other spark.adqm.create.table options (see ADQM Spark 3 Connector options).

A data loading task is created for each Spark 3 partition. Data exchange is performed in parallel with each shard (if the spark.adqm.cluster.name option is specified — with each shard of the specified cluster, otherwise — only with shards specified in the spark.adqm.url option).

Data writing algorithm

Supported table engines

A table engine is an ADQM entity that determines how data is stored, how it is written and read, which methods of data access are supported, and also specifies query, index, and replication parameters (for more information, see Table engines in the ADQM documentation).

ADQM Spark 3 Connector supports the following ADQM table engines:

  • MergeTree engine family. These engines are designed for fast insertion of very large amounts of data in parts according to the specified rules. The main MergeTree engine is supported, as well as its special versions:

    • AggregatingMergeTree;

    • CollapsingMergeTree;

    • VersionedCollapsingMergeTree;

    • ReplacingMergeTree;

    • SummingMergeTree;

    • ReplicatedMergeTree;

    • ReplicatedAggregatingMergeTree;

    • ReplicatedCollapsingMergeTree;

    • ReplicatedVersionedCollapsingMergeTree;

    • ReplicatedReplacingMergeTree;

    • ReplicatedSummingMergeTree.

  • Distributed engine. It does not store data, but allows you to process queries distributed across multiple servers, automatically performing parallel reading (read more about creating and using Distributed tables in the Sharding article of the ADQM documentation).
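For reference, the list of supported engines above can be expressed programmatically. The set below is transcribed from this document; the helper function is a sketch for illustration only.

```python
# Engines listed in this document as supported by the connector.
MERGE_TREE_FAMILY = {
    "MergeTree", "AggregatingMergeTree", "CollapsingMergeTree",
    "VersionedCollapsingMergeTree", "ReplacingMergeTree", "SummingMergeTree",
}
SUPPORTED_ENGINES = (
    MERGE_TREE_FAMILY
    | {"Replicated" + engine for engine in MERGE_TREE_FAMILY}  # replicated versions
    | {"Distributed"}
)

def is_supported(engine):
    """Check whether a table engine appears in the supported list above."""
    return engine in SUPPORTED_ENGINES

print(is_supported("ReplicatedSummingMergeTree"))  # True
print(is_supported("Log"))                         # False
```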

Supported data types

The tables below show how ADQM and Spark 3 data types are mapped during data transfers.

Transfer from ADQM to Spark 3

ADQM        Spark 3
Date        DateType
DateTime    TimestampType
DateTime64  TimestampType
Decimal     DecimalType
Decimal128  DecimalType
Decimal256  DecimalType
Decimal32   DecimalType
Decimal64   DecimalType
Float32     FloatType
Float64     DoubleType
IPv4        LongType
Int16       IntegerType
Int32       IntegerType
Int64       LongType
Int8        ShortType
String      StringType
UInt16      IntegerType
UInt32      LongType
UInt64      LongType
UInt8       ShortType
UUID        StringType
float8      DoubleType

Transfer from Spark 3 to ADQM

Spark 3        ADQM
BinaryType     String
BooleanType    UInt8
ByteType       Int8
DateType       Date
DecimalType    Decimal
DoubleType     Float64
FloatType      Float32
IntegerType    Int32
LongType       Int64
ShortType      Int16
StringType     String
TimestampType  DateTime64
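The two tables can also be expressed as lookup maps for programmatic reference; the entries below are transcribed from the tables above. Note that the two mappings are not mutual inverses: for example, UInt8 reads into Spark as ShortType, but ShortType writes back to ADQM as Int16.

```python
# ADQM -> Spark 3 read mapping, transcribed from the table above.
ADQM_TO_SPARK = {
    "Date": "DateType", "DateTime": "TimestampType", "DateTime64": "TimestampType",
    "Decimal": "DecimalType", "Decimal32": "DecimalType", "Decimal64": "DecimalType",
    "Decimal128": "DecimalType", "Decimal256": "DecimalType",
    "Float32": "FloatType", "Float64": "DoubleType", "IPv4": "LongType",
    "Int8": "ShortType", "Int16": "IntegerType", "Int32": "IntegerType",
    "Int64": "LongType", "UInt8": "ShortType", "UInt16": "IntegerType",
    "UInt32": "LongType", "UInt64": "LongType",
    "String": "StringType", "UUID": "StringType", "float8": "DoubleType",
}

# Spark 3 -> ADQM write mapping, transcribed from the table above.
SPARK_TO_ADQM = {
    "BinaryType": "String", "BooleanType": "UInt8", "ByteType": "Int8",
    "DateType": "Date", "DecimalType": "Decimal", "DoubleType": "Float64",
    "FloatType": "Float32", "IntegerType": "Int32", "LongType": "Int64",
    "ShortType": "Int16", "StringType": "String", "TimestampType": "DateTime64",
}

print(ADQM_TO_SPARK["UInt64"])         # LongType
print(SPARK_TO_ADQM["TimestampType"])  # DateTime64
```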
