databricks user table columns

Databricks - User Table Columns?

Przenosisz się ze świata baz danych do świata Databricksów?

Zmieniono nazwę Twojego stanowiska na Data Engineer i będziesz teraz pracował w Databricks?

W świecie baz danych przyzwyczaiłeś się zapewne do prostego i intuicyjnego dostępu do metadanych. Możliwe, że używałeś ich, żeby zautomatyzować pracę lub wykryć w prosty sposób, gdzie trzeba przeprowadzić zmianę.

Świat Databricks i Hive metastore to trochę inne doświadczenia.

Mówiąc łagodnie.

Patrząc co jest tam dostępne brakuje mi czegoś podobnego do Oraclowego all_tab_columns (user_tab_columns) albo SQL Serverowego Information_Schema.Columns.

Zróbmy więc go sami wykorzystując możliwości jakie daje Databricks.

Ustawianie parametrów

Skrypt ma być łatwo uruchamiany na różnych schematach. Schemat źródłowy może być inny niż schemat docelowy. W jednym schemacie możemy mieć tylko prawo do odczytu a w drugim możemy zapisywać.

Nazwa tabeli też będzie zależała od osobistych preferencji.

Do przechowywania parametrów użyjemy widgetów, tak będzie wyglądał ich definicja:

dbutils.widgets.text("source_schema","ds_goblet")
dbutils.widgets.text("target_schema","nextlevel_dm")
dbutils.widgets.text("target_table","user_tab_columns")

Definiujemy schemat źródłowy, docelowy i nazwę tabeli w schemacie docelowym.

Potem należy odczytać wartość widgetów, żeby można było z nich korzystać w kodzie:

w_source_schema = dbutils.widgets.get("source_schema")
w_target_schema = dbutils.widgets.get("target_schema")
w_user_tab_columns = dbutils.widgets.get("target_table")

Stworzymy też sztuczną tabelę tymczasową, która przyda się do definiowania struktury user_tab_columns.

spark.sql(f"create or replace table {w_target_schema}.dual select 1 as id ")

Definiowanie struktury

Co powinno się znaleźć w docelowej tabeli? O jakich metadanych będziemy zbierać informacje? Co jest takim wymaganym i pożądanym elementem tabeli z metadanymi?

Schemat - schemat źródłowej tabeli
Table_name - nazwa tabeli do której należy kolumna
Column_name - nazwa kolumny
Data_type - type danych
Ordinal_position - informacja, na której pozycji w tabeli znajduje się kolumna
Comment - komentarze o danej kolumnie, jeżeli zostały wstawione
Latest_metadata_refresh_date - data ostatniego odświeżenia tabeli.

user_tab_columns = spark.sql(f"select 's' as schema, 'a' as table_name, 'b' as column_name, 'c' as data_type, 'd' as ordinal_position, 'e' as comment, current_timestamp() as latest_metadata_refresh_date from {w_target_schema}.dual where 1 = 2")

Budowanie tabeli

Struktura jest gotowa, teraz pora na wybranie ze schematu określonego w parametrach listy tabel, które będziemy wstawiać do tabeli z metadanymi.

Może się zdarzyć tak, że schemat źródłowy jest taki sam jak docelowy, zakładam, że nie chciałbyś widzieć w tabeli user_tab_columns:

  • Tymczasowych widoków
  • Metadanych tabeli user_tab_columns
tables = spark.sql(f"show tables in {w_source_schema}")
tables = tables.filter(f"istemporary = False and tableName <> '{w_user_tab_columns}'")

Teraz dla każdej tabeli z listy tabel musimy odczytać jakie są kolumny i wynik dopasować do schematu tabeli user_tab_columns. Powtarzamy to kolejno dla każdej z tabel odczytanych ze schematu.

for row in tables.collect():    
    df_table = spark.sql(f"select '{w_source_schema}' as schema, '{row.tableName}' as table_name" )
    df_table.createOrReplaceTempView(f"{w_target_schema}.table_schema")

    table_columns = spark.sql(f"describe table {w_source_schema}.{row.tableName}") # table describe           
    table_columns.createOrReplaceTempView(f"{w_target_schema}.table_columns")

    one_table_data = spark.sql("""select ts.schema, ts.table_name, lower(tc.col_name) as column_name, tc.data_type, row_number() over(order by (select null)) as ordinal_position, tc.comment, current_timestamp() as latest_metadata_refresh_date from table_columns tc
                  inner join table_schema ts on (1 = 1) """)

    user_tab_columns = user_tab_columns.unionAll(one_table_data)

Potem zostało już tylko zapisać dane do tabeli z metadanymi:

user_tab_columns.write.mode("overwrite").saveAsTable(f"{w_target_schema}.{w_user_tab_columns}")

Jak widzisz tutaj jest użyta opcja overwrite czyli na tabeli (jeżeli istaniała wcześniej) wykonany będzie truncate i insert danych.

Databricks - user table columns - podsumowanie

Oczywiście Databricks posiada Unity Catalog i jeżeli masz szczęście w Twojej organizacji go używają. Daje on dostęp do metadanych do jakiego jesteś przyzwyczajony.

Jeżeli korzystasz z Hive metastore to rozwiązanie dla Ciebie. Taka tabela to duże uproszczenie i ułatwienie pracy. Znowu metadane są w formacie, do jakiego jestem przyzwyczajony.

Wadą tego rozwiązania jest to, że tabelę trzeba odbudowywać.

Czyli po każdej zmianie metadanych, w schemacie który analizowaliśmy trzeba tabelę odbudować czyli uruchomić skrypt ponownie.

Widzisz jakieś możliwości usprawnienia skryptu? Jakieś wady? Chcesz, żebym coś dopisał?

Daj znać w komentarzu 🙂

Skrypt dostępny na github:
https://github.com/rgogloza/nextlevelbi/blob/master/databricks/user_tab_columns.ipynb