Databricks schema evolution.

Problem: System źródłowy zmienia się dynamicznie, często pojawiają się nowe kolumny. Mamy dostosować się do zmian i nowe kolumny w źródle, mają pojawić się automatycznie w bronze layer.

Rozwiązanie: Użyj schema evolution, dzięki temu nowe kolumny będą w sposób automatyczny dodawane do Lake House. Włączymy tą funkcjonalność na poziomie clustra, żeby ułatwić obłsugę schema evolution w SQL'u.

Databricks schema evolution: konfiguracja

Właczenie tej funkcjonalności na poziomie klustra albo sessji możliwe jest przy użyciu:

spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","True")

Natomiast w ten sposób, możesz sprawdzić, czy schema evolution jest ustawione globalnie:

spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled")

Databricks schema evolution: implementacja

Dodanie nowych kolumn możesz zrobić przy pomocy * (gwiazda), jeżeli nie wiesz jakie będą kolumny albo nie możesz tego łatwo określić:

merge into ${schema_name}.tmp_ds_product target
using ${schema_name}.tmp_stg_product source 
on target.product_id = source.product_id
when matched then update set *
when not matched then insert *

albo jeżeli znasz nazwy kolumn to polecam ten sposób, jest dużo bardziej czytelny:

merge into ${schema_name}.tmp_ds_product target
using ${schema_name}.tmp_stg_product source 
on target.product_id = source.product_id
when matched then update set product_name = source.product_name, target.product_status = source.product_status, target.brand = source.brand, target.sales_representative = source.sales_representative, made_in = source.made_in
when not matched then insert (product_id, product_name, product_status, brand, sales_representative, made_in) values (product_id, product_name, product_status, brand, sales_representative, made_in)

Databricks schema evolution: kiedy nie zadziała

Gdy napiszesz powyższe query w inny sposób, wtedy schema evolution nie zadziała, nie może znaleźć kolumny w tabeli docelowaj i jej automatycznie nie tworzy:

-- this will not work, we need to remove alias from target column name
merge into ${schema_name}.tmp_ds_product target
using ${schema_name}.tmp_stg_product source 
on target.product_id = source.product_id
when matched then update set product_name = source.product_name, target.product_status = source.product_status, target.brand = source.brand, target.sales_representative = source.sales_representative, target.made_in = source.made_in
when not matched then insert (product_id, product_name, product_status, brand, sales_representative, made_in) values (product_id, product_name, product_status, brand, sales_representative, made_in)

Nową kolumną w tym przykładzie jest made_in, dodawanie prefiksu tabeli docelowej target.made_in, sprawi, że dane nie zostaną wstawione i wykonanie zapytania skończy się błędem:

DeltaAnalysisException: Cannot resolve target.made_in in UPDATE clause given columns source.product_id, source.product_name, source.product_status, source.brand, source.sales_representative, source.made_in.; line 2 pos 0

Usunięcie aliasu sprawi, że zapytanie będzie działać.