блог о bi, №1 в рунете

Автоматизация процессирования сегментов кубов Kylin посредством NiFi

Apache Kylin

В последнее время программное обеспечение с открытым исходным кодом стало набирать популярность и активно использоваться. Наша компания последовала основному тренду и начала осваивать новый стек технологий Apache. Спустя несколько успешных проектов мы хотим поделиться опытом разработки и некоторыми фичами.

Выполнив разработку аналитического решения в Apache Kylin, мы столкнулись со сложностями автоматизации обновления сегментов кубов Kylin после загрузки данных в хранилище. Если на этапе разработки выполнение этих действий в ручном режиме не вызывает сложностей, то при внедрении решения на продуктив это является проблемой.



Для автоматизации операций по обработке данных используется открытое программное обеспечение Apache NiFi. Задачей на разработку является автоматический запуск процессирования сегментов кубов посредством RestAPI с использованием процессора ExecuteScript.



Код запроса RestAPI был сгенерирован программно в Postman. ПО предоставляет возможность выбора языка программирования, который должен использоваться. В пересечение возможных к использованию языков в ExecuteScript (NiFi) и Postman попадают Python и Ruby. Тестирование показало, что для выполнения HTTP запросов на Python, необходимо до установить библиотеки, а на Ruby, достаточно конфигурации «из коробки». Это повлияло на выбор языка программирования – Ruby.


Кроме кода самого запроса необходимо разработать алгоритм выполнения запросов для ряда кубов, а также функционал передачи даты начала и конца периода данных сегмента.



Для хранения информации о кубах создан двумерный массив с именами кубов и средним временем обработки его сегмента. Цикл выполняет проход по массиву, имя куба используется в строке url, а среднее время обработки секции в конце цикла в качестве аргумента функции sleep для последовательного запуска процессирования сегментов (Kylin сконфигурирован таким образом, что все ресурсы сервера задействуются при запуске одного задания Spark, при параллельном запуске задание будет уходить в ожидание на рандомный период).


Для определения даты начала и конца периода данных сегмента устанавливается дата начала отсчёта, первое число текущего месяца: получаем год и месяц текущей даты и подставляем первое число. Задаём число n - количество обновляемых сегментов в кубе. Для каждого куба в цикле от 0 до n в обратном порядке (т. е. начиная с n и заканчивая 0) вычитаем переданное количество месяцев из даты начала отсчёта, получая startTime, и на 1 меньше переданного количества месяцев для endTime. В теле HTTP запроса даты преобразуем в тип Unix Timestamp в миллисекундах, аргумент "buildType": "BUILD" возможно использовать как для обновления существующего сегмента, так и для создания нового. Код представлен на листинге 1.


Листинг 1 – скрипт автоматизации процессирования секций кубов Kylin

require "uri"
require "json"
require "net/http"
require "date"


#массив наименований процессируемых кубов и среднего вемени процессирования сегмента в секундах
cubes = [["Sales_dev",240],["Stocks_dev",480],["Movements_dev",240],["Buffers",480],["Summary",220]]




today = DateTime.now.to_date
startcurrentmonth = Date.new(today.year, today.month, 1)
startnextmonth = startcurrentmonth.next_month
n=13 #количество обновляемых сегментов в кубе


#Ожидание загрузки файлов
sleep 120


#Обновление сегментов за последние n месяцев включая текущий (при отсутствии сегмента создаётся новый) каждого куба
(0..cubes.size()-1).step(1) do |i|
url = URI("http://0.0.0.0:7070/kylin/api/cubes/" + cubes[i][0] + "/rebuild")

http = Net::HTTP.new(url.host, url.port);
request = Net::HTTP::Put.new(url)
request["Authorization"] = "Basic …"
request["Content-Type"] = "application/json"
(0..n).reverse_each do |j|
startTime = startcurrentmonth.prev_month(j)
endTime = startcurrentmonth.prev_month(j-1)
request.body = JSON.dump({
"startTime": startTime.strftime('%Q'),
"endTime": endTime.strftime('%Q'),
"buildType": "BUILD"
})
response = http.request(request)
sleep cubes[i][1]
end
end