Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions code/snt_utils.r
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,114 @@ install_and_load <- function(packages) {
print(loaded_packages)
}

# Read the SNT configuration JSON from `config_path`, stopping with a
# consistent "[ERROR] ..." message when the file is missing or malformed.
load_snt_config <- function(config_path, config_file_name = "SNT_config.json") {
  target_file <- file.path(config_path, config_file_name)
  tryCatch(
    jsonlite::fromJSON(target_file),
    error = function(e) {
      stop(paste0("[ERROR] Error while loading configuration ", conditionMessage(e)))
    }
  )
}

# Assert that `section` exists in `config_json` and contains every key in
# `keys`; stops with an "[ERROR] ..." message otherwise.
# Returns TRUE invisibly so it can be used in pipelines without printing.
validate_required_config_keys <- function(config_json, keys, section = "SNT_CONFIG") {
  section_content <- config_json[[section]]

  if (is.null(section_content)) {
    stop(paste0("[ERROR] Missing configuration section: ", section))
  }

  absent <- keys[!(keys %in% names(section_content))]
  if (length(absent) > 0) {
    stop(paste0("[ERROR] Missing configuration input(s): ", paste(absent, collapse = ", ")))
  }

  invisible(TRUE)
}

# Fetch the country-specific file `<country_code><suffix>` from `dataset_id`
# via get_latest_dataset_file_in_memory(), wrapping failures in a descriptive
# "[ERROR] ..." message. `label` overrides the file name in that message.
# Logs a success line and returns the loaded object.
load_country_file_from_dataset <- function(dataset_id, country_code, suffix, label = NULL) {
  file_name <- paste0(country_code, suffix)

  output_data <- tryCatch(
    get_latest_dataset_file_in_memory(dataset_id, file_name),
    error = function(e) {
      # Prefer the human-readable label in the error when one was supplied.
      target_label <- if (is.null(label)) file_name else label
      stop(paste0(
        "[ERROR] Error while loading ", target_label,
        " (dataset: ", dataset_id,
        ", file: ", file_name,
        "): ", conditionMessage(e)
      ))
    }
  )

  log_msg(paste0("Loaded file `", file_name, "` from dataset `", dataset_id, "`."))
  output_data
}

# Ensure the YEAR and MONTH columns (when present) are stored as integers.
#
# input_df:  data.frame whose columns should be normalized.
# year_col:  name of the year column (default "YEAR").
# month_col: name of the month column (default "MONTH").
# Returns a copy of `input_df` with the matching columns cast to integer;
# columns that are absent are left untouched.
normalize_year_month_types <- function(input_df, year_col = "YEAR", month_col = "MONTH") {
  output_df <- input_df
  for (col in c(year_col, month_col)) {
    if (col %in% names(output_df)) {
      value <- output_df[[col]]
      # as.integer() on a factor returns the internal level codes, not the
      # labels (e.g. factor("2020") -> 1L) — go through character first.
      if (is.factor(value)) {
        value <- as.character(value)
      }
      output_df[[col]] <- as.integer(value)
    }
  }
  output_df
}

# Standard routine preparation: keep the fixed + indicator columns, pivot the
# indicators into long (INDICATOR/VALUE) format, and optionally drop duplicate
# rows on the standard identifier keys. Stops if any indicator column is
# missing from `routine_df`.
prepare_routine_long <- function(routine_df, fixed_cols, indicators, deduplicate = TRUE) {
  absent_indicators <- setdiff(indicators, names(routine_df))
  if (length(absent_indicators) > 0) {
    stop(paste0("[ERROR] Missing indicator column(s): ", paste(absent_indicators, collapse = ", ")))
  }

  # Fixed columns that are absent are silently dropped; only indicators are mandatory.
  selected_cols <- intersect(c(fixed_cols, indicators), names(routine_df))

  long_df <- routine_df %>%
    dplyr::select(dplyr::all_of(selected_cols)) %>%
    tidyr::pivot_longer(
      cols = dplyr::all_of(indicators),
      names_to = "INDICATOR",
      values_to = "VALUE"
    )

  if (deduplicate) {
    # Deduplicate on whichever of the standard keys are actually present;
    # .keep_all retains the first row (and its VALUE) per key combination.
    key_candidates <- c("ADM1_ID", "ADM2_ID", "OU_ID", "PERIOD", "YEAR", "MONTH", "INDICATOR")
    dedup_keys <- intersect(key_candidates, names(long_df))
    long_df <- long_df %>%
      dplyr::distinct(dplyr::across(dplyr::all_of(dedup_keys)), .keep_all = TRUE)
  }

  long_df
}

# Compose `<data_root_path>/<domain>[/<subdomain>]` and (by default) create
# the directory if it does not exist yet. Returns the composed path.
standard_output_path <- function(data_root_path, domain, subdomain = NULL, create_dir = TRUE) {
  has_subdomain <- !is.null(subdomain) && nchar(subdomain) > 0

  target_path <- if (has_subdomain) {
    file.path(data_root_path, domain, subdomain)
  } else {
    file.path(data_root_path, domain)
  }

  if (create_dir && !dir.exists(target_path)) {
    dir.create(target_path, recursive = TRUE, showWarnings = FALSE)
  }

  target_path
}

# Helper to safely extract values from parameters (allows specifying the cast type)
get_param <- function(params_list, target_param, default, cast_method = identity) {
  #' Safely retrieve a parameter if it exists in the input, falling back to a default if it does not exist in the input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,16 @@
"source": [
"# Project folders (ROOT_PATH injected by pipeline if not set)\n",
"if (!exists(\"ROOT_PATH\")) ROOT_PATH <- \"~/workspace\"\n",
"CODE_PATH <- file.path(ROOT_PATH, 'code') \n",
"CONFIG_PATH <- file.path(ROOT_PATH, 'configuration')\n",
"DATA_PATH <- file.path(ROOT_PATH, 'data')\n",
"PIPELINE_PATH <- file.path(ROOT_PATH, \"pipelines\", \"snt_dhis2_outliers_imputation_iqr\")\n",
"\n",
"# Load utils\n",
"source(file.path(CODE_PATH, \"snt_utils.r\"))\n",
"# Shared helpers for this pipeline (code)\n",
"source(file.path(PIPELINE_PATH, \"utils\", \"snt_dhis2_outliers_imputation_iqr.r\"))\n",
"setup_ctx <- bootstrap_iqr_context(\n",
" root_path = ROOT_PATH,\n",
" required_packages = c(\"data.table\", \"arrow\", \"tidyverse\", \"jsonlite\", \"DBI\", \"RPostgres\", \"reticulate\", \"glue\", \"zoo\")\n",
")\n",
"\n",
"# Load libraries \n",
"required_packages <- c( \"data.table\", \"arrow\", \"tidyverse\", \"jsonlite\", \"DBI\", \"RPostgres\", \"reticulate\", \"glue\", \"zoo\")\n",
"install_and_load(required_packages)\n",
"\n",
"# Environment variables\n",
"Sys.setenv(PROJ_LIB = \"/opt/conda/share/proj\")\n",
"Sys.setenv(GDAL_DATA = \"/opt/conda/share/gdal\")\n",
"Sys.setenv(RETICULATE_PYTHON = \"/opt/conda/bin/python\")\n",
"\n",
"# Load OpenHEXA sdk\n",
"openhexa <- import(\"openhexa.sdk\")"
"OUTPUT_DIR <- setup_ctx$OUTPUT_DIR"
]
},
{
Expand Down Expand Up @@ -120,15 +112,9 @@
},
"outputs": [],
"source": [
"# Load SNT config\n",
"config_json <- tryCatch({ fromJSON(file.path(CONFIG_PATH, \"SNT_config.json\")) },\n",
" error = function(e) {\n",
" msg <- glue(\"[ERROR] Error while loading configuration {conditionMessage(e)}\")\n",
" log_msg(msg)\n",
" stop(msg)\n",
" })\n",
"\n",
"log_msg(glue(\"SNT configuration loaded from : {file.path(CONFIG_PATH, 'SNT_config.json')}\"))"
"# Load SNT config from bootstrap context\n",
"config_json <- setup_ctx$config_json\n",
"log_msg(glue(\"SNT configuration loaded from : {file.path(setup_ctx$CONFIG_PATH, 'SNT_config.json')}\"))"
]
},
{
Expand All @@ -142,16 +128,7 @@
},
"outputs": [],
"source": [
"# Check SNT configuration \n",
"snt_config_mandatory <- c(\"COUNTRY_CODE\", \"DHIS2_ADMINISTRATION_1\", \"DHIS2_ADMINISTRATION_2\") \n",
"for (conf in snt_config_mandatory) {\n",
" if (is.null(config_json$SNT_CONFIG[[conf]])) {\n",
" msg <- paste(\"Missing configuration input:\", conf)\n",
" log_msg(msg)\n",
" stop(msg)\n",
" }\n",
"}\n",
"\n",
"# Configuration validation is handled in pipeline.py\n",
"COUNTRY_CODE <- config_json$SNT_CONFIG$COUNTRY_CODE\n",
"ADMIN_1 <- toupper(config_json$SNT_CONFIG$DHIS2_ADMINISTRATION_1)\n",
"ADMIN_2 <- toupper(config_json$SNT_CONFIG$DHIS2_ADMINISTRATION_2)\n",
Expand Down Expand Up @@ -190,15 +167,12 @@
"source": [
"# Load file from dataset (formatting)\n",
"dataset_name <- config_json$SNT_DATASET_IDENTIFIERS$DHIS2_DATASET_FORMATTED\n",
"dhis2_routine <- tryCatch({ get_latest_dataset_file_in_memory(dataset_name, paste0(COUNTRY_CODE, \"_routine.parquet\")) }, \n",
" error = function(e) {\n",
" msg <- glue(\"[ERROR] Error while loading DHIS2 routine data file for {COUNTRY_CODE} : {conditionMessage(e)}\") # log error message\n",
" log_msg(msg)\n",
" stop(msg)\n",
"})\n",
"dhis2_routine <- load_routine_data(\n",
" dataset_name = dataset_name,\n",
" country_code = COUNTRY_CODE,\n",
" required_indicators = DHIS2_INDICATORS\n",
")\n",
"\n",
"log_msg(glue(\"DHIS2 routine data loaded from dataset : {dataset_name}\"))\n",
"log_msg(glue(\"DHIS2 routine data loaded has dimensions: {nrow(dhis2_routine)} rows, {ncol(dhis2_routine)} columns.\"))\n",
"print(dim(dhis2_routine))\n",
"head(dhis2_routine, 2)"
]
Expand All @@ -214,8 +188,7 @@
},
"outputs": [],
"source": [
"# YEAR and MONTH should be integers; in the input data they are numeric, but we later use them as integers\n",
"dhis2_routine[c(\"YEAR\", \"MONTH\")] <- lapply(dhis2_routine[c(\"YEAR\", \"MONTH\")], as.integer)"
"# YEAR/MONTH casting is handled inside load_routine_data()."
]
},
{
Expand All @@ -237,14 +210,7 @@
},
"outputs": [],
"source": [
"# Raise an error if any of DHIS2_INDICATORS are not present in the dhis2 routine data.\n",
"for (ind in DHIS2_INDICATORS) {\n",
" if (!(ind %in% colnames(dhis2_routine))) {\n",
" msg <- paste(\"[ERROR] Missing indicator column in routine data: \", ind)\n",
" log_msg(msg)\n",
" stop(msg)\n",
" }\n",
"}"
"# Indicator validation is handled inside load_routine_data()."
]
},
{
Expand Down Expand Up @@ -536,23 +502,8 @@
},
"outputs": [],
"source": [
"# Define helper function to compute moving average for an outlier column\n",
"start_time <- Sys.time()\n",
"\n",
"impute_outliers_dt <- function(dt, outlier_col) {\n",
" dt <- as.data.table(dt) # transform to datatable\n",
" setorder(dt, ADM1_ID, ADM2_ID, OU_ID, INDICATOR, PERIOD, YEAR, MONTH) \n",
" dt[, TO_IMPUTE := fifelse(get(outlier_col) == TRUE, NA_real_, VALUE)] # Compute TO_IMPUTE column\n",
" \n",
" # Fast rolling mean by group\n",
" dt[, MOVING_AVG := frollapply(TO_IMPUTE, n = 3, FUN = function(x) ceiling(mean(x, na.rm = TRUE)), align = \"center\"), \n",
" by = .(ADM1_ID, ADM2_ID, OU_ID, INDICATOR)]\n",
" \n",
" dt[, VALUE_IMPUTED := fifelse(is.na(TO_IMPUTE), MOVING_AVG, TO_IMPUTE)] \n",
" dt[, c(\"TO_IMPUTE\") := NULL] # clean up \"MOVING_AVG\"\n",
" \n",
" return(as.data.frame(copy(dt)))\n",
"}"
"# Helper loaded from utils/snt_dhis2_outliers_imputation_iqr.r\n",
"start_time <- Sys.time()"
]
},
{
Expand Down Expand Up @@ -629,24 +580,7 @@
},
"outputs": [],
"source": [
"# Define helper function to format both versions \n",
"format_routine_data_selection <- function(df, outlier_column, remove = FALSE) {\n",
" \n",
" # remove outliers \n",
" if (remove) df <- df %>% filter(!.data[[outlier_column]])\n",
"\n",
" target_cols <- c(\"PERIOD\", \"YEAR\", \"MONTH\", \"ADM1_NAME\", \"ADM1_ID\", \"ADM2_NAME\", \"ADM2_ID\", \"OU_ID\", \"OU_NAME\", DHIS2_INDICATORS)\n",
" \n",
" output <- df %>%\n",
" select(-VALUE) %>%\n",
" rename(VALUE = VALUE_IMPUTED) %>%\n",
" select(all_of(fixed_cols), INDICATOR, VALUE) %>% # global: fixed_cols\n",
" mutate(VALUE = ifelse(is.nan(VALUE), NA_real_, VALUE)) %>%\n",
" pivot_wider(names_from = \"INDICATOR\", values_from = \"VALUE\") %>%\n",
" left_join(pyramid_names, by = c(\"ADM1_ID\", \"ADM2_ID\", \"OU_ID\"))\n",
"\n",
" output %>% select(all_of(intersect(target_cols, names(output))))\n",
"}"
"# Helper loaded from utils/snt_dhis2_outliers_imputation_iqr.r"
]
},
{
Expand All @@ -661,8 +595,22 @@
"outputs": [],
"source": [
"# Format IQR tables (imputed and removed)\n",
"dhis2_routine_iqr_imputed <- format_routine_data_selection(dhis2_routine_outliers_iqr_imputed, iqr_column)\n",
"dhis2_routine_iqr_removed <- format_routine_data_selection(dhis2_routine_outliers_iqr_imputed, iqr_column, remove = TRUE)"
"dhis2_routine_iqr_imputed <- format_routine_data_selection(\n",
" df = dhis2_routine_outliers_iqr_imputed,\n",
" outlier_column = iqr_column,\n",
" DHIS2_INDICATORS = DHIS2_INDICATORS,\n",
" fixed_cols = fixed_cols,\n",
" pyramid_names = pyramid_names\n",
")\n",
"\n",
"dhis2_routine_iqr_removed <- format_routine_data_selection(\n",
" df = dhis2_routine_outliers_iqr_imputed,\n",
" outlier_column = iqr_column,\n",
" DHIS2_INDICATORS = DHIS2_INDICATORS,\n",
" fixed_cols = fixed_cols,\n",
" pyramid_names = pyramid_names,\n",
" remove = TRUE\n",
")"
]
},
{
Expand Down Expand Up @@ -714,7 +662,7 @@
},
"outputs": [],
"source": [
"output_path <- file.path(DATA_PATH, \"dhis2\", \"outliers_imputation\")\n",
"output_path <- OUTPUT_DIR\n",
"\n",
"# IQR detection table (for DB and reporting)\n",
"outlier_col <- colnames(dhis2_routine_outliers_selection)[startsWith(colnames(dhis2_routine_outliers_selection), \"OUTLIER_\")][1]\n",
Expand Down
Loading