---
title: "Loading schema v1 assets into DBT"
subtitle: "WORK IN PROGRESS - placeholder deploy; will polish in release 2.1"
author:
  - name: Ran Li (Maintainer)
    orcid: 0000-0002-4699-4755
  - name: Jessica Uruchima
date: last-modified
format:
  html:
    page-layout: full
    toc: true
    self-contained: true
    code-fold: true
    df-print: kable
    code-tools: true
    comments:
      hypothesis:
        theme: clean
editor: source
execute:
  warning: false
  message: false
  eval: true
editor_options:
  chunk_output_type: console
---
```{r}
#| code-summary: 'Notebook Setup'
#| code-fold: true
#| eval: true
{ ## Global Setup (For any notebook)
  #### Configure relative path (Change!)
  path_global_setup_function = '../../../R/setup/global_setup.R'
  #### Generic global setup code (Do not change!)
  source(path_global_setup_function)
  setup = global_setup(path_global_setup_function)
  invisible(list2env(setup$sourced_objects, envir = .GlobalEnv))
  global_context = setup$global_context
}

{ ## Local Setup (For any notebook)
  ## Cache Paths
  local_context_object = lst(
    cache_folder = file.path(
      global_context$path_server_etl_cache,
      '_salurbal_cube_v1'),
    cache_path_precensor_metadata_v1 = file.path(
      cache_folder, 'metadata_v1.parquet'),
    cache_path_precensor_metadata_v2 = file.path(
      cache_folder, 'metadata_v2.parquet'),
    cache_path_precensor_metadata = file.path(
      cache_folder, 'precensor_metadata.parquet'),
    cache_path_precensor_data_v1 = file.path(
      cache_folder, 'data_v1.parquet'),
    cache_path_precensor_data_v2 = file.path(
      cache_folder, 'data_v2.parquet'),
    cache_path_precensor_data = file.path(
      cache_folder, 'precensor_data.parquet'),
    cache_path_censored_metadata_cube = file.path(
      cache_folder, 'censored_metadata_cube.parquet'),
    cache_path_censored_data_cube = file.path(
      cache_folder, 'censored_data_cube.parquet'),
    cache_path_validated_metadata_cube = file.path(
      cache_folder, 'validated_metadata_cube.parquet'),
    cache_path_validated_data_cube = file.path(
      cache_folder, 'validated_data_cube.parquet'),
    path_cache_utility_1 = file.path(
      cache_folder, 'utility_cache_1.parquet'),
    path_cache_utility_2 = file.path(
      cache_folder, 'utility_cache_2.parquet')
  )
  local_context = c(local_context_object, global_context)
}
```
We still have some assets in schema v1. During schema v1 development we did not use DBT as a data warehouse but rather a DIY script, so the renovations did not load assets into DBT. We can still use these assets in the data warehouse; we just need to load them manually, as opposed to schema v2 assets, which are loaded into the data warehouse directly after validation.
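To make that concrete, here is a minimal sketch of the manual-load pattern the rest of this notebook follows. The file names are illustrative placeholders, not real assets:

```{r}
#| eval: false
## Sketch of the manual load for one hypothetical v1 asset: read the
## legacy parquet, tag it with its schema version, and write it where
## the DBT sources tree expects a seed.
library(arrow)
library(dplyr)

read_parquet('some_v1_asset_internal.parquet') %>%
  mutate(schema_version = 'v1') %>%
  write_parquet('sources/_seeds/some_v1_asset.parquet')
```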
# Inventory
The schema v1 legacy cubes are stored in `salurbal-dbt-server\sources\schema_v1_deprecating`.
```{r}
#| eval: true
## Inventory available v1 dataset instance cubes
df_inventory_schema_v1 = file.path(
  global_context$path_server_dbt_sources,
  'schema_v1_deprecating') %>%
  list.files(
    pattern = 'internal.parquet',
    full.names = TRUE
  ) %>%
  map_df(~ {
    path = .x; file = basename(path)
    tibble(
      dataset_id = file %>%
        str_remove_all('_internal.parquet') %>%
        str_remove_all('_v\\d\\.\\d'),
      dataset_version = str_extract(file, 'v\\d\\.\\d'),
      dataset_instance = paste0(dataset_id, '_', dataset_version),
      schema_version = 'v1',
      observation_type = 'area-level',
      path_dbt_source_data = path,
      path_dbt_source_metadata = path %>%
        str_replace('_internal.parquet', '_metadata.parquet')
    )
  }) %>%
  arrange(dataset_id, dataset_version, schema_version)

## Preview
df_inventory_schema_v1 %>%
  salurbal_reactable()
```
Okay, this looks fine. As expected, we are missing some renovation context information that was not explicitly stored during the v1 renovations.
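One way to see the gap concretely is to diff the expected columns against what one v1 metadata file actually carries. This is a sketch, assuming `vec__v1_v2_schema_columns_metadata` (sourced in the global setup) is a character vector of the shared v1/v2 metadata columns:

```{r}
#| eval: false
## Which shared v1/v2 metadata columns are absent from a v1 metadata file?
## The slice(1) asset is just an example; any row of the inventory works.
example_path = df_inventory_schema_v1 %>%
  slice(1) %>%
  pull(path_dbt_source_metadata)

setdiff(
  vec__v1_v2_schema_columns_metadata,
  names(arrow::read_parquet(example_path))
)
```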
# Load
## Metadata
For this particular release we want to harmonize schema v2 and v1 so that v1 schema assets are interoperable with v2; this just means appending some composite keys not present in schema v1.
Here we apply these backwards-compatibility updates and compile all of our schema v1 metadata assets into a single parquet file, which is ingested via `cte__schema_v1_metadata_cube`.
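As a toy illustration (made-up rows and variable names; the appended columns mirror the `mutate()` in the chunk below):

```{r}
#| eval: false
## Hypothetical v1 metadata rows gaining the v2 composite keys.
library(dplyr)

tibble(
  dataset_id      = c('DATASET_A', 'DATASET_A'),
  dataset_version = c('v1.0', 'v1.0'),
  var_name        = c('VAR_X', 'VAR_Y')
) %>%
  mutate(
    dataset_instance = paste0(dataset_id, '_', dataset_version),
    schema_version   = 'v1',
    observation_type = 'area-level'
  )
```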
```{r}
#| code-summary: 'Metadata v1 standards'
path_source_metadata_v1 = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/Wellcome_Trust/Data Methods Core/Dashboards/dbt/salurbal-dbt-server/sources/_seeds/metadata_v1.parquet"
if (!file.exists(path_source_metadata_v1)) {
  ## Import + Clean + Validate
  df_cube_metadata_precensor_raw_v1 = df_production_inventory %>%
    filter(schema_version == 'v1') %>%
    group_by(row_number()) %>%
    group_map(~ {
      ## Import + Clean + Validate
      ## .x = df_production_inventory %>% filter(schema_version == 'v1') %>% slice(1)
      arrow::read_parquet(.x$path_dbt_source_metadata) %>%
        mutate(
          dataset_instance = paste0(.x$dataset_id, '_', .x$dataset_version),
          version = .x$dataset_version,
          dataset_version = .x$dataset_version,
          schema_version = .x$schema_version,
          observation_type = 'area-level',
          day = ifelse(is.na(day), '', day),
          month = ifelse(is.na(month), '', month)
        ) %>%
        mutate(
          strata_id = ifelse(is.na(strata_id) | strata_id == 'NA',
                             '', strata_id) %>%
            str_remove_all("_NA_NA$"),
          strata_description = ifelse(
            is.na(strata_description) | strata_description == 'NA',
            '', strata_description)) %>%
        select(-any_of(c('n_salid_data_points'))) %>%
        verify(valid_non_missing_value(.,
                                       list(vec__v1_v2_schema_columns_metadata),
                                       local_context)) %>%
        verify(composite_key_uniqueness(., local_context))
    }) %>%
    bind_rows()

  ## Intermediate table
  xwalk_var_subdomain_v1 = df_cube_metadata_precensor_raw_v1 %>%
    select(var_name, subdomain) %>%
    distinct() %>%
    rowwise() %>%
    mutate(
      list_subdomain = str_split(subdomain, ';') %>%
        map(~ str_trim(.x)),
      n_subdomain = length(list_subdomain)
    ) %>%
    ungroup() %>%
    arrange(desc(n_subdomain))

  ## Joins + Validation
  df_cube_metadata_precensor_v1 = df_cube_metadata_precensor_raw_v1 %>%
    left_join(xwalk_var_subdomain_v1) %>%
    verify(full_referential_integrity(., df_cube_metadata_precensor_raw_v1,
                                      local_context)) %>%
    verify(valid_non_missing_value(.,
                                   list(vec__v1_v2_schema_columns_metadata),
                                   local_context)) %>%
    verify(composite_key_uniqueness(., local_context)) %>%
    verify(valid_strata_id(.)) %>%
    verify(columns_must_not_have_NA_NULL_cells(., list(vec__v1_v2_composite_keys_metadata)))

  ## Export
  df_cube_metadata_precensor_v1 %>%
    arrow::write_parquet(path_source_metadata_v1)
  cli_alert_success('datawarehouse v1 df_cube_metadata_precensor generated!!')
}
```
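Before relying on the DBT side, a quick read-back of the compiled seed can be reassuring. This is a sketch, not part of the pipeline:

```{r}
#| eval: false
## Read back the compiled v1 metadata seed and confirm the appended
## composite keys are populated for every dataset instance.
arrow::read_parquet(path_source_metadata_v1) %>%
  count(dataset_instance, dataset_version, schema_version) %>%
  salurbal_reactable()
```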
## Data
```{r}
#| code-summary: 'Data v1 standards'
path_source_data_v1 = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/Wellcome_Trust/Data Methods Core/Dashboards/dbt/salurbal-dbt-server/sources/_seeds/data_v1.parquet"
if (!file.exists(path_source_data_v1)) {
  ## Compile and ad-hoc transformations
  df_cube_data_precensor_v1 = df_production_inventory %>%
    filter(schema_version == 'v1') %>%
    group_by(row_number()) %>%
    group_map(~ {
      ## Setup
      dataset_instance_tmp = .x$dataset_instance
      cli_alert('Processing - {dataset_instance_tmp}')
      ## Clean
      ds = arrow::open_dataset(.x$path_dbt_source_data)
      result = ds %>%
        # filter(geo == 'L1AD') %>%
        # distinct() %>%
        select(-any_of('public')) %>%
        collect() %>%
        mutate(
          schema_version = 'v1',
          version = .x$dataset_version,
          dataset_version = .x$dataset_version,
          dataset_instance = dataset_instance_tmp,
          observation_type = 'area-level',
          observation_id = salid,
          ## NA strata collapse to '' and legacy '_NA_NA' suffixes are dropped
          strata_id = ifelse(is.na(strata_id), '', strata_id) %>%
            str_remove_all("_NA_NA$")) %>%
        filter(!is.na(salid))
      ## Test
      validated_result = result %>%
        verify(valid_non_missing_value(.,
                                       list(vec__v1_v2_schema_columns),
                                       local_context)) %>%
        verify(composite_key_uniqueness(., local_context))
      ## Return
      cli_alert_success("Finished processing - {dataset_instance_tmp}")
      return(validated_result)
    }) %>%
    bind_rows() %>%
    mutate(
      day = ifelse(is.na(day), '', day),
      month = ifelse(is.na(month), '', month))

  ## Validate + Export
  df_cube_data_precensor_v1 %>%
    verify(columns_must_not_have_NA_NULL_cells(., list(vec__v1_v2_composite_keys))) %>%
    verify(valid_non_missing_value(., list(vec__v1_v2_schema_columns), local_context)) %>%
    # verify(composite_key_uniqueness(., local_context)) %>%
    arrow::write_parquet(path_source_data_v1)
}
```
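Finally, a small sketch to confirm both v1 seed files exist before the DBT models try to ingest them:

```{r}
#| eval: false
## Both seed files must be present in sources/_seeds before ingestion.
purrr::walk(
  c(path_source_metadata_v1, path_source_data_v1),
  ~ if (file.exists(.x)) {
      cli::cli_alert_success('Seed present: {basename(.x)}')
    } else {
      cli::cli_alert_danger('Seed missing: {basename(.x)}')
    }
)
```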