Problem

W środowisku opartym o Databricks stworzone zostało wiele notebooków. Cześć z nich została umieszczona w jobach (przepływach) a niektóre niestety pominięte. Chciałbyś je znaleźć i dodać do ładowania.

Potrzebujesz sprawdzić, które z notebooków zostały pominięte. Masz też podejrzenie, że w niektórych jobach zostały umieszczone notebooki, które: zmieniły nazwy albo zostały usunięte. Pora zrobić porządek!

Rozwiązanie

Użyj databriks REST API!

Dzięki niemu wylistujesz wszystkie notebooki w obszarze roboczym:
https://docs.databricks.com/api/workspace/workspace/list

Pobierając definicję joba sprawdzisz, jaki notebook został dodany w poszczególnych zadaniach.

https://docs.databricks.com/api/workspace/jobs/list
https://docs.databricks.com/api/workspace/jobs/get

Listowanie tasków w jobie

job_ids = [job['job_id'] for job in list_jobs()['jobs']]
tasks_in_jobs = []
for job_id in job_ids:
    job = get_job(job_id) 
    [tasks_in_jobs.append([job['job_id'], job['settings']['name'], task['task_key'], task['notebook_task']['notebook_path']]) for task in job['settings']['tasks']]

W tej części odwołujemy się dwa razy do REST API.
Listujemy listę zdefiniowanych jobów list_jobs i później dla każdego joba pobieramy jego definicję jsonie.

Z definicji joba pobieramy jego unikalne id, nazwę, nazwę taska i ścieżkę do notebooka. Jeżeli jesteś zainteresowany większą liczbą atrybutów joba zapoznaj się z jego definicją i wybierz co Ci potrzebne.

Listowanie notebooków w workspace

notebook_in_wks = []
for obj in list_workspace(workspace_path):
    if obj['object_type'] == 'DIRECTORY':            
        notebook_in_wks.extend(list_notebooks_in_workspace(obj['path']))            
    elif obj['object_type'] == 'NOTEBOOK':
        notebook_in_wks.append([obj['path'], obj['path'].split('/')[-1]])

Możesz zacząć w jakim katalogu chcesz, ja zaczynam przeszukiwania workspaca w katalogu /Shared/. Następnie rekurencyjnie pobieram dla każdego katalogu listę notebooków a jeżeli jest tam też katalog to udaje się do niego i tam też sprawdzam.

Wywołuję list_workspace z REST API. Ważne dla mnie jest ścieżka do notebooka i jego nazwa.

Możesz też dodatkowo zapisać kiedy notebook był stworzony i modyfikowany. To da możliwości do dodatkowych analiz. Dla potrzeb tego zadania, te atrybuty nie są wybierane.

Zapisanie wyników

Możesz pracować na dataframach i nie materializować wyników, ja preferuje pracę na tabelach i zapisuje wyniki zapytań do tabel.

Potem łatwiej mi jest pracować na tych danych i udostępniać innym członkom zespołu.

with user_notebooks as (
select * from ${schema_name}.${user_notebook_table} un
where notebook_path not like '%90 debug%'
)
select * from user_notebooks un
full outer join ${schema_name}.${user_job_task_table} ujn on (un.notebook_path = ujn.task_path)
where un.notebook_path is null or ujn.task_path is null

W strukturze folderów w workspace mam też folder debug, wygluczam go z analiz, żeby nie zaszumiał wyników.

Owocnych przeszukiwań!

Cały kod jak zawsze dostępny na github:
https://github.com/rgogloza/nextlevelbi/blob/master/databricks/dbx_job_notebook_cross_check.ipynb

Pytania, wątpliwości, wnioski? Pisz w komentarzu!