Yupana
Yupana -- аналитическая платформа с открытым исходным кодом для анализа больших данных.
Она обеспечивает:
- перевод транзакционной информации в форму пригодную для бизнес анализа;
- хранение обработанной информации в формате оптимизированном для выполнения аналитических задач на многомерных временных рядах;
- массовую и Online обработку данных.
Платформа поддерживает SQL-подобный синтаксис запросов, которые могут выполнятся на выделенном сервере или в кластере Apache Spark.
В состав Yupana также входит набор примеров использования, который может быть использован как стартовая точка для реализации аналитической платформы для решения реальных задач.
Оглавление
Общие сведения о Yupana
Архитектура Yupana не привязана к конкретному хранилищу. Существующая реализация использует в качестве хранилища Apache HBase. Данные сохраняются в виде отдельных временных рядов.
Временной ряд — это собранные в разные моменты времени данные о значении наблюдаемых параметров.
Структура временных рядов:
- Время измерения -- обязательная размерность временного ряда, является частью первичного составного ключа. При выполнении запросов всегда должны быть указаны ограничения по времени;
- Измерения -- поля сущности, которые являются частью первичного составного ключа и позволяют выполнять быстрый поиск. Например: идентификатор устройства или название товара;
- Метрики -- значения наблюдений. Например: сумма и количество;
- Внешние связи -- интерфейсы отображения и/или группировки размерностей, которые позволяют определить древовидные размерности временного ряда. Например: Город отображается в уникальный идентификатор устройства.
Начало работы
Системные требования
- JDK 8;
- GNU/Linux (работа на других окружениях не проверялась);
- Apache HBase 2.4.x с поддержкой сжатия Snappy;
- Apache Spark 3.0.x для запуска запросов на кластере. Кроме того, в прилагаемых примерах загрузка данных также производится из Spark-приложения, хотя это и не является обязательным условием;
- Кластер Apache Ignite 2.8.0 при использовании распределенных кэшей в Ignite (опционально);
- sbt -- для сборки проекта.
Сборка проекта
Сборка проекта осуществляется с помощью sbt. Некоторые команды в sbt shell:
- compile -- компиляция проекта
- test -- запуск юнит-тестов
- assembly -- сборка толстых jar-ов, применяется в yupana-jdbc и yupana-examples
Подготовка окружения
- Установить hbase 2.4.x
- Для нативной поддержки snappy небходимо скопировать нативные библиотеки из сборки hadoop 2.7 в выбранную папку и добавить в hbase-env.sh строки
export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/path/to/native/lib
Запуск
Модуль yupana-examples
содержит пример использования Yupana для анализа транзакций, которые могут быть использованы в качестве основы при реализации собственных аналитических систем. В примере реализована схема данных основанная на схеме из пакета yupana-schema с добавлением двух внешних связей (каталог адресов и каталог организаций). Каталог адресов (AddressCatalog) использует внутреннюю логику для отображения идентификатора кассы на город. Каталог организаций (OrganisationCatalog) отображает кассы на информацию об организации: тип организации (например аптека, супермаркет) и обезличенный идентификатор. Каталог использует данные из внешнего источника -- базы данных PostgreSQL.
Для запуска примеров необходима база данных PostgreSQL. По умолчанию используется база данных yupana-example
на localhost
. Базу необходимо создать до запуска примера и миграции:
CREATE DATABASE yupana_example;
CREATE USER yupana WITH ENCRYPTED PASSWORD 'yupana';
GRANT CONNECT ON DATABASE yupana_example TO yupana;
После создания базы можно мигрировать:
sbt examples/flywayMigrate
Для запуска миграций с альтернативным адресом сервера PostgreSQL можно использовать команду:
sbt -Dflyway.url=jdbc:postgresql://server:port/db_name -Dflyway.user=db_user examples/flywayMigrate
1. Server
Реализация сервера на базе yupana-akka.
Запуск из sbt:
examples/runMain org.yupana.examples.server.Main
Настройки приложения в файле yupana-examples/src/main/resources/application.conf
. По умолчанию сервер слушает порт 10101
.
Для подключения к серверу нужен JDBC драйвер. Его можно собрать командой sbt jdbc/assembly
. Пакет с драйвером будет сохранен в файл: yupana/yupana-jdbc/target/scala-2.12/yupana-jdbc-assembly-{версия_проекта}-SNAPSHOT.jar
. Для соединения с сервером с использованием Yupana JDBC нужно указать следующие параметры:
- URL:
jdbc:yupana://localhost:10101
- Class name (класс драйвера):
org.yupana.jdbc.YupanaDriver
2. ETL
Приложение эмулирует добавление данных Yupana. Данные генерируются случайным образом.
Для запуска есть скрипт deploy_etl.sh
. Подразумевается что Apache Spark со scala 2.12 установлен в /opt/spark
или задана переменная окружения SPARK_HOME
. Перед запуском скрипта необходимо собрать толстый JAR (в sbt examples/assembly
).
3. QueryRunner
Приложение для запуска запросов к Yupana на кластере Apache Spark. Результаты сохраняются в виде CSV файла.
SQL запрос для запуска и путь для сохранения результатов задается в query-runner-app.conf
.
Запуск осуществляется скриптом deploy_query_runner.sh
Адаптация Yupana к существующему окружению
Модуль yupana-examples
может быть использован в качестве основы для создания собственной аналитической системы. Для этого потребуется:
- Определить и реализовать внешние связи для доступа к существующим источникам данных.
- Определить схему данных на основе существующей схемы.
- Приведенная в примерах реализация сервера запросов является минимально полной, достаточно использовать схему реализованную на шаге 2 схему в сервере. Однако для интеграции сервера в существующую инфраструктуру скорее всего понадобятся некоторые изменения (например чтение настроек из другого источника, использование дополнительных настроек для внешних источников и др).
- Реализовать ETL процесс для наполнения базы данными. Для периодического наполнения можно использовать Spark RDD, а для потокового DStream.
Yupana SQL
Для выполнения запросов Yupana поддерживает собственный диалект SQL. Поддерживаются следующие операции:
-
SELECT
-- выборка данных. -
UPSERT
-- вставка данных. -
SHOW TABLES
-- вывод списка таблиц. -
SHOW COLUMNS FROM <table_name>
-- вывод списка полей таблицы. -
SHOW QUERIES
-- просмотр истории запросов. -
KILL QUERY
-- остановка запроса. -
SHOW UPDATES_INTERVALS WHERE TABLE = '<tableName>' AND UPDATED_AT BETWEEN <FROM> AND <TO>
- вывод списка изменений временных рядов, произошедших в указанный период`tableName` - имя таблицы (сейчас используется только 'receipt' и 'receipt_by_day') `FROM` - Начало периода `TO` - Конец периода
Правила наименования полей
- Время для любой схемы указывается как поле
time
типа TIMESTAMP. Доступны следующие функции для работы со временем:trunc_second
,trunc_minute
,trunc_hour
,trunc_day
,trunc_month
,trunc_year
.extract_second
,extract_minute
,extract_hour
,extract_day
,extract_month
,extract_year
.
При работе с драйвером следует учитывать, что выражение WHERE обязательно и должно содержать временной интервал time >= x and time < y
.
- Поля таблицы указываются:
- как есть (quantity или "quantity")
- с указанием таблицы ("kkm_items"."quantity" или kkm_items.quantity)
-
Размерности указываются как есть (например kkmId)
-
Поля внешних связей указываются в виде
имясвязи_имяполя
(например ItemsInvertedIndex_phrase). -
Фильтровать можно по размерностям, метрикам и полям внешних связей, времени используя =, !=, IN, IS NULL/NOT NULL для строк и =, !=, >, >=, <, <=, IN, IS NULL/NOT NULL для остальных типов.
Литералы
Поддерживаются литералы следующих типов:
- Строки:
'Hello!'
- Числа (целые либо с плавающей запятой):
42
или1234.567
- Даты:
TIMESTAMP '2018-08-06'
TIMESTAMP '2018-08-06 16:24:50'
TIMESTAMP '2018-08-06 16:24:50.123'
{ ts '2017-06-13' }
{ ts '2017-06-13 09:15:44' }
{ ts '2017-06-13 09:15:44.666' }
- Интервалы:
INTERVAL '06:00:00'
-- 6 часовINTERVAL '1 12:00:00'
-- 1 день и 12 часовINTERVAL '1' HOUR
-- 1 часINTERVAL '30' MINUTE
-- 30 минутINTERVAL '2 12' DAY TO HOUR
-- 2 дня 12 часовINTERVAL '6 12:30' DAY TO MINUTE
-- 6 дней 12 часов 30 минутINTERVAL '3' MONTH
-- 3 месяцаINTERVAL '1' YEAR
-- 1 годINTERVAL '3-10' MONTH TO DAY
-- 3 месяца и 10 дней
- Массивы:
{1, 2, 3,}
,{ 'one', 'two', 'three' }
.
И т.д. Важно понимать, что интервалы содержащие месяца и/или годы не могут быть использованы при сравнении длительности интервала между двумя датами. Это обуславливается тем что длина месяца или года зависит от определенной даты.
При выполнении математических операций (плюс или минус) над интервалами можно использовать любые интервалы.
Примеры запросов
Суммы продаж для указанной кассы за указанный период с разбивкой по дням:
SELECT sum(sum), day(time) as d, kkmId
FROM items_kkm
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01' AND kkmId = '10'
GROUP BY d, kkmId
Суммы продаж товаров в которых встречается слово "штангенциркуль" за указанный период с разбивкой по дням:
SELECT sum(sum), day(time) as d, kkmId
FROM items_kkm
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01' AND itemsInvertedIndex_phrase = 'штангенциркуль'
GROUP BY d, kkmId
Первой и последней продажи селедки за сутки:
SELECT min(time) as mint, max(time) as maxt, day(time) as d
FROM items_kkm
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01' and itemsInvertedIndex_phrase = 'селедка'
GROUP BY d
Считаем количество продаж товаров, купленных в количестве больше 10:
SELECT item, sum(CASE
WHEN quantity > 9 THEN 1
ELSE 0 )
FROM items_kkm
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01'
GROUP BY item
Применяем фильтры после расчета оконной функции:
SELECT
kkmId,
time AS t,
lag(time) AS l
FROM receipt
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01'
GROUP BY kkmId
HAVING
((l - t) > INTERVAL '2' HOUR AND extract_hour(t) >= 8 AND extract_hour(t) <= 18) OR
((l - t) > INTERVAL '4' HOUR AND extract_hour(t) > 18 OR extract_hour(t) < 8)
Выбираем предыдущие три месяца:
SELECT sum(sum), day(time) as d, kkmId
FROM items_kkm
WHERE time >= trunc_month(now() - INTERVAL '3' MONTH) AND time < trunc_month(now())
GROUP BY d, kkmId
Агрегация по выражению:
SELECT kkmId,
(CASE WHEN totalReceiptCardSum > 0 THEN 1 ELSE 0) as paymentType
FROM items_kkm
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01'
GROUP BY paymentType, kkmId
Используем арифметику (+
, -
, *
, /
):
SELECT sum(totalSum) as ts, sum(cardSum) * max(cashSum) / 2 as something
FROM receipt
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01' AND kkmId = '11'
GROUP BY kkmId
Группируем колбасу по вкусу и считаем сумму:
SELECT
item,
case
when contains_any(tokens(item), tokens('вареная')) then 'вареная'
when contains_any(tokens(item), tokens('соленая')) then 'соленая'
else 'невкусная' as taste,
sum(sum)
FROM items_kkm
WHERE time >= TIMESTAMP '2019-06-01' AND time < TIMESTAMP '2019-07-01' AND itemsInvertedIndex_phrase = 'колбаса'
GROUP BY item, taste
Функции
Функция | Тип функции | Типы аргументов | Тип значения | Описание |
---|---|---|---|---|
min | агрегация | число, строка, время | тот же | Минимальное значение. Для строковых значение в лексикографическом порядке |
max | агрегация | число, строка, время | тот же | Максимальное значение. Для строковых значение в лексикографическом порядке |
sum | агрегация | число | тот же | Сумма |
count | агрегация | любой | число | Количество |
distinct_count | агрегация | любой | число | Количество уникальных значений |
lag | оконная | любой | тот же | Предыдущее значение в группе записей. Группа определяется в запросе в секции группировки. Сортировка по времени. |
trunc_year | унарная | время | время | Округление времени до года |
trunc_month | унарная | время | время | Округление времени до месяца |
trunc_day | унарная | время | время | Округление времени до дня |
trunc_hour | унарная | время | время | Округление времени до часа |
trunc_minute | унарная | время | время | Округление времени до минуты |
trunc_second | унарная | время | время | Округление времени до секунды |
exract_year | унарная | время | число | Извлечение значения года из времени |
exract_month | унарная | время | число | Извлечение значения месяца из времени |
exract_day | унарная | время | число | Извлечение значения дня из времени |
exract_hour | унарная | время | число | Извлечение значения часа из времени |
exract_minute | унарная | время | число | Извлечение значения минуты из времени |
exract_second | унарная | время | число | Извлечение значения секунды из времени |
abs | унарная | число | число | Значение числа по модулю |
tokens | унарная | строка | массив строк | Получение стемированых транслитерированых строк из строки |
tokens | унарная | массив строк | массив строк | Получение стемированых транслитерированых строк из массива строк |
split | унарная | строка | массив строк | Разбиение строки на слова по пробелам |
length | унарная | строки, массивы | строки, массивы | Длина строки или количество элементов в массиве |
array_to_string | унарная | массив | строка | Преобразование массивы в строку в формате "( a, b, .., n)" |
id | унарная | размерность | число | Идентификатор значения размерности в словаре |
+ | инфиксная | число, строка, интервал | тот же | Сложение |
- | инфиксная | число | тот же | Вычитание |
* | инфиксная | число | тот же | Умножение |
/ | инфиксная | число | тот же | Деление |
+ | инфиксная | время и интервал | время | Сложение |
- | инфиксная | время и интервал | время | Вычитание |
- | инфиксная | время и время | интервал | Вычитание |
= | инфиксная | число, строка, время | логический | Сравнение на равенство |
<> или != | инфиксная | число, строка, время | логический | Сравнение на неравенство |
> | инфиксная | число, строка, время | логический | Сравнение на больше |
< | инфиксная | число, строка, время | логический | Сравнение на меньше |
>= | инфиксная | число, строка, время | логический | Сравнение на больше или равно |
<= | инфиксная | число, строка, время | логический | Сравнение на меньше или равно |
contains | бинарная | массив и тип элемента | логический | True если массив содержит элемент, иначе False |
contains_all | бинарная | массив и массив | логический | True если массив1 содержит все элементы массива2, иначе False |
contains_any | бинарная | массив и массив | логический | True если массив1 содержит хотя бы один элемент из массива2, иначе False |
contains_same | бинарная | массив и массив | логический | True если массив1 содержит те же элементы что и массив2 (в любом порядке) |
Типы функций
- Агрегация -- функция вычисляющая общее значение из множества значений (например сумму или максимум). Агрегации не могут использоваться вместе с оконными функциями.
- Оконная -- функция вычисляющая общее значение из множества значении и их порядка. Оконные функции не могут использоваться вместе с агрегациями. Не поддерживаются в реализации TSDB для Spark.
- Унарная -- функция над одним значением (например length или tokens).
- Инфиксная -- функция над двумя значениями, в SQL записывается между аргументами (например + или -).
- Бинарная -- функция с двумя значениями, например contains_all.
Кроме того, поддерживаются следующие SQL выражения:
Выражение | Описание |
---|---|
x IN (1, 2 .. z) |
Проверка что x является одним из элементов заданного множества констант |
x NOT IN (3, 4, .. z) |
Проверка что x не является одним из элементов заданного множества констант |
x IN NULL |
Проверка что значение x не определено |
x IS NOT NULL |
Проверка что значение x определено |
x BETWEEN 1 AND 10 |
То же самое что x >= 1 AND x <= 10 |
Добавление данных
Для добавления используется команда UPSERT
. При этом необходимо заполнить данные всех размерностей, время и необходимые измерения.
UPSERT INTO kkm_items(kkmId, item, operation_type, position, time, sum, quantity)
VALUES ('12345', 'Пряник тульский', '1', '1', TIMESTAMP '2020-01-10 16:02:30', 100, 1)
Можно добавлять одновременно несколько значений:
UPSERT INTO kkm_items(kkmId, item, operation_type, position, time, sum, quantity) VALUES
('12345', 'Пряник тульский', '1', '1', TIMESTAMP '2020-01-10 16:02:30', 300, 5),
('12345', 'Чай индийский', '1', '1', TIMESTAMP '2020-01-10 16:02:30', 100, 1)
Структура проекта
yupana-api
Этот модуль содержит определения базовых примитивов Yupana таких как типы данных и операции над ними, таблица, схема, свертка, внешняя связь и др.
yupana-core
Реализация ядра хранилища. В этом модуле содержится реализация работы с временными рядами, вне зависимости от типа используемого хранилища.
yupana-hbase
Реализация хранилища поверх HBase.
yupana-proto
Протокол взаимодействия между JDBC драйвером и сервером.
yupana-jdbc
JDBC драйвер для Yupana.
yupana-akka
Базовые части для реализации сервера поверх Akka. Принимает запросы через TCP.
yupana-spark
Реализация TSDB работающая поверх HBase внутри Apache Spark.
Настройки
- spark.hbase.regions.initial.max - настройка ограничивающая максимальное количество регионов при создании таблицы временного ряда в hbase, по-умолчанию 50.
Начальное кол-во регионов считается как количество интервалов, которое помещается во временной ряд (текущее время + год - начало эпохи хранения данных table.epochTime делённое на rowTimeSpan и умноженное на 10). Если полученное значение превышает значение настройки hbase.regions.initial.max, то используется значение настройки.
yupana-schema
Минимальное определение схемы для выполнения аналитики ОФД.
yupana-external-links
Реализация внешних связей, таких как инвертированный индекс, поиск сопутствующих товаров и связи на базе SQL таблиц.
yupana-caffeine
Реализация кэшей на базе Caffeine.
yupana-ehcache
Реализация кэшей на базе EhCache.
yupana-ignite
Реализация кэшей на базе Apache Ignite.
yupana-examples
Пример использования Yupana. Содержит типичный набор приложений для работы с Yupana: сервер обработки запросов, ETL для загрузки данных и приложения для запуска тяжелых вычислений на Spark.
Кэширование
Для ускорения работы Yupana использует кэши. Имеется несколько реализаций кэшей для Yupana:
yupana-ehache
yupana-caffeine
yupana-ignite
При использовании Yupana рекомендуется выбрать одну или несколько реализаций кэшей и сделать соответствующие настройки.