Databricks clusters: Dwa antywzorce, do zastosowania na produkcji

Problem:

Działasz w dużej organizacji ale pracujesz w jednym z projektów rozwijających rozwiązanie w oparciu o Databricks. Organizacja narzuca dwie dobre praktyki projektowe: stosowanie job clusters do wywołania jobów oraz nie używania spot instances w ładowaniach produkcyjnych. Jakich argumentów użyjesz, żeby przekonać, że w Twoim przypadku użycie interactive cluster i spot instance jest optymalnym rozwiązaniem

Rozwiązanie:

Przedstawimy kontekst decyzji w oparciu o scenariusz, który mamy. W przetwarzaniu ETL ładującym dane do wszystkich warstw Brązowej, Srebrnej i Złotej (Bronze Silver i Gold – Medalion Architecture) stworzone jest 35 (!) jobów. Nie jest to jeden duży job ładujący wszystko ale w celu zwiększenia odporności na błędy, elastyczności, łatwości debugowania i zrównoleglenia przetwarzań. Dodatkowo ładowanie ma być optymalne kosztowo dlatego używamy spot instance. Pójdziemy dzisiaj pod prąd rekomendacjom Databricks. Zacznijmy jednak od początku.

Read More

Pułapki pyspark – union, union all

Problem:

Migrujemy kod z SQL do pyspark. Nie chcemy używać pyspark sql ale rzeczywiście użyć kodu napisanego w Sparku. Czy są polecenia, które inaczej działają w Spark i SQL? Mają taką samą składnie a zupełnie inne działanie? W jaki sposób sprawić, żeby zachować tą oczekiwaną funkcjonalność po migracji? Jako przykład weźmy union, union all w SQL’u i Sparku.

Rozwiązanie:

Dla przypomnienia, union w SQL łączy zbiory i usuwa duplikaty. Union all tylko łączy zbiory, bez usuwania duplikatów. W Sparku union tylko łączy zbiory ale nie usuwa duplikatów. W Sparku union all to alias do funkcji union.
Jeżeli w SQL masz funkcję union i chcesz również w pysparku deduplikować zbiór danych wtedy zastosuj:
df1.union(df2).distinct()

Read More

Jak używać funkcji okienkowej w Spark?

Problem:

SQL jest prosty. Funkcje okienkowe są proste, intuicyjne i przychodzi bez komplikacji. Natomiast w Spark? Jak wytłumaczyć koledze w zespole w jaki sposób tworzyć funkcję okienkowe w Spark? Od czego zacząć? Jakie są problemy i wyzwania w tworzeniu funkcji okienkowych? Jakie są różnic w porównaniu z SQL? Co jest łatwiejsze w utrzymaniu?

Rozwiązanie:

Na szybko można odpowiedzieć w ten sposób, że funkcja okienkowa tworzona jest w oderwaniu od data setu ale aplikowania na jego podzbiorze. Na przykład definicja wygląda w ten sposób:

user_window = Window.partitionBy(„user_id”).orderBy(„event_time”)

Jak to rozumieć? Dla każdego użytkownika okienko będzie posortowane po even_time i najstarsze dane będą pojawiały się na jako pierwsze a najświeższe dane wylądują w tym zbiorze danych jako ostatnie.

Potem wystarczy już tylko użyć funkcji okienkowej.

Ale zacznijmy po kolei i na przykładzie.

Read More

Jak automatycznie startować Databricks job po załadowaniu danych do tabel źródłowych?

Problem:

Pracujemy na Unity Catalog. Dane z systemu źródłowego zapisywane są do warstwy RAW. Zapisywane są dane z kilku tabel, nie wiemy znamy dokładnie kolejności w jakiej dane są zapisywane. Po załadowaniu danych do tych wszystkich tabel chcemy wyzwolić uruchamianie joba i ładowanie danych do warstwy BRONZE. Jak to zrobić bez uruchamiania clustra? Czy jest jakiś feature w DBX, który pozwoli nam wykonać to efektywnie kosztowo i czasowo?

Rozwiązanie:

Databricks posiada możliwość wyzwalania joba, gdy tabele źródłowe zostaną doświerzone:

https://learn.microsoft.com/en-us/azure/databricks/jobs/trigger-table-update

Uwaga: Ta opcja działa tylko dla tabel znajdujących się w Unity Catalog. Więc jeżeli masz tabele, które operują na legacy hive metastore, wtedy nie będziesz mógł skorzystać z tej opcji.

Read More

Jak zintegrować Azure DevOps z Databricks?

Problem:

Przy uruchamianiu job’a w Databricks dostajemy błąd, że nie można połączyć się z Azure DevOps repository. Błąd występuje losowo. Czasami są tygodnie, że nie występuje. Wystarczy uruchomić ponownie job, żeby wszystko wróciło do normy i ładowanie było kontynuowane. Niestety wymagana jest interwencja człowieka a niestety czasami człowieka nie ma w okolicy. Istnieje potrzeba, żeby w inny sposób skonfigurować uruchamianie jobów.

Rozwiązanie:

A właściwie dwa, które można zastosować. Pull albo push. Czyli albo wysyłamy z Azure DevOps kod do Azure Databricks workspace albo z Databricksów przy użyciu REST API ciągniemy kod z repozytorium. Oba rozwiązania mają swoje zalety i wady i przyjrzymy im się szczegółowo.

Read More

Katastrofa: cluster działał ponad 24h

Problem:

Ładowanie rozpoczęło się w weekend a w poniedziałek rano stwierdziliśmy, że cluster wciąż działa. Ładowanie, które rozpoczęło się w niedzielę rano, trwało ponad 24h i w poniedziałek rano ładowanie wciąż było w trakcie. W sumie ładowanie powinno trwać 3h, trwało ponad 21h dłużej i nie zakończyło się sukcesem. Pociągnęło to za sobą duże koszty i nasunęło dwa pytania: co się stało, dlaczego cluster działa tak długo? Co zrobić, żeby to się więcej nie powtórzyło.

Rozwiązanie (a raczej przyczyna)

Spojrzenie w szczegóły ładowania pokazały, że był jeden notebook, gdzie ładowanie trwało ponad 21h i wciąż trwało. Jeżeli właśnie pomyślałeś, że to duplikaty to masz rację. Dane zostały zwielokrotnione. Dla niektórych rekordów z tabeli źródłowej było 32 768 rekordów. Nie spowodowało to wywrócenia procesu i błędu wskazującego na nie wystarczającą pamięć (out of memory exception). Ładowanie trwało ale trzeba było je przerwać, nie chcieliśmy go kontynułować, gdyż nie zawierało poprawnych danych.

Dobrze wiemy już co się stało, w takim razie jak przeciwdziałać takim sytuacjom w przyszłości?

Read More

Jak najszybciej wysłać dane z on prem na Azure Storage account

Problem:

Wysyłasz dane z on prem na Azure Storage Account. Została podjęta decyzja, że użyjesz do tego Pythona i bibliotek azure.storage. Przeglądasz dokumentacje i zastanawiasz się, którą metodę wysyłania danych do Azure wybrać? Czy lepiej wybrać append_date czy upload_data? Czy te metody mają jakieś ograniczenia? Która będzie szybsza?

Rozwiązanie:

Użycie upload_data jest zazwyczaj szybsze, natomiast ma pewne ograniczenie. Wysyłając duże pliki możesz dostać Timeout. Ale wysyłanie danych tą metodą jest zdecydowanie szybsze. Możesz pokusić się o rozwiązanie, które próbuje zrobić upload_data a dopiero potem robi append, jeżeli upload się nie powiódł.

Ale może zacznijmy po początku.

Read More

Ładowanie danych z Databricks do Azure Synapse Analytics

Problem:

Zadanie zostało zdefiniowane przez managera w ten sposób:
– Dane z Databricks mają zostać przesłane na Azure Synapse
– Security utworzyło i otworzyło odpowiednie private endpointy.
– Dostałeś też namiary na service principala, którego wykorzystasz do zapisywania danych z Databricks na Azure Synapse.

Będziesz używał sparka, żeby od razu stworzył tabelę i dane. Będzie to szczególnie pomocne gdyż danych nie jest zbyt dużo. Nie powinno być żadnych problemów wydajnościowych.
Niestety pojawiają się problemy zupełnie innej natury. Przy próbie wstawienia danych dostajesz błąd:


„com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'drone_spec_key’ has a data type that cannot participate in a columnstore index.”

Rozwiązanie:

Spark wysyłał do Synapsa create i insert statement w tym samym czasie. Błąd wynikał z tego, że Synapse przy próbie stworzenia tabeli jednocześnie próbuje stworzyć custered index. Niestety ograniczenie, które posiada to brak możliwości stworzenia indeksu na kolumnach, gdzie typ danych zdefiniowany jest jako: VARCHAR(max), NVARCAHR(MAX) a to się dzieje, gdy spark próbuje stworzyć tabelę.
Jako rozwiązanie zastosowano:
1. Najpierw została tworzona tabela po stronie Azure Synapse Analytics
2. Dopiero później zostały wstawione do niej dane

Szczegóły kodu oraz alternatywne, dające więcej możliwości, rozwiązanie poniżej.

Read More

Qualify w Databricks

Problem:

Działasz na tabeli opisującej procesy produkcji. Jeden proces może występować więcej niż jeden raz w tabeli. Nas interesuje tylko ostatnia data zakończenia procesu. Użyjemy funkcji okienkowej, row_number, żeby oznaczyć proces, który zakończył się jako ostatni. W jaki sposób w tym samy zapytaniu wybrać ten wiersz, bez używania dodatkowych podzapytań lub CTE?

Rozwiązanie:

Użyjemy Qualify! Qualify w Databricks filtruje wyniki zapytania funkcji okienkowej. Możesz myśleć o nim jak warunku zakładanym na wyniku funkcji okienkowej. Upraszcza to znakomicie składnie.

Jak go zastosować:

select process_id, process_name, process_start_date, process_end_date
, row_number() over(partition by process_id order by process_end_date desc) as rn
from d_process
qualify rn = 1

Read More

Python i Synapse: połączenie przy użyciu service principala

Problem:

Masz bazę danych Synapse dostępną w usłudze Azure. Posiadasz uprawnienia, żeby połączyć się do tej bazy przy użyciu service principala. To połączenie musisz wykonać w Pythonie. W jaki sposób połączyć się posiadając dane service principala do usługi Synapse?

Rozwiązanie:

Użyjemy .env do przechowywania zmienny środowiskowych. Użyjemy azure.identity żeby połączyć się z usługą i wygenerować token dostępowy. Następnie posiadając token utworzymy połączenie pyodbc.

Zawiłe? Teraz trochę bardziej szczegółowo.

Read More