Loading schema v1 assets into DBT

WOKR IN PROGRESS- placeholder dpeloy; will polish in release 2.1

Authors

Ran Li (Maintainer)

Jessica Uruchima

Published

October 5, 2024

Notebook Setup
{ ## 'Global Setup (For any notebook)'
  
  #### Configure relative path (Change!)
  path_global_setup_function = '../../../R/setup/global_setup.R'
  
  #### Generic global setup code (Do not change!)
  source(path_global_setup_function)
  setup = global_setup(path_global_setup_function)
  invisible(list2env(setup$sourced_objects, envir = .GlobalEnv))
  global_context = setup$global_context
}


{ ## Local Setup (For any notebook)
  
  ## Cache Paths
  local_context_object = lst(
    cache_folder = file.path(
      global_context$path_server_etl_cache, 
      '_salurbal_cube_v1'),
    cache_path_precensor_metadata_v1 = file.path(
      cache_folder, 'metadata_v1.parquet' ),
    cache_path_precensor_metadata_v2 = file.path(
      cache_folder, 'metadata_v2.parquet' ),
    cache_path_precensor_metadata = file.path(
      cache_folder, 'precensor_metadata.parquet' ),
    cache_path_precensor_data_v1 = file.path(
      cache_folder, 'data_v1.parquet' ),
    cache_path_precensor_data_v2 = file.path(
      cache_folder, 'data_v2.parquet' ),
    cache_path_precensor_data = file.path(
      cache_folder, 'precensor_data.parquet' ),
    cache_path_censored_metadata_cube = file.path(
      cache_folder, 'censored_metadata_cube.parquet' ),
    cache_path_censored_data_cube = file.path(
      cache_folder, 'censored_data_cube.parquet' ),
    cache_path_validated_metadata_cube = file.path(
      cache_folder, 'validated_metadata_cube.parquet' ),
    cache_path_validated_data_cube = file.path(
      cache_folder, 'validated_data_cube.parquet' ),
    path_cache_utility_1 = file.path(
      cache_folder, 'utility_cache_1.parquet'),
    path_cache_utility_2 = file.path(
      cache_folder, 'utility_cache_2.parquet')
  )

  local_context = c(local_context_object, global_context)

}

We have some assets still in schema v1. During schema v1 developement we did not use DBT as a datawarehouse but rahter use a DYI script, so the renovations did not load assets into DBT. We can still utilize them int he datawarehouse but we just need to load them manually - as opposed to schema v2 assets which are loaded directly into datawarehouse after validation.

Inventory

The schema v1 Legacy cubes are stored in salurbal-dbt-server\sources\schema_v1_deprecating

Code
## Inventory available v1 dataset instance cubes
df_inventory_schema_v1 = file.path(
  global_context$path_server_dbt_sources,
  'schema_v1_deprecating') %>%
  list.files(
    pattern = 'internal.parquet',
    full.names  = T
  ) %>% 
  map_df(~{
    path = .x; file = basename(path)
    tibble(
      dataset_id = file %>% 
        str_remove_all('_internal.parquet') %>% 
        str_remove_all('_v\\d\\.\\d') ,
      dataset_version = str_extract(file, 'v\\d\\.\\d'),
      dataset_instance = paste0(dataset_id, '_', dataset_version),
      schema_version = 'v1'  ,
      observation_type = 'area-level',
      path_dbt_source_data = path ,
      path_dbt_source_metadata = path %>% 
        str_replace('_internal.parquet', '_metadata.parquet')
    ) 
  }) %>% 
  arrange(dataset_id, dataset_version, schema_version) 

## Preview
df_inventory_schema_v1 %>% 
  salurbal_reactable()

Okay. Looks okay - of course as expected, we are msising some renovation cotnext ifnormation which was not explcitiy stored during v1 renoations.

Load

Metadata

For this particular release we want to harmonize schema v2 and v1. So that v1 schema assets can be interoperable with v2; this just means appending some additional composite keys not present in schema v1.

Here we do this backwards compatibility updates and compile all our schema v1 metadta assets into a single parquet file that is ingested via cte__schema_v1_metadata_cube

Metadata v1 standards
path_source_metadata_v1 = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/Wellcome_Trust/Data Methods Core/Dashboards/dbt/salurbal-dbt-server/sources/_seeds/metadata_v1.parquet"


if (!file.exists(path_source_metadata_v1)) {

    ## Import + Clean + Validate
    df_cube_metadata_precensor_raw_v1 =  df_production_inventory %>% 
      filter(schema_version == 'v1') %>%
      group_by(row_number()) %>% 
      group_map( ~ {
        ## Import + Clean + Validate
        ##   .x =  df_production_inventory %>% filter(schema_version == 'v1')  %>% slice(1)
        arrow::read_parquet(.x$path_dbt_source_metadata) %>%
          mutate(
            dataset_instance =  paste0(.x$dataset_id, '_', .x$dataset_version),
            version = .x$dataset_version,
            dataset_version = .x$dataset_version,
            schema_version = .x$schema_version,
            observation_type = 'area-level',
            day = ifelse(is.na(day), '', day),
            month = ifelse(is.na(month), '', month)
          ) %>% 
          mutate(
            strata_id = ifelse(is.na(strata_id) | strata_id=='NA', 
                               '', strata_id) %>% 
              str_remove_all("_NA_NA$") ,
            strata_description = ifelse(
              is.na(strata_description) | strata_description=='NA', 
              '', strata_description)) %>% 
          select(-any_of(c('n_salid_data_points'))) %>% 
          verify(valid_non_missing_value(., 
                                         list(vec__v1_v2_schema_columns_metadata),
                                         local_context)) %>%
          verify(composite_key_uniqueness(., local_context))
      }) %>% 
      bind_rows()
  
    
  ## Intermediate table
  xwalk_var_subdomain_v1 = df_cube_metadata_precensor_raw_v1 %>%
    select(var_name, subdomain) %>%
    distinct() %>%
    rowwise() %>%
    mutate(
      list_subdomain = str_split(subdomain, ';') %>%
        map( ~ str_trim(.x)),
      n_subdomain = length(list_subdomain),
    ) %>%
    ungroup() %>%
    arrange(desc(n_subdomain))
  
  ## Joins + Validation
  df_cube_metadata_precensor_v1 = df_cube_metadata_precensor_raw_v1 %>%
    left_join(xwalk_var_subdomain_v1) %>% 
    verify(full_referential_integrity(., df_cube_metadata_precensor_raw_v1,
                                      local_context)) %>% 
    verify(valid_non_missing_value(., 
                                   list(vec__v1_v2_schema_columns_metadata),
                                   local_context)) %>%
    verify(composite_key_uniqueness(., local_context)) %>% 
    verify(valid_strata_id(.))  %>% 
    verify(columns_must_not_have_NA_NULL_cells(., list(vec__v1_v2_composite_keys_metadata))) 
  
  ## Export
  df_cube_metadata_precensor_v1  %>%
    arrow::write_parquet(path_source_metadata_v1)
  
  cli_alert_success('datawarehouse v1 df_cube_metadata_precensor generated!!')
}  

Data

Code
path_source_data_v1 = "//files.drexel.edu/colleges/SOPH/Shared/UHC/Projects/Wellcome_Trust/Data Methods Core/Dashboards/dbt/salurbal-dbt-server/sources/_seeds/data_v1.parquet"


if (!file.exists(path_source_data_v1)) {
  
  ## Compile and ad-hoc transformations
  df_cube_data_precensor_v1 = df_production_inventory   %>% 
    filter(schema_version == 'v1') %>% 
    group_by(row_number()) %>% 
    group_map(~{
      
      ## Setup
      dataset_instance_tmp = .x$dataset_instance
      cli_alert('Processing - {dataset_instance_tmp}')
      
      ## Clean
      ds = arrow::open_dataset(.x$path_dbt_source_data) 
      result =  ds  %>%
        # filter(geo == 'L1AD') %>%
        # distinct() %>%
        select(-any_of('public')) %>%
        collect() %>%
        mutate(
          schema_version = 'v1',
          version = .x$dataset_version,
          dataset_version = .x$dataset_version,
          dataset_instance = dataset_instance_tmp,
          observation_type = 'area-level',
          observation_id = salid,
          strata_id = ifelse(is.na(strata_id),'', strata_id),
          strata_id = ifelse( is.na(strata_id) | strata_id == '',
                              '', strata_id) %>% 
            str_remove_all("_NA_NA$") ) %>% 
        filter(!is.na(salid))
      
      ## Test
      validated_result = result %>% 
        verify(valid_non_missing_value(., 
                                       list(vec__v1_v2_schema_columns), 
                                       local_context)) %>%
        verify(composite_key_uniqueness(., local_context))
      
      ## Return
      cli_alert_success("Finished processing - {dataset_instance_tmp}")
      return(validated_result)
      
    }) %>% 
    bind_rows() %>% 
    mutate(   
      day = ifelse(is.na(day), '', day),
      month = ifelse(is.na(month), '', month)) 
  
  
  
  ## Validated + Export
  df_cube_data_precensor_v1 %>% 
    verify(columns_must_not_have_NA_NULL_cells(., list(vec__v1_v2_composite_keys))) %>%
    verify(valid_non_missing_value(., list(vec__v1_v2_schema_columns), local_context)) %>%
    # verify(composite_key_uniqueness(., local_context)) %>% 
    arrow::write_parquet(path_source_data_v1)
  
}