Databricks schema evolution.
Problem: System źródłowy zmienia się dynamicznie, często pojawiają się nowe kolumny. Mamy dostosować się do zmian i nowe kolumny w źródle, mają pojawić się automatycznie w bronze layer.
Rozwiązanie: Użyj schema evolution, dzięki temu nowe kolumny będą w sposób automatyczny dodawane do Lake House. Włączymy tą funkcjonalność na poziomie clustra, żeby ułatwić obłsugę schema evolution w SQL'u.
Databricks schema evolution: konfiguracja
Właczenie tej funkcjonalności na poziomie klustra albo sessji możliwe jest przy użyciu:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","True")
Natomiast w ten sposób, możesz sprawdzić, czy schema evolution jest ustawione globalnie:
spark.conf.get("spark.databricks.delta.schema.autoMerge.enabled")
Databricks schema evolution: implementacja
Dodanie nowych kolumn możesz zrobić przy pomocy * (gwiazda), jeżeli nie wiesz jakie będą kolumny albo nie możesz tego łatwo określić:
merge into ${schema_name}.tmp_ds_product target
using ${schema_name}.tmp_stg_product source
on target.product_id = source.product_id
when matched then update set *
when not matched then insert *
albo jeżeli znasz nazwy kolumn to polecam ten sposób, jest dużo bardziej czytelny:
merge into ${schema_name}.tmp_ds_product target
using ${schema_name}.tmp_stg_product source
on target.product_id = source.product_id
when matched then update set product_name = source.product_name, target.product_status = source.product_status, target.brand = source.brand, target.sales_representative = source.sales_representative, made_in = source.made_in
when not matched then insert (product_id, product_name, product_status, brand, sales_representative, made_in) values (product_id, product_name, product_status, brand, sales_representative, made_in)
Databricks schema evolution: kiedy nie zadziała
Gdy napiszesz powyższe query w inny sposób, wtedy schema evolution nie zadziała, nie może znaleźć kolumny w tabeli docelowaj i jej automatycznie nie tworzy:
-- this will not work, we need to remove alias from target column name
merge into ${schema_name}.tmp_ds_product target
using ${schema_name}.tmp_stg_product source
on target.product_id = source.product_id
when matched then update set product_name = source.product_name, target.product_status = source.product_status, target.brand = source.brand, target.sales_representative = source.sales_representative, target.made_in = source.made_in
when not matched then insert (product_id, product_name, product_status, brand, sales_representative, made_in) values (product_id, product_name, product_status, brand, sales_representative, made_in)
Nową kolumną w tym przykładzie jest made_in, dodawanie prefiksu tabeli docelowej target.made_in, sprawi, że dane nie zostaną wstawione i wykonanie zapytania skończy się błędem:
DeltaAnalysisException: Cannot resolve target.made_in in UPDATE clause given columns source.product_id, source.product_name, source.product_status, source.brand, source.sales_representative, source.made_in.; line 2 pos 0
Usunięcie aliasu sprawi, że zapytanie będzie działać.