Jak wygenerować ładowanie (job) w Databricks używając REST API i Python?

Problem:

Ładujesz warstwę brązową (bronze layer) w Databricks. Masz utworzone notebooki i teraz chciałbyś utworzyć ich ładowanie (job). Nie jest dla Ciebie istotna kolejność ładowania. Ważne jest tylko wygenerowanie workflow.

Rozwiązanie:

  1. Utwórz job ładujący dane.
  2. Zautomatyzuj tworzenie ładowania - wykorzystamy w tym celu Databricks REST API.

Przygotowania

Co potrzebujemy?

  1. Cluster id - identyfikator clustra na którym będzie uruchamiane ładowanie.
  2. Adres instancji.
  3. Podłączenie do clustra: Databricks token, w celu uzyskania połączenia z Databricks.
  4. Stworzone notatniki (notebooks) i adres folderu.

Te 3 pierwsze najlepiej umieścić w zmiennych środowiskowych i potem odwoływać się do nich w ten sposób:

TASK_CLUSTER_ID = os.getenv('DBX_JOB_CLUSTER_ID') # Example: '1234-567890-abcdef12'

DBX_INSTANCE = os.getenv('DBX_INSTANCE') # Example 'https://adb-1234567890.1.azuredatabricks.net'
DBX_TOKEN = os.getenv('DBX_TOKEN') # Example: 'dapif12scvde34ffda34tm34n123n1m-2'

Funkcje do REST API

Odpytywanie końcówek REST zostało zrobione w najbardziej pythonowy sposób przy wykorzystaniu pakietu requests.

Jeżeli chciałbyś to zrobić w bardziej "Databricksowy" sposób wykorzystałbyś Databricks CLI. To nie będzie przedmiotem tego wpisu. Jeżeli chcesz wiedzieć więcej zajrzyj na przykład do:

https://docs.databricks.com/archive/dev-tools/python-api.html#step-1-set-up-authentication

Do odpytywanie endpointów potrzebna będzie autentykacja.

Autentykacja jest robiona przy pomocy tokenów, które zdefiniowałeś wcześniej. Ten prosty wraper pomoże za chwilę w autentykacji.

class BearerAuth(requests.auth.AuthBase):
    def __init__(self):
        self.token = DBX_TOKEN
    def __call__(self, r):
        r.headers["authorization"] = "Bearer " + self.token
        return r

Ta metoda będzie listowała notebooki z folderu, jako parametr przyjmuje ścieżkę do folderu. Powyższy BearerAuth jest wykorzystany do autentykacji.

def list_workspace(path: str) -> str:
    data = {'path':path}
    return requests.get(f"{DBX_INSTANCE}/api/2.0/workspace/list", auth=BearerAuth(), json=data)

Jak wygenerować ładowanie (job) w Databricks używając REST API i Python?

Generowanie joba.

Na wysokim poziomie generowanie ładowania będzie polegało na:

  1. Listowaniu notebooków znajdujących się w podanej ścieżce:
 notebooks = list_workspace(notebook_path).json()['objects']    
  1. Chcemy, żeby każdy proces roboczy był obciążony, dlatego dzielimy zadaniami pomiędzy workerów. Tutaj jest bardzo prosty algorytm podziału na części. Możesz sobie wyobrazić bardziej wyszukany algorytm, gdzie przydzielasz zadania do procesów roboczych mając na uwadze potrzebne im zasoby.
    chunk_len = DBX_JOB_LIMIT if math.ceil(len(notebooks) / DBX_INSTANCE_NBR_OF_WORKERS ) > DBX_JOB_LIMIT else math.ceil(len(notebooks) / DBX_INSTANCE_NBR_OF_WORKERS )
    chunks = [notebooks[x:x+chunk_len] for x in range(0, len(notebooks), chunk_len)]
  1. Generujemy definicję zadań, jedno zadanie to jeden notebook do uruchomienia:
tasks = generate_task_definition(notebooks_chunk, TASK_CLUSTER_ID)
  1. Generujemy definicję joba:
        job_def = generate_job_definition(f"{job_name}_{i}", tasks)

Cała funkcja będzie wyglądała w ten sposób:

def create_job_instance(job_name: str, notebook_path: str) -> []:

    notebooks = list_workspace(notebook_path).json()['objects']

    # Distributes tasks among the clusters
    chunk_len = DBX_JOB_LIMIT if math.ceil(len(notebooks) / DBX_INSTANCE_NBR_OF_WORKERS ) > DBX_JOB_LIMIT else math.ceil(len(notebooks) / DBX_INSTANCE_NBR_OF_WORKERS )
    chunks = [notebooks[x:x+chunk_len] for x in range(0, len(notebooks), chunk_len)]
    # Creating jobs
    jobs = []
    i = 0
    for notebooks_chunk in chunks:
        tasks = generate_task_definition(notebooks_chunk, TASK_CLUSTER_ID)
        job_def = generate_job_definition(f"{job_name}_{i}", tasks)
        jobs.append(create_job(job_def).json())
        i+=1
    return jobs

Pełny kod, z komentarzem, dostępny na github:
https://github.com/rgogloza/nextlevelbi/blob/master/databricks/utils-job-creator.ipynb

Ograniczenia, warto rozważyć i słowniczek

  1. Limit zadań
    W chwili pisania tego bloga Databricks ma limit 100 zadań w przepływie. Można się spodziewać, że ten limit zostanie zniesiony ale jeżeli masz ponad 100 zadań warto podzielić to ładowanie na części.

    databricks-rest-api-python-job

    https://learn.microsoft.com/en-us/azure/databricks/workflows/jobs/jobs-api-updates

  2. Liczba workerów - w zależności od liczby procesów roboczych (workers), uzależnij liczbę jobów działających w tym samym czasie. Czyli jeżeli masz dwóch workerów generujesz dwa joby, wtedy każdy ma zajęcie i wykorzystujesz zasoby maszyny efektywniej.

  3. Słowniczek:

    • Worker - proces roboczy
    • Notebook - notatnik
    • Task - zadanie do wykowania, pojedynczy notatnik do uruchomienia
    • Job - ? 🙂 też zadanie, ale tutaj chodzi o grupę zadań do wykonania, czyli ładowanie