Databricks clusters: Dwa antywzorce, do zastosowania na produkcji

Problem:

Działasz w dużej organizacji ale pracujesz w jednym z projektów rozwijających rozwiązanie w oparciu o Databricks. Organizacja narzuca dwie dobre praktyki projektowe: stosowanie job clusters do wywołania jobów oraz nie używania spot instances w ładowaniach produkcyjnych. Jakich argumentów użyjesz, żeby przekonać, że w Twoim przypadku użycie interactive cluster i spot instance jest optymalnym rozwiązaniem

Rozwiązanie:

Przedstawimy kontekst decyzji w oparciu o scenariusz, który mamy. W przetwarzaniu ETL ładującym dane do wszystkich warstw Brązowej, Srebrnej i Złotej (Bronze Silver i Gold – Medalion Architecture) stworzone jest 35 (!) jobów. Nie jest to jeden duży job ładujący wszystko ale w celu zwiększenia odporności na błędy, elastyczności, łatwości debugowania i zrównoleglenia przetwarzań. Dodatkowo ładowanie ma być optymalne kosztowo dlatego używamy spot instance. Pójdziemy dzisiaj pod prąd rekomendacjom Databricks. Zacznijmy jednak od początku.

Read More

Pułapki pyspark – union, union all

Problem:

Migrujemy kod z SQL do pyspark. Nie chcemy używać pyspark sql ale rzeczywiście użyć kodu napisanego w Sparku. Czy są polecenia, które inaczej działają w Spark i SQL? Mają taką samą składnie a zupełnie inne działanie? W jaki sposób sprawić, żeby zachować tą oczekiwaną funkcjonalność po migracji? Jako przykład weźmy union, union all w SQL’u i Sparku.

Rozwiązanie:

Dla przypomnienia, union w SQL łączy zbiory i usuwa duplikaty. Union all tylko łączy zbiory, bez usuwania duplikatów. W Sparku union tylko łączy zbiory ale nie usuwa duplikatów. W Sparku union all to alias do funkcji union.
Jeżeli w SQL masz funkcję union i chcesz również w pysparku deduplikować zbiór danych wtedy zastosuj:
df1.union(df2).distinct()

Read More

Katastrofa: cluster działał ponad 24h

Problem:

Ładowanie rozpoczęło się w weekend a w poniedziałek rano stwierdziliśmy, że cluster wciąż działa. Ładowanie, które rozpoczęło się w niedzielę rano, trwało ponad 24h i w poniedziałek rano ładowanie wciąż było w trakcie. W sumie ładowanie powinno trwać 3h, trwało ponad 21h dłużej i nie zakończyło się sukcesem. Pociągnęło to za sobą duże koszty i nasunęło dwa pytania: co się stało, dlaczego cluster działa tak długo? Co zrobić, żeby to się więcej nie powtórzyło.

Rozwiązanie (a raczej przyczyna)

Spojrzenie w szczegóły ładowania pokazały, że był jeden notebook, gdzie ładowanie trwało ponad 21h i wciąż trwało. Jeżeli właśnie pomyślałeś, że to duplikaty to masz rację. Dane zostały zwielokrotnione. Dla niektórych rekordów z tabeli źródłowej było 32 768 rekordów. Nie spowodowało to wywrócenia procesu i błędu wskazującego na nie wystarczającą pamięć (out of memory exception). Ładowanie trwało ale trzeba było je przerwać, nie chcieliśmy go kontynułować, gdyż nie zawierało poprawnych danych.

Dobrze wiemy już co się stało, w takim razie jak przeciwdziałać takim sytuacjom w przyszłości?

Read More

Jak najszybciej wysłać dane z on prem na Azure Storage account

Problem:

Wysyłasz dane z on prem na Azure Storage Account. Została podjęta decyzja, że użyjesz do tego Pythona i bibliotek azure.storage. Przeglądasz dokumentacje i zastanawiasz się, którą metodę wysyłania danych do Azure wybrać? Czy lepiej wybrać append_date czy upload_data? Czy te metody mają jakieś ograniczenia? Która będzie szybsza?

Rozwiązanie:

Użycie upload_data jest zazwyczaj szybsze, natomiast ma pewne ograniczenie. Wysyłając duże pliki możesz dostać Timeout. Ale wysyłanie danych tą metodą jest zdecydowanie szybsze. Możesz pokusić się o rozwiązanie, które próbuje zrobić upload_data a dopiero potem robi append, jeżeli upload się nie powiódł.

Ale może zacznijmy po początku.

Read More

Qualify w Databricks

Problem:

Działasz na tabeli opisującej procesy produkcji. Jeden proces może występować więcej niż jeden raz w tabeli. Nas interesuje tylko ostatnia data zakończenia procesu. Użyjemy funkcji okienkowej, row_number, żeby oznaczyć proces, który zakończył się jako ostatni. W jaki sposób w tym samy zapytaniu wybrać ten wiersz, bez używania dodatkowych podzapytań lub CTE?

Rozwiązanie:

Użyjemy Qualify! Qualify w Databricks filtruje wyniki zapytania funkcji okienkowej. Możesz myśleć o nim jak warunku zakładanym na wyniku funkcji okienkowej. Upraszcza to znakomicie składnie.

Jak go zastosować:

select process_id, process_name, process_start_date, process_end_date
, row_number() over(partition by process_id order by process_end_date desc) as rn
from d_process
qualify rn = 1

Read More

Databricks: porównanie CTAS, deep i shallow clone

Problem:

Masz już wyczyszczone, połączone, załadowane i gotowe do konsumpcji przez warstwę raportową dane. Teraz Power BI będzie pobierało dane do dataflow. Następne ładowanie zaczyna się za chwilę i zmieni, kształt danych. Chcesz jednak załadowane dane wysłać do raportów. Żeby zapewnić spójny obraz danych potrzebujesz zrobić snapshot (migawkę) danych. Potrzebujesz zduplikować istniejące dane. Masz do wyboru: create table as select (CTAS), deep (głęboki) oraz shallow (płytki) klon danych.

Rozwiązanie:

W moim przypadku najlepszym rozwiązaniem będzie użycie głębokiego klonowania (deep clone). Płytkie klonowanie ma niestety ograniczenia, które wykluczają jego zastosowanie. CTAS jest zbyt wolny, to było dopiero odkrycie! Tabela, na której testowałem nie jest duża ma 0,5 GB ale to wystarczający wolumen żeby wyciągnąć wnioski.

Read More

Not able to load Function App

Problem:

Rozwijasz projekt w Function App. Dostałeś do napisania funkcję w C#. Funkcja została napisana, przetestowana i wdrożona na produkcję. Jak to w życiu, pojawiają się zmiany wymagań do funkcji, które też rozwijasz wdrażasz i zamiast oczekiwanych zmian widzisz błąd:
„We are not able to load some functions in the list due to errors”.
Gdy oglądasz szczegóły błędu widzisz:
„The operation timed out and could not be completed”.
W kodzie praktycznie nic się nie zmieniło a aplikacji nie da się uruchomić i używać.
Do zrobienia builda i deploymentu używałem Azure Devops.

Rozwiązanie:

Jeżeli szukasz szybkiego rozwiązania problemu to zaraz Ci go podam. W moim przypadku problemem nie był kod, który zmieniłem, bo zmiana była wręcz kosmetyczna: dodanie jednego loggera do obsługi błędów.
Deployment poprzedniej wersji także sprawiał, że aplikacja znowu zaczynała działać.
Co w takim razie było źle? W Dev Ops pipelines wystarczyło ustawić, żeby czyścił katalog deploymentu.
Niejasne? Już tłumaczę.

Read More

Azure storage file datalake do pobierania plików?

Problem:

Masz dostępne Databricks, Pythona i Azure Storage Account. Potrzebujesz pobrać raport Power BI umieszczony na Azure Storage account przy pomocy Pythona.
Potem ten plik należy opublikować w serwisie Power BI.
Nie możesz tego zrobić przy użyciu Sparka, albo Pandas. To się nie uda i jednocześnie, to nie jest to zadanie.
Możesz instalować bilbioteki na clustrze. Najlepiej, żeby ich autorem był Microsoft.

Rozwiązanie:

Microsoft udostępnia bibliotekę: azure.storage.filedatalake przy pomocy której można przeczytać plik z landing zone w formacie binarnym a potem opublikować go w portalu Power BI.

Wystarczy z kontenera przeczytać plik. Ta zawartość zostanie wczytana w formacie binarnym:
file_content = file_container.download_file()

A potem opublikować go w Power BI portalu:
publish_powerbi_report(PBI_WORKSPACE_ID, PBI_REPORT_NAME, file_content)

Read More

Databricks: Jak pobrać pliki binarne z Azure Storage Account używając Pythona?

Problem:

Masz dostępne Databricks, Pythona i Azure Storage Account. Potrzebujesz pobrać plik z Azure Storage account przy pomocy Pythona w formacie binarnym. Jeżeli chcesz zrobić to przy użyciu Sparka, albo Pandas to nie jest to zadanie.
Dane masz pobrać z pliku binarnego.
Trzeba użyć modułów Pythona do wczytywania plików.
W dokumentacji piszą, że taka operacja jest „not supported”. (Stan na 15.10.2024)
Nie chcesz też robić „mount” zdalnego systemu plików. Taka operacja jest nie polecana przez Databricks.

Rozwiązanie:

W Databricks z Pythona NIE można czytać plików ze zdalnego systemu plików. Można za to czytać pliki z lokalnego file systemu. Obejście problemu przedstawionego powyżej to:
1. Przy użyciu dbutils.fs albo %fs skopiować pliki ze zdalnego filesystemu do lokalnego.
2. Przeczytać pliki z lokalnego systemu plików przy użyciu Pythona.
Skasować skopiowany plik z lokalnego filesystemu.

Read More

Jaki jest rozmiar tabeli, schematu w Databricks?

Problem:
Jaki jest rozmiar tabeli w Databricks? Ile miejsca zajmuje mój schemat? Jak policzyć rozmiar? W jaki sposób sprawdzić ile przybyło danych od ostatniego ładowania? Ile miejsca zajmuje bronze, silver oraz gold layer? Jak to zadanie zautomatyzować? Czy można z tego wyciągnąć jeszcze jakieś wnioski?

Rozwiązanie:
W Databricks dostępne jest polecenie:

describe detail table_name

Umożliwia ono pokazanie rozmiaru w bajtach, wylistowanie ile plików zajmuje obecnie tabela. Pokazuje też kiedy zostało utworzona albo ostatnio załadowana.

Pokażę teraz w jaki sposób, wygląda skrypt, który dla schematu zbiera dane o wszystkich tabelach.

Read More