-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsnt_utils.r
More file actions
1805 lines (1506 loc) · 70.4 KB
/
snt_utils.r
File metadata and controls
1805 lines (1506 loc) · 70.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# ================================================
# Title: Utility Functions for SNT Process
# Description: This script contains utility functions used for SNT computation workflow.
# Author: Esteban Montandon
# Created: [2024-10-01]
# Last updated: [2025-08-26]
# Dependencies: stringi, httr, arrow, tools, jsonlite
# Notes:
# - [Optional: Any special considerations, references, or tips]
# ================================================
# add any other matching logic here
format_names <- function(x) {
x <- stri_trans_general(str = x, id = "Latin-ASCII") # remove weird characters
x <- gsub("[^a-zA-Z0-9]", " ", toupper(x)) # replace non-alphanum with space
# x <- gsub("(?i)PROVINCE|ZONE DE SANTE|AIRE DE SANTE|CENTRE DE SANTE", "", x) # TEMPORARY SKIP
x <- gsub(" +", " ", x) # collapse multiple spaces
trimws(x)
}
# Clean column names formatting
clean_column_names <- function(df) {
# Get column names
col_names <- colnames(df)
# Apply the transformation rules
cleaned_names <- gsub("[^a-zA-Z0-9]", "_", col_names) # Replace symbols with underscores
cleaned_names <- gsub("\\s+", "", cleaned_names) # Remove extra spaces
cleaned_names <- toupper(cleaned_names) # Convert to uppercase
# Return cleaned column names
return(trimws(cleaned_names))
}
# Function to check if packages are installed -> install missing packages
install_and_load <- function(packages) {
# is the one that interferes with loading {tidyverse} if not updated version
if (!requireNamespace("scales", quietly = TRUE) || packageVersion("scales") < "1.3.0") {
suppressMessages(install.packages("scales"))
}
# Create vector of packages that are not installed
missing_packages <- packages[!(packages %in% installed.packages()[, "Package"])]
# Install missing packages
if (length(missing_packages) > 0) {
suppressMessages(install.packages(missing_packages))
}
# Load all the packages
suppressMessages(lapply(packages, require, character.only = TRUE))
# Retrieve and print package names and versions
loaded_packages <- sapply(packages, function(pkg) {
paste(pkg, packageVersion(pkg), sep = " ")
})
print(loaded_packages)
}
# Load SNT configuration file with a consistent error message
load_snt_config <- function(config_path, config_file_name = "SNT_config.json") {
config_file <- file.path(config_path, config_file_name)
config_json <- tryCatch(
{
jsonlite::fromJSON(config_file)
},
error = function(e) {
msg <- paste0("[ERROR] Error while loading configuration ", conditionMessage(e))
stop(msg)
}
)
return(config_json)
}
# Validate that required keys exist in a config section
validate_required_config_keys <- function(config_json, keys, section = "SNT_CONFIG") {
if (is.null(config_json[[section]])) {
stop(paste0("[ERROR] Missing configuration section: ", section))
}
missing_keys <- keys[!keys %in% names(config_json[[section]])]
if (length(missing_keys) > 0) {
stop(paste0("[ERROR] Missing configuration input(s): ", paste(missing_keys, collapse = ", ")))
}
invisible(TRUE)
}
# Generic helper to load a country-specific dataset file
load_country_file_from_dataset <- function(dataset_id, country_code, suffix, label = NULL) {
file_name <- paste0(country_code, suffix)
output_data <- tryCatch(
{
get_latest_dataset_file_in_memory(dataset_id, file_name)
},
error = function(e) {
target_label <- if (is.null(label)) file_name else label
msg <- paste0(
"[ERROR] Error while loading ",
target_label,
" (dataset: ",
dataset_id,
", file: ",
file_name,
"): ",
conditionMessage(e)
)
stop(msg)
}
)
log_msg(paste0("Loaded file `", file_name, "` from dataset `", dataset_id, "`."))
return(output_data)
}
# Ensure YEAR and MONTH are stored as integers when present
normalize_year_month_types <- function(input_df, year_col = "YEAR", month_col = "MONTH") {
output_df <- input_df
if (year_col %in% names(output_df)) {
output_df[[year_col]] <- as.integer(output_df[[year_col]])
}
if (month_col %in% names(output_df)) {
output_df[[month_col]] <- as.integer(output_df[[month_col]])
}
return(output_df)
}
# Standard routine preparation: select, pivot longer, optional deduplication
prepare_routine_long <- function(routine_df, fixed_cols, indicators, deduplicate = TRUE) {
cols_to_select <- intersect(c(fixed_cols, indicators), names(routine_df))
missing_indicators <- setdiff(indicators, names(routine_df))
if (length(missing_indicators) > 0) {
stop(paste0("[ERROR] Missing indicator column(s): ", paste(missing_indicators, collapse = ", ")))
}
routine_long <- routine_df %>%
dplyr::select(dplyr::all_of(cols_to_select)) %>%
tidyr::pivot_longer(
cols = dplyr::all_of(indicators),
names_to = "INDICATOR",
values_to = "VALUE"
)
if (deduplicate) {
dedup_keys <- intersect(c("ADM1_ID", "ADM2_ID", "OU_ID", "PERIOD", "YEAR", "MONTH", "INDICATOR"), names(routine_long))
routine_long <- routine_long %>%
dplyr::distinct(dplyr::across(dplyr::all_of(dedup_keys)), .keep_all = TRUE)
}
return(routine_long)
}
# Build a standardized output path under /data and create it if needed
standard_output_path <- function(data_root_path, domain, subdomain = NULL, create_dir = TRUE) {
target_path <- if (is.null(subdomain) || nchar(subdomain) == 0) {
file.path(data_root_path, domain)
} else {
file.path(data_root_path, domain, subdomain)
}
if (create_dir && !dir.exists(target_path)) {
dir.create(target_path, recursive = TRUE, showWarnings = FALSE)
}
return(target_path)
}
# Helper to safely extract values from parameters (allows to specify the type)
get_param <- function(params_list, target_param, default, cast_method = identity) {
#' Safely retrieve a parameter if it exists in the input, using a default fallback if it doesn't exist in the inupt
#'
#' @param params_list list of the input parameters (can be null, if it doesn't exist)
#' @param target_param parameter to retrieve
#' @param default default value to return if the parameter doesn't exist/is null
#' @param cast_method how to cast the parameter value (if necessary, defaults to self)
#'
#' @returns the (cast) parameter value from the input if it exists or the default value otherwise
if (!is.null(params_list) && is.list(params_list) &&
!is.null(params_list[[target_param]])) {
return(cast_method(params_list[[target_param]]))
} else {
return(default)
}
}
# # Load a file from the last version of a dataset (last version of the dataset)
get_latest_dataset_file_in_memory <- function(dataset, filename) {
# Get the dataset file object
dataset_last_version <- openhexa$workspace$get_dataset(dataset)$latest_version
dataset_file <- dataset_last_version$get_file(filename)
# Perform the GET request and keep the content in memory
response <- httr::GET(dataset_file$download_url)
if (httr::status_code(response) != 200) {
stop("Failed to download the file.")
}
print(paste0("File downloaded successfully from dataset version: ",dataset_last_version$name))
# Convert the raw content to a raw vector (the content of the file)
raw_content <- httr::content(response, as = "raw")
temp_file <- rawConnection(raw_content, "r")
file_extension <- tolower(tools::file_ext(filename))
if (file_extension == "parquet") {
df <- arrow::read_parquet(temp_file)
}
else if (file_extension == "csv") {
df <- utils::read.csv(temp_file, stringsAsFactors = FALSE)
}
else if (file_extension == "geojson") {
tmp_geojson <- tempfile(fileext = ".geojson")
writeBin(raw_content, tmp_geojson)
df <- sf::st_read(tmp_geojson, quiet = TRUE)
}
else if (file_extension == "json") {
# Read JSON from raw content
json_txt <- rawToChar(raw_content)
df <- jsonlite::fromJSON(json_txt, flatten = TRUE)
}
else {
stop(paste("Unsupported file type:", file_extension))
}
# Return the dataframe
return(df)
}
# helper function for OpenHEXA logging
log_msg <- function(msg , level="info") {
print(msg)
if (!is.null(openhexa$current_run)) {
level <- tolower(level)
if (level == "info"){
openhexa$current_run$log_info(msg)
} else if(level == "warning"){
openhexa$current_run$log_warning(msg)
} else if(level == "error"){
openhexa$current_run$log_error(msg)
} else {
stop("Unsupported log level")
}
}
}
# Helper for logging when not in OH
pipeline_msg <- function(
target_msg,
pipeline_error = "object 'openhexa' not found",
extra_info = "not connected to pipeline run"
) {
tryCatch(
{
log_msg(target_msg)
},
error = function(e) {
# if the error is about openhexa
if (grepl(pipeline_error, e$message)) {
print(extra_info)
print(target_msg)
} else {
# rethrow the other errors
stop(e)
}
}
)
}
# Helper to create a folder path if it doesn't already exist
safe_create_dir <- function(dir_path){
# Check if the directory exists, and if not, create it
if (!dir.exists(dir_path)) {
dir.create(dir_path, recursive = TRUE)
message("Directory created: ", dir_path)
} else {
message("Directory already exists: ", dir_path)
}
}
# Helper function for exporting data (csv and parquet files)
export_data <- function(data_object, file_path) {
# Get directory and create if it doesn't exist
output_dir <- dirname(file_path)
if (!dir.exists(output_dir)) {
dir.create(output_dir, recursive = TRUE)
log_msg(paste0("Output folder created : ", output_dir))
}
# get file name extension
file_extension <- tools::file_ext(file_path)
# Export the data based on file type
if (file_extension == "csv") {
write_csv(data_object, file_path)
} else if (file_extension == "parquet") {
arrow::write_parquet(data_object, file_path)
} else {
stop("Unsupported file type. Please use 'csv' or 'parquet'.")
}
# Log the export
log_msg(paste0("Exported : ", file_path))
}
#############
convert_columns <- function(dt, col_type_map) {
# convert specified columns of a data.table to target types
# @param dt a data.table to modify in place
# @param col_type_map a named list mapping type names (e.g., "numeric") to vectors of column names
# @returns the modified data.table
for (type in names(col_type_map)) {
cols <- col_type_map[[type]]
convert_fun <- match.fun(paste0("as.", type))
dt[, (cols) := lapply(.SD, convert_fun), .SDcols = cols]
}
}
#############
get_newest_dataset_file <- function(target_dataset, target_filename, target_country_code){
#' Retrieve the latest version from a dataset
#' @param target_dataset the dataset which contains the file
#' @param target_filename the file to retrieve
#' @param target_country_code the code name of the country to retrieve
#' @returns the dataframe of the latest version on the dataset, for the target country
# load file
output_data <- tryCatch({ get_latest_dataset_file_in_memory(target_dataset, target_filename) },
error = function(e) {
msg <- glue("Error while loading {ountry_code} {target_filename}, {conditionMessage(e)}") # log error message
cat(msg)
stop(msg)
})
data_dimensions <- paste(dim(output_data), collapse=", ")
msg <- glue("{target_filename} loaded from dataset {target_dataset}; the dataframe has the following dimensions: {data_dimensions}")
log_msg(msg)
return(output_data)
}
#############
match_column_classes <- function(input_dt, reference_dt) {
#' ensure that columns in an input data.table have the same classes as those in a reference data.table
#' @param input_dt the data table whose columns will be adapted
#' @param reference_dt the data table with the target column classes
#' @return a data table with the same column names and data as input_dt, but with classes of common columns made to match those in reference_dt
output_dt <- copy(as.data.table(input_dt))
if (!is.data.table(reference_dt)) {
stop("The reference data must be a data.table.")
}
common_cols <- intersect(names(output_dt), names(reference_dt))
for (col in common_cols) {
ref_class <- class(reference_dt[[col]])
# keep reference semantics
if (!inherits(output_dt[[col]], ref_class)) {
new_col <- switch(
ref_class[1], # use only the primary class
character = as.character(output_dt[[col]]),
integer = as.integer(output_dt[[col]]),
numeric = as.numeric(output_dt[[col]]),
factor = as.factor(output_dt[[col]]),
Date = as.Date(output_dt[[col]]),
{
warning(paste("Unsupported class for column:", col, "-", ref_class[1]))
output_dt[[col]]
}
)
set(output_dt, j = col, value = new_col)
}
}
return(output_dt)
}
#############
make_cartesian_admin_period <- function(input_dt, admin_colname, year_colname, month_colname) {
#' make a place-time cartesian product, to ensure each possible combination exists, between a minimum period and a maximum period
#' @param input_dt the original data.table
#' @param admin_colname place (admin) column
#' @param year_colname time column 1
#' @param month_colname time column 2
#' @return the total number of periods (to check if the data contains enough periods)
#' @return a new data.table, with the cartesian place-time rows
dt <- copy(as.data.table(input_dt))
# select only relevant columns to work with
cols <- c(admin_colname, year_colname, month_colname)
dt <- dt[, ..cols]
# make the table of unique administrative units
admin_dt <- unique(dt[, .(
get(admin_colname),
placeholder = 1
)])
setnames(admin_dt, old = names(admin_dt)[1], new = admin_colname)
# make the table with all possible monthly periods, between the minimum and the maximum of input_dt
dt[, date := as.IDate(paste0(dt[[year_colname]], '-', dt[[month_colname]], '-01'))]
min_date <- min(dt$date, na.rm = TRUE)
max_date <- max(dt$date, na.rm = TRUE)
date_seq <- seq(min_date, max_date, by = "1 month")
dates_dt <- data.table(
YEAR = year(date_seq),
MONTH = month(date_seq),
placeholder = rep(1, length(date_seq))
)
# make the cartesian product between administrative units and monthly periods, using a placeholder column
result_dt <- merge.data.table(admin_dt, dates_dt, by = c('placeholder'), allow.cartesian = TRUE)
result_dt <- result_dt[, -'placeholder']
return(list(nrow(dates_dt), result_dt))
}
#############
make_cartesian_dt_vector <- function(input_dt, input_vector, new_colname){
#' cartesian product of a data table and a vector
#' @param input_dt data
#' @param input_vector extra vector to add rows based on
#' @param new_colname name of the new column (the one which has the values of the vector)
#' @returns data table with the cartesian product of the two parameters
if (!is.data.table(input_dt)) {
input_dt <- as.data.table(input_dt)
}
# vector to data table
vector_dt <- setnames(data.table(input_vector), new_colname)
# dummy columns to both for cross join
input_dt[, dummy := 1]
vector_dt[, dummy := 1]
# cartesian
output_dt <- merge(input_dt, vector_dt, by = "dummy", allow.cartesian = TRUE)
# temove the dummy column
output_dt[, dummy := NULL]
return(output_dt)
}
#############
make_full_time_space_data <- function(input_dt, full_rows_dt, target_colname, admin_colname = 'ADM2_ID', year_colname = 'YEAR', month_colname = 'MONTH') {
#' add missing administrative rows, from a time-place cartesian dataset, into a data table "with holes"
#'
#' @param input_dt input data
#' @param full_rows_dt full set of rows to merge with
#' @param target_colname the column which will eventually need to be imputed (which will have holes)
#' @param admin_colname the admin unit id
#' @param year_colname year
#' @param month_colname month
#' @returns data table with merged and imputed administrative unit columns
common_colnames <- c(admin_colname, year_colname, month_colname)
# make sure data is data table
output_dt <- copy(as.data.table(input_dt)[, .SD, .SDcols = c(common_colnames, target_colname)])
full_rows_dt <- as.data.table(full_rows_dt)[, .SD, .SDcols = common_colnames]
output_dt <- merge.data.table(
output_dt,
full_rows_dt,
by = common_colnames,
all = TRUE
)
# # fill in missings in the administrative unit columns for future imputation grouping
# output_dt <- output_dt[, `:=`(
# ADM1_ID = ifelse(is.na(ADM1_ID), unique(ADM1_ID[!is.na(ADM1_ID)]), ADM1_ID),
# ADM1 = ifelse(is.na(ADM1), unique(ADM1[!is.na(ADM1)]), ADM1),
# ADM2 = ifelse(is.na(ADM2), unique(ADM2[!is.na(ADM2)]), ADM2)
# ), by = ADM2_ID]
#
return(output_dt)
}
#############
extract_dt_with_missings <- function(input_dt, target_colname, id_colname){
#' extract the id's of all rows which have missing values on target_colname
#' @param input_dt the input data.table
#' @param target_colname the column where missings should be identified
#' @param id_colname the grouping column (units of observation)
#' @return a new data.table, which filters only those id's and returns all of the observations associated with them
ids_with_missings <- input_dt[is.na(get(target_colname)), unique(get(id_colname))]
dt_with_missings <- input_dt[get(id_colname) %in% ids_with_missings]
return(dt_with_missings)
}
#############
fill_missing_cases_ts <- function(district_data, original_values_colname, estimated_values_colname, admin_colname, period_colname, threshold_for_missing = 0.0){
#' SARIMA-based imputation for the values_colname variable of values_colname (generally confirmed malaria cases):
#' - fits a seasonal ARIMA model on the values_colname variable
#' - generates estimations for the missing values
#' @param district_data a data.table with the values of a specific admin unit
#' @param original_values_colname the name of the column which contains the values to be imputed
#' @param estimated_values_colname the name of the column which will contain the new, imputed values
#' @param admin_colname the name of the column which contains the administrative unit ids
#' @param period_colname the name of the column which contains the year-month periods
#' @param threshold_for_missing a threshold below which values_colname is considered as missing for the imputation purposes; default is 0 and should ideally stay like that
#' @return a new data.table, with the filled column, called <values_colname>'_EST' added
district_id <- district_data[, unique(get(admin_colname))]
# compute the log, to avoid estimating negative values during imputation
# values of 0 are re-added back at the end
log_values_colname = paste(original_values_colname, 'LOG', sep = '_')
district_data[, (log_values_colname) := ifelse(get(original_values_colname) > threshold_for_missing, log(get(original_values_colname)), NA)]
district_data$PERIOD <- yearmonth(district_data$PERIOD)
district_ts <- tsibble(district_data, index = PERIOD)
# fit ARIMA model to the column with missing values, then estimate values based on model
ts_fill <- district_ts |>
# for parsimony and speed, so the fit doesn't go crazy in the orders to chase good AIC's
model(predefined_sarima = ARIMA(!!sym(log_values_colname) ~ 0 + pdq(1, 1, 0) + PDQ(1, 1, 0))) |>
interpolate(district_ts) |>
mutate(!!sym(estimated_values_colname) := round(exp(!!sym(log_values_colname)))) |>
mutate(!!sym(admin_colname) := district_id)
district_data_filled <- as.data.table(ts_fill)
# drop the log of cases and merge the data
district_data_filled <- merge.data.table(
district_data[, (log_values_colname) := NULL],
district_data_filled[, (log_values_colname) := NULL],
by = c(admin_colname, period_colname)
)
# reformat back to original
district_data_filled[, YEAR := year(PERIOD)]
district_data_filled[, MONTH := month(PERIOD)]
district_data_filled[, PERIOD := NULL]
# this is the general situation, if outlier detection has already happened; in this specific case, the line below should not be run, because all zeroes are errors
district_data_filled[get(original_values_colname) <= threshold_for_missing, (estimated_values_colname) := get(original_values_colname)]
return(district_data_filled)
}
#' add string versions of selected integer columns
#' @param input_dt data.frame or data.table to modify
#' @param input_pattern pattern used to identify columns to convert
#' @param output_pattern replacement pattern used to form new column names
#' @param missing_label string to replace NA values in the new columns
#'
#' @return data.table containing the original data and the newly created columns
add_str_col_from_int <- function(input_dt, input_pattern, output_pattern, missing_label = "<=60%"){
output_dt <- copy(as.data.table(input_dt))
old_colnames <- grep(input_pattern, names(input_dt), value=TRUE)
for(old_colname in old_colnames){
new_colname <- gsub(input_pattern, output_pattern, old_colname)
output_dt[, (new_colname) := as.character(get(old_colname))]
output_dt[is.na(get(new_colname)), (new_colname) := missing_label]
}
return(output_dt)
}
#' Bin a numeric column into categories in a data.table
#' @param dt dt to be modified
#' @param breaks vector of cut points defining the bin boundaries
#' @param labels vector of labels for the resulting bins
#' @param col_in name of the numeric input column to be binned
#' @param col_out name of the output column to create
#' @param include.lowest set to true
#' @return return the modified dt invisibly
bin_column_dt <- function(dt, breaks, labels,col_in, col_out, include.lowest = TRUE, right = FALSE) {
if (!is.data.table(dt)) {
stop("dt must be a data.table")
}
dt[, (col_out) := cut(
get(col_in),
breaks = breaks,
labels = labels,
include.lowest = include.lowest,
right = right,
ordered_result = TRUE
)]
invisible(dt)
}
#%% SEASONALITY -------------------------------------------------------------------
#############
compute_month_seasonality <- function(input_dt, indicator, values_colname, vector_of_durations,
admin_colname = 'ADM2_ID', year_colname = 'YEAR', month_colname = 'MONTH',
proportion_threshold = 0.6,
use_calendar_year_denominator = FALSE) {
#' Compute month-level seasonality indicators using forward-looking month blocks.
#' This function implements the WHO month-block reasoning for seasonality computation.
#'
#' @param input_dt an input data table (or data frame)
#' @param indicator a string to specify the type of indicator (case/rainfall/etc. - will be added to the output variable name)
#' @param values_colname the indicator column, on which the computations are made
#' @param vector_of_durations the vector with the number of months in a block (3/4/5)
#' @param admin_colname the administrative units to group
#' @param year_colname year grouping column
#' @param month_colname month grouping column
#' @param proportion_threshold the proportion of indicator which needs to occur in a block, to qualify for seasonality
#' @param use_calendar_year_denominator Logical. If TRUE, uses the total accumulated value of the current
#' calendar year (Jan-Dec) as the denominator. If FALSE (default), uses the 12-month forward-looking
#' sliding window as denominator (WHO approach).
#' @return an output data table with the additional columns for seasonality indicators
indicator <- toupper(indicator)
dt <- copy(as.data.table(input_dt))
# ensure correct order
dt <- dt[order(get(admin_colname), get(year_colname), get(month_colname))]
# ---------------------------------------------------------
# DENOMINATOR CALCULATION
# ---------------------------------------------------------
if (use_calendar_year_denominator) {
# Alternative Approach: Total accumulated value for the current calendar year (Jan-Dec)
denominator_colname <- paste(indicator, "SUM_CALENDAR_YEAR", sep = "_")
dt[, (denominator_colname) := sum(get(values_colname), na.rm = TRUE),
by = c(admin_colname, year_colname)]
} else {
# Original Approach (WHO): 12-month forward-looking sliding sum (left-aligned)
denominator_colname <- paste(indicator, "SUM", 12, "MTH", "FW", sep = "_")
dt[, (denominator_colname) := frollsum(get(values_colname),
n = 12,
align = "left",
na.rm = TRUE),
by = admin_colname]
}
# ---------------------------------------------------------
# NUMERATOR & RATIO CALCULATIONS
# ---------------------------------------------------------
for (n in vector_of_durations) {
numerator_colname <- paste(indicator, "SUM", n, "MTH", "FW", sep = "_")
prop_name <- paste(indicator, n, "MTH", "ROW", "PROP", sep = "_")
seasonality_colname <- paste(indicator, n, "MTH", "ROW", "SEASONALITY", sep = "_")
# Calculate numerator: Rolling sum of next n months
dt[, (numerator_colname) := frollsum(get(values_colname),
n = n,
align = "left",
na.rm = TRUE),
by = admin_colname]
# Calculate Proportion: Numerator / Denominator
dt[, (prop_name) :=
fifelse(get(denominator_colname) > 0, get(numerator_colname) / get(denominator_colname), NA_real_)]
# Calculate Seasonality Flag
dt[, (seasonality_colname) := as.integer(get(denominator_colname) > 0 &
get(prop_name) >= proportion_threshold)]
}
# return the data
dt[]
}
#############
process_seasonality <- function(input_dt, indicator, vector_of_durations, admin_colname = 'ADM2_ID', year_colname = 'YEAR', month_colname = 'MONTH', proportion_seasonal_years_threshold = 0.5){
#' compute whether or not an admin unit is "seasonal", based on WHO guidelines
#' TODO: I'm passing all columns as arguments (needs resetting column names after each summarizing; to see if the data may have different column names; if not, no need to pass these as arguments and would make the code lighter)
#' @param input_dt the input data table/frame
#' @param indicator the type of indicator
#' @param vector_of_durations the block sizes to check
#' @param admin_colname the place column
#' @param year_colname the year grouping column
#' @param month_colname the month grouping column
#' @param proportion_seasonal_years_threshold the minimum number of seasonal years, for the admin unit to qualify as seasonal
#' @return the output data table, with extra dichotomous variables (seasonal/non-seasonal for each size of month-blocks)
indicator <- toupper(indicator)
# make an "empty" data.table, with only the admin units
output_dt <- input_dt[, setNames(list(unique(get(admin_colname))), admin_colname)]
for (num_months in vector_of_durations) {
regex_pattern <- paste(toupper(indicator), num_months, "MTH_ROW_SEASONALITY$", sep = '_')
row_seasonality_colname <- grep(regex_pattern, names(input_dt), value = TRUE)
subset_dt <- input_dt[, .SD, .SDcols = c(admin_colname, year_colname, month_colname, row_seasonality_colname)]
subset_dt <- subset_dt[!is.na(get(row_seasonality_colname)),]
num_seasonal_years_colname = paste(toupper(indicator), num_months, "MTH_NUM_SEASONAL_YEARS", sep = '_')
num_total_years_colname = paste(toupper(indicator), num_months, "MTH_NUM_TOTAL_YEARS", sep = '_')
subset_dt <- subset_dt[, setNames(
# list of new column values
.(
sum(get(row_seasonality_colname)), # sum of all rows where seasonality is 1
uniqueN(get(year_colname)) # number of total years where the month in question appears for a given admin unit
), c( # vector of new column names
num_seasonal_years_colname,
num_total_years_colname
)),
by = .(get(admin_colname), get(month_colname))
]
# retrieve column names (overwritten when summarizing/aggregating with "get")
names(subset_dt)[1] <- admin_colname
names(subset_dt)[2] <- month_colname
# compute proportion of seasonal years for each month
proportion_colname = paste('PROP', 'SEASONAL', toupper(indicator), num_months, 'MTH', sep = '_')
seasonality_colname = paste('SEASONALITY', toupper(indicator), num_months, 'MTH', sep = '_')
# aggregate by admin unit, to get the dichotonomous variable whether the admin unit is seasonal by this criterion
subset_dt <- subset_dt[, (proportion_colname) := get(num_seasonal_years_colname) / get(num_total_years_colname),
by = .(get(admin_colname), get(month_colname))]
subset_dt <- subset_dt[, (seasonality_colname) := ifelse(get(proportion_colname) >= proportion_seasonal_years_threshold, 1, 0),
by = .(get(admin_colname), get(month_colname))]
# aggregate to keep only the admin unit and whether or not the seasonality is 1
subset_dt <- subset_dt[
order(get(admin_colname), -get(seasonality_colname)),
.SD[1],
.SDcols = c(proportion_colname, seasonality_colname), # possible to add the month_colname here, and filter all cases where seasonality is 1
by = get(admin_colname)
]
# retrieve column names (overwritten when summarizing/aggregating with "get")
names(subset_dt)[1] <- admin_colname
# merge with the output_dt
output_dt <- merge.data.table(output_dt, subset_dt, by = admin_colname)
}
return(output_dt)
}
#############
compute_min_seasonality_block <- function(
input_dt,
seasonality_column_pattern,
vector_of_possible_month_block_sizes,
seasonal_blocksize_colname,
valid_value = 1
){
#' retrieve the minimum number of months which constitute a seasonality block
#' @param input_dt input data.table
#' @param seasonality_column_pattern pattern for seasonality columns
#' @param vector_of_possible_month_block_sizes numeric vector of block sizes
#' @param seasonal_blocksize_colname name of output column
#' @param valid_value value indicating seasonality
#' @return data.table with added blocksize column (NA if none)
# column names which match pattern
seasonality_cols <- grep(
seasonality_column_pattern,
names(input_dt),
ignore.case = TRUE,
value = TRUE
)
# validate block sizes with columns
if (length(vector_of_possible_month_block_sizes) != length(seasonality_cols)) {
stop("Input possible month block sizes should correspond to number of relevant columns.")
}
block_sizes <- as.integer(vector_of_possible_month_block_sizes)
# rowwise compute the new column
output_dt <- input_dt[, (seasonal_blocksize_colname) :=
apply(.SD, 1, function(row) {
# find block sizes corresponding to the target value
valid_blocks <- block_sizes[row == valid_value]
# change to NA if no seasonality
if (length(valid_blocks) == 0) return(NA_integer_)
# minimum block size
return(min(valid_blocks))
}),
.SDcols = seasonality_cols
]
return(output_dt)
}
#############
make_seasonality_plot <- function(spatial_seasonality_df, seasonality_colname, title_label){
#' map seasonality with predefined colors
#' areas are categorized as "Seasonal" or "Not seasonal"
#'
#' @param spatial_seasonality_df sf data frame with spatial geometry and seasonality data
#' @param seasonality_colname string with the name of the column indicating seasonality (values should be 0 or 1)
#' @param title_label string to customize the legend title
#'
#' @return a ggplot object of the seasonality map
seasonality_plot <- ggplot(spatial_seasonality_df) +
geom_sf(aes(fill = as.factor(get(seasonality_colname))))+
scale_fill_manual(values = c("1" = "chartreuse2", "0" = "#1E2044"),
labels = c("1" = "Seasonal", "0" = "Not seasonal")) + # Custom labels
coord_sf() + # map projection
# guides(fill=guide_legend(title= paste0("Seasonality (", title_label, ")"), nrow = 2)) +
guides(fill=guide_legend(title=title_label, nrow = 2)) +
theme_classic() +
theme(plot.title = element_text(face = "bold", hjust = 0.5),
legend.position = "bottom", legend.key.width = unit(2,"cm"), legend.text=element_text(size=10))
print(seasonality_plot)
return(seasonality_plot)
}
#############
make_seasonality_duration_plot <- function(spatial_seasonality_df, seasonality_duration_colname, title_label, palette_name = 'BrBG', none_label="No seasonality"){
#' map the duration of seasonality (in how many months x% of annual rain falls)
#'
#' @param spatial_seasonality_df sf data with spatial and seasonality columns
#' @param seasonality_duration_colname column name (string) for seasonality duration (number of months)
#' @param title_label string for the legend title
#' @param palette_name colorbrewer palette (default is 'BrBG')
#' @param none_label legend label when there is no seasonality (defaults to "Not seasonal")
#'
#' @return ggplot object
#'
duration_plot <- ggplot(spatial_seasonality_df) +
geom_sf(aes(fill = as.character(get(seasonality_duration_colname))))+
coord_sf() + # map projection
# scale_fill_discrete(
# # values = sort(unique(as.character(plot_df[['seasonality_duration_colname']]))), # custom colors
# labels = function(x) {
# ifelse(x == "Inf", none_label, x) # custom labels
# }
# ) +
#
scale_fill_brewer(palette = palette_name, labels = function(x) {
ifelse(is.na(x), none_label, x) # custom labels
}
) +
# guides(fill=guide_legend(title= paste0("Number of months (", title_label, ")" ), nrow = 2)) +
guides(fill=guide_legend(title=title_label, nrow = 2)) +
theme_classic() +
theme(plot.title = element_text(face = "bold", hjust = 0.5),
legend.position = "bottom", legend.key.width = unit(2,"cm"), legend.text=element_text(size=10))
print(duration_plot)
return(duration_plot)
}
#' create forward-looking month blocks summing values and divide them by the annual (calendar year) sum of values
#' @param input_dt an input data table (or data frame)
#' @param values_colname the indicator column, on which the computations are made
#' @param vector_of_durations the vector with the number of months in a block (3/4/5)
#' @param admin_colname the administrative units to group
#' @param year_colname year grouping column
#' @param month_colname month grouping column
#' @param percentage_threshold the percentage which needs to occur in a block, to qualify for seasonality
#' @return an output data table with the additional column
compute_block_percentage <- function(input_dt, values_colname, vector_of_durations, admin_colname = 'ADM2_ID', year_colname = 'YEAR', month_colname = 'MONTH', percentage_threshold = 0.6) {
dt <- copy(as.data.table(input_dt))
# ensure correct order
dt <- dt[order(get(admin_colname), get(year_colname), get(month_colname))]
# denominator: annual (calendar year) sum
denominator_colname <- toupper("sum_calendar_year")
dt[
,
(denominator_colname) := sum(get(values_colname), na.rm = TRUE),
by = c(admin_colname, year_colname)
]
# numerators for each of the durations (forward-looking)
for (n in vector_of_durations) {
numerator_colname <- toupper(glue("sum_{n}_mth_fw"))
pct_colname <- toupper(glue("pct_{n}_mth_row"))
target_colname <- toupper(glue("attained_{percentage_threshold}_cases_{n}_mth"))
dt[, (numerator_colname) := frollsum(get(values_colname),
n = n,
align = "left",
na.rm = TRUE),
by = c(admin_colname)]
dt[, (pct_colname) :=
# make NA's where it would be division by zero
fifelse(get(denominator_colname) > 0, get(numerator_colname)*100 / get(denominator_colname), NA_real_)]
dt[, (target_colname) := as.integer(get(denominator_colname) > 0 &
get(pct_colname) >= percentage_threshold)]
}
return(dt)
}
filter_cycles_data <- function(input_dt, pattern_cycle_colnames, id_colnames, year_colname, reference_years_vector, month_colname, target_month_num=8){
#' Filter data on cycles, to only the relevant beginning month
#' Keep only the useful columns for plotting/presentation
output_dt <- input_dt[
get(month_colname) == target_month_num,
.SD,
.SDcols=c(id_colnames, grep(pattern_cycle_colnames, names(input_dt), value=TRUE))
]
output_dt[, (month_colname) := NULL]
output_dt <- output_dt[
get(year_colname) %in% reference_years_vector
]
return(output_dt)
}
#' Transform a wide-format data table containing cycle coverage columns into a cleaned long-format table, extract coverage percentages from column names and keep the maximum percentage covered per group
#' @param input_dt dt in wide format containing:
#' @param space_colname name of the spatial grouping column
#' @param time_colname name of the temporalgrouping column
#' @return dt in long format with columns space_colname}{Spatial identifier column.}
#' \item{time_colname}{Temporal identifier column.}
#' \item{num_cycles}{Number of cycles (non-missing values only).}
#' \item{max_pct_covered}{Maximum extracted percentage per group.}
#' }
prep_cycles_long <- function(input_dt, space_colname, time_colname){
output_dt <- melt(
data=input_dt,
id.vars=c(space_colname, time_colname),
variable.name="category",
value.name="num_cycles"
)
# extract the number covered as integer, from the "category" column
output_dt[,max_pct_covered:= as.integer(sub(".*?(\\d+).*", "\\1", category))]
# remove the original "category" column
output_dt[, category :=NULL]
# remove rows with missing number of cycles
output_dt <- output_dt[
!is.na(num_cycles)
]
# keep only the maximum number of cases covered, for each number of cycles
output_dt <- output_dt[, .(max_pct_covered = max(max_pct_covered)), by=c(space_colname, time_colname, "num_cycles")]
return(output_dt)
}
#' Fill missing values in long-format cycles data
#' @param input_dt df/dt in long format containing spatial, temporal, ordering and value columns
#' @param spatial_colname name of the spatial grouping column
#' @param temporal_colname name of the temporal grouping column
#' @param order_colname column for within-group ordering (determines the sequence for LOCF filling)
#' @param value_colname column containing values to be filled
#'
#' @return dt with missing values in value_colname, filled using last observation carried forward within groups
fill_long_cycles_dt <- function(input_dt, spatial_colname, temporal_colname, order_colname, value_colname){
output_dt <- copy(as.data.table(input_dt))
# ensure correct order before filling
setorderv(
output_dt,
c(spatial_colname, temporal_colname, order_colname),
c(1L, 1L, 1L)
)
output_dt[, (value_colname) := nafill(get(value_colname), type = "locf"), by = c(spatial_colname, temporal_colname)]
}
#' Make a choropleth map from an sf object
#'
#' @param spatial_df sf object containing geometries and the variable map
#' @param target_colname name of the column to use for the fill aesthetic
#' @param map_colors vector of colors to scale_fill_manual with names that match the levels of the target variable