Flink SQL Gateway
Flink SQL Gateway — это сервис, позволяющий нескольким клиентам параллельно выполнять запросы Flink SQL. Используя SQL Gateway, можно запускать задачи Flink, выполнять поиск по метаданным и анализировать данные в реальном времени.
SQL SQL Gateway состоит из следующих компонентов:
-
SqlGatewayService, основной задачей которого является обработка SQL-запросов.
-
Подключаемые endpoints, через которые пользователи могут отправлять запросы на сервер.
На момент релиза ADH 3.1.2.b1, основной поддерживаемый endpoint — REST endpoint, подразумевающий взаимодействие с сервисом по HTTP.
Ниже представлена высокоуровневая архитектура Flink SQL Gateway.
Работа с REST endpoint
Схема взаимодействия с использованием REST endpoint представлена ниже.
Основные этапы схемы:
-
Создание сессии. Клиент отправляет POST-запрос на создание сессии. SQL Gateway создает новую сессию и возвращает ее идентификатор —
sessionHandle
, который нужен для дальнейшего общения с сервером. Сессия существует в течение настраиваемого периода времени, а срок ее жизни может быть продлен вручную. -
Отправка запроса. После создания сессии клиент отправляет SQL-запрос на сервер SQL Gateway. Для каждого запроса SQL Gateway создает новую сущность Operation и возвращает ее идентификатор (
operationHandle
), необходимый в будущем для получения результатов запроса. Экземпляр Operation может быть принудительно отменен/закрыт для освобождения ресурсов. -
Получение результатов. Используя
operationHandle
, клиент получает результаты из ранее созданного экземпляра Operation. Если вычисления по запросу готовы, SQL Gateway возвращает пакет результатов и URI, указывающий на следующий пакет. Если SQL Gateway вернул все результаты для данного запроса, возвращается специальный ответ с полем"resultType": "EOS"
.
REST URL сервиса SQL Gateway доступен на странице Clusters → <YOUR_CLUSTER> → Services → Flink → Info в ADCM, например: http://ka-adh-1.ru-central1.internal:8083/v1/info. Полная справочная информация о SQL Gateway REST API доступна в документации Flink.
Пример использования
Ниже представлен пример взаимодействия с SQL Gateway, демонстрирующий основные операции с использованием REST endpoint. Вы можете выполнить следующие шаги, используя curl, или можете скачать Postman-коллекцию с готовыми запросами.
-
Проверьте доступность сервиса SQL Gateway.
$ curl http://<sql-gateway-host>:8083/v1/info
В ответе содержится базовая информация о сервисе.
{"productName":"Apache Flink","version":"1.16.2"}
-
Создайте новую сессию.
$ curl -X POST http://<sql-gateway-host>:8083/v1/sessions
Сервер возвращает
sessionHandle
, который уникально идентифицирует созданную сессию.{"sessionHandle":"97fa59cf-6aa9-440b-b5cb-191f4167f0fb"}
-
Используя полученный
sessionHandle
, отправьте SQL-запрос.$ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/statements/ -d @test_query.json
В этом примере файл test_query.json содержит тестовый SQL-запрос, выполняющий подсчет количества слов, и выглядит следующим образом:
{ "statement": "SELECT word, SUM(frequency) AS `count` FROM ( VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2) ) AS WordTable(word, frequency) GROUP BY word" }
В ответе содержится
operationHandle
, который идентифицирует отправленный запрос.{"operationHandle":"b18e38b0-8a9b-4f36-9121-7d9f2eb46b81"}
-
Получите результаты запроса.
$ curl http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/operations/{operationHandle}/result/0
Ответ от сервера имеет следующий вид.
{"results":{"columns":[{"name":"word","logicalType":{"type":"VARCHAR","nullable":false,"length":5},"comment":null},{"name":"count","logicalType":{"type":"INTEGER","nullable":false},"comment":null}],"data":[{"kind":"INSERT","fields":["Hello",1]},{"kind":"INSERT","fields":["Ciao",1]},{"kind":"UPDATE_BEFORE","fields":["Hello",1]},{"kind":"UPDATE_AFTER","fields":["Hello",3]}]},"resultType":"PAYLOAD","nextResultUri":"/v1/sessions/b9ea29d4-e373-463f-bd9a-a943d853b093/operations/c5df0878-e102-4e02-a37d-3293aa53c038/result/1"}
Обратите внимание на поля
"resultType":"PAYLOAD"
и"nextResultUri":"<nextURI>"
. В последнем содержится URI, указывающий на следующий пакет с данными. Если при обращении к<nextURI>
ответ содержит"resultType": "EOS"
, это означает, что SQL Gateway вернул все результаты для данного запроса. -
Чтобы продлить срок действия сессии, отправьте heartbeat-запрос.
$ curl -X POST http://<sql-gateway-host>:8083/v1/sessions/{sessionHandle}/heartbeat
Ответ
200 OK
указывает, что таймер жизни сессии был сброшен.
Кроме вышеупомянутых запросов существуют и другие полезные операции, например, отмена/остановка/проверка статуса операции и так далее. Подробная информация о REST API доступна в документации Flink.