Problem:

Czas procesowania danych w Databricks zwiększył się dwukrotnie. Poprzednio wynosił 3 godziny teraz wynosi 6. Zanim podejmiesz proces naprawy trzeba sprawdzić który notebook spowodował aż tak duży spadek wydajności. Czy spadek wydajności rozlał się równomiernie pomiędzy wszystkie notebooki? Czy dotyczy tylko części? Możemy wykluczyć, że ktoś inny pracował na platformie w tym samym czasie i wpływał na wydajność. Cluster jest dedykowany do przetwarzań batchowych i nikt inny nie ma do niego dostępu.

Rozwiązanie:

Użyjemy Databricks rest API, żeby przeszukać wszystkie joby i znaleźć ten, który trwał najdłużej w porównaniu z poprzednim ładowaniem. Jak znajdziemy, który to job, wtedy przeszukamy wszystkie taski i sprawdzimy czy są jacyś pojedynczy kandydaci, których wydajność zdecydowanie spadła i znajdziemy wolno działający notebook. Z rest API będziemy stosować:

api/2.2/jobs/list - do wylistowania wszystkich jobów zdefiniowanych w klastrze

/api/2.2/jobs/runs/list?job_id - dzięki któremu wylistujemy uruchomienia joba

/api/2.2/jobs/runs/get?run_id - to da nam szczegóły uruchomienia każdego taska w jobie

Pełna dokumentacja dostępna tutaj:
https://docs.databricks.com/api/azure/workspace/jobs

Znajdywanie joba

Poszukiwania zaczynamy od nazwy joba, którą podajemy po nazwie. Do kolejnych kroków potrzebny nam będzie identyfikator job'a, żeby go znaleźć wylistujemy wszystkie joby i wyszukamy sprawdzimy, która nazwa zgadza się z wzorcem, który jest podany.

Najpierw odpytamy Databricks REST API o listę jobów dostępnych na hoście:

data = {"limit" : "100"}
return requests.get(f"{DBX_HOST}/api/2.2/jobs/list",json=data, auth=BearerAuth()).json()

W momencie pisanie tego wpisu dostępne było api w wersji 2.2 i tam domyślny limit zwracanych jobów był 20. Ustawiamy go od razu na 100, żeby pobrać większą paczkę.

Następnie przeszukujemy wygenerowany wcześniej rezultat, żeby znaleźć nazwę joba. Gdy ustawisz full_name na True, wtedy będzie wyszukiwał tylko po dokładnej nazwie

def get_job_by_name(job_name_pattern: str, full_name: bool = False) -> []:
    jobs = list_jobs()['jobs']
    job_ids = []
    for job in jobs:
        if job_name_pattern in job.get('settings').get('name',None):
            if full_name:
                if job_name_pattern == job.get('settings').get('name',None):
                    job_ids.append(job['job_id'])
            else:
                job_ids.append(job['job_id'])
    return job_ids

Kod jest rozbity na wiele linii ale według mnie jest czytelny. Chat GPT proponuje takie zapisanie powyższej funkcji:

def get_job_by_name(job_name_pattern: str, full_name: bool = False) -> []:
    return [job['job_id'] for job in list_jobs()['jobs'] if job_name_pattern in job.get('settings', {}).get('name', '') and (not full_name or job_name_pattern == job.get('settings', {}).get('name', ''))]

Nie testowałem tego co wygenerował Chat GPT, więc używasz tego na własną odpowiedzialność 😉

Ważniejsze od tego jak to jest napisane jest rezultat. Mamy poszukiwane job_id, to otwiera furtkę do kolejnych funkcji.

Znajdywanie uruchomień joba

Teraz znajdźmy uruchomienia job'a. Dla każdego job_id, które mamy wykonujemy funkcję:

def list_jobs_runs(job_id: str) -> []:
    return requests.get(f"{DBX_HOST}/api/2.2/jobs/runs/list?job_id={job_id}", auth=BearerAuth()).json()

Co nas głównie interesuje ze wracanego rezultatu to run_id, które nam daje unikalne uruchomienie job'a. Teraz już tylko jedno wywołanie REST API, żeby dostać się do czasów uruchomień poszczególnych zadań (tasks).

Uruchomienia zadań

Ostatnią częścią jest pobranie uruchomień tasków, dla konkretnego wykonania:

def get_job_run(run_id: str) -> []:
    return requests.get(f"{DBX_HOST}/api/2.2/jobs/runs/get?run_id={run_id}", auth=BearerAuth()).json()

Mamy już wszystkie wykonania teraz pora na pobranie użytecznych informacji z uruchomienia. Zapiszemy sobie wybranie atrybuty, jak potrzebujesz czegoś więcej zobacz w dokumentacji, co jeszcze jest dostępne.

https://docs.databricks.com/api/azure/workspace/jobs/listruns#runs

Wolno działający notebook - Winowajca

Na podstawie powyższych danych tworzę tabelę: audit_job_run. W niej będzie już łatwo znaleźć job, który wykonywał się dłużej niż zakładano, na przykład przy pomocy takiego query:

select
  jr.run_id
, jr.task_key 
, jr.run_duration_seconds
, jr.start_time
, jr.end_time
, lag(run_id) over(partition by task_key order by start_time) as previous_run_id 
, lag(run_duration_seconds) over(partition by task_key order by start_time) as previous_run_duration_seconds
, jr.run_duration_seconds - previous_run_duration_seconds as performance_degradation_seconds
, round((performance_degradation_seconds/previous_run_duration_seconds)*100,2) as performance_degradation_percentage
, jr.notebook_path
, jr.notebook_source 
from next_level_dm.audit_job_run jr
where task_key like '%drone%'
order by start_time desc

Co widzisz w wynikach to: poprzednie wykonanie joba w sekundach, o ile zmieniła się wydajność w sekundach, nazwę taska oraz ile trwało wykonanie zadania. Jak widzisz zmieniło się z 35 minut, do 133 minut. To jest nasz kandydat do optymalizacji!

Pełny kod pythona, znajdziesz jak zawsze w GIT.
https://github.com/rgogloza/nextlevelbi/tree/master/databricks

Ten kod dla zachowania czytelności został podzielony na dwa pliki:
Główny plik:
https://github.com/rgogloza/nextlevelbi/blob/master/databricks/job_execution_history.ipynb
Plik z pomocniczymi metodami do komunikacji z Databricks REST API:
https://github.com/rgogloza/nextlevelbi/blob/master/databricks/dbx_rest_util.ipynb