feat: ✨ raw data to staging #94
Conversation
| VAS_TIME_FIELD_PATTERN = re.compile( | ||
| r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$" | ||
| ) |
We can share this with the metadata transformation
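For reference, a minimal sketch of what the pattern captures, which is the part that would be shared. The helper `parse_vas_column` and the sample column names are hypothetical and only illustrate the named groups; they are not part of this PR.

```python
import re
from typing import Optional

# Same pattern as in the diff above.
VAS_TIME_FIELD_PATTERN = re.compile(
    r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$"
)


def parse_vas_column(column: str) -> Optional[tuple[str, int]]:
    """Return (field_name, minutes) for a VAS time column, or None if it doesn't match.

    Hypothetical helper, shown only to illustrate the named groups.
    """
    match = VAS_TIME_FIELD_PATTERN.match(column)
    if match is None:
        return None
    time = match.group("time")
    # "minus10" is the only non-numeric time label in the pattern.
    minutes = -10 if time == "minus10" else int(time)
    return match.group("field_name"), minutes


# Column names here are made up for illustration.
print(parse_vas_column("vas_hunger_fasted_minus10min"))
print(parse_vas_column("vas_hunger_30min"))
print(parse_vas_column("vas_hunger_45min"))
```

Because `field_name` is matched lazily and `_fasted` is optional, the `_fasted` suffix is dropped from the captured field name.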
| if not files: | ||
| raise FileNotFoundError( | ||
| f"No raw data files found in '{file_path}'. " | ||
| "Have you run `just download-data`?" | ||
| ) |
Alternatively, we can just do nothing without throwing an error.
Yeah, I think these types of errors might not be necessary, and they might be solved by using some type of "orchestrator", but that's for later. This is fine for now.
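A rough sketch of the "do nothing" alternative, assuming the files are found with a glob: warn and return an empty list so the caller can skip the step. The function name `list_raw_files` and the `*.csv` pattern are assumptions for illustration, not what the PR does.

```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def list_raw_files(file_path: Path, pattern: str = "*.csv") -> list[Path]:
    """List raw data files, warning (not raising) when none are found.

    Hypothetical helper: the name and the default glob pattern are assumptions.
    """
    files = sorted(file_path.glob(pattern))
    if not files:
        # Keep the hint from the original error message, but let the caller decide.
        logger.warning(
            "No raw data files found in '%s'. Have you run `just download-data`?",
            file_path,
        )
    return files
```

The caller would then loop over the (possibly empty) list, so a missing download results in no staged output rather than a traceback.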
…easibility-data into feat/data-raw-to-staging
| .rename({"redcap_event_name": "event"}) | ||
| .with_columns( | ||
| pl.lit("Copenhagen").alias("center"), | ||
| pl.lit(resource_name).alias("resource_name"), |
I'll use this to write the Parquet file, then drop it.
| for col in vas_cols: | ||
| match = cast(re.Match[str], VAS_TIME_FIELD_PATTERN.match(col)) | ||
| time = match.group("time") | ||
| if time == "minus10": | ||
| time = "-10" | ||
| cols_grouped_by_time.setdefault(int(time), []).append(col) |
I can rewrite this dictionary construction without a for loop if you want, but I think it will just be longer and more complicated
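For comparison, a sketch of one loop-free version, using `sorted` plus `itertools.groupby`. The helper names `time_of` and `group_by_time` are hypothetical, and unlike the `setdefault` loop this returns the groups ordered by time rather than by first appearance.

```python
import re
from itertools import groupby

# Same pattern as in the PR.
VAS_TIME_FIELD_PATTERN = re.compile(
    r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$"
)


def time_of(col: str) -> int:
    """Extract the time point in minutes from a VAS column name (hypothetical helper)."""
    match = VAS_TIME_FIELD_PATTERN.match(col)
    if match is None:
        raise ValueError(f"Not a VAS time column: {col!r}")
    time = match.group("time")
    return -10 if time == "minus10" else int(time)


def group_by_time(vas_cols: list[str]) -> dict[int, list[str]]:
    """Group VAS columns by time point without an explicit for loop."""
    # groupby only merges adjacent items, so the columns must be sorted by the key first.
    ordered = sorted(vas_cols, key=time_of)
    return {time: list(group) for time, group in groupby(ordered, key=time_of)}
```

It needs an extra helper and a sort, which arguably supports keeping the for loop.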
This was done automatically, not sure if it's the right way
| @@ -1 +1,2 @@ | |||
| raw/** filter=lfs diff=lfs merge=lfs -text | |||
| staging/** filter=lfs diff=lfs merge=lfs -text | |||
Or we can track *.parquet
Not sure, is that a question?
| """Selects columns and adds base columns common to all dataframes.""" | ||
| return ( | ||
| raw_df.select(["redcap_event_name"] + cols) | ||
| .rename({"redcap_event_name": "event"}) |
Having looked at more of the data, I don't think event by itself can be the PK. Raised #95
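To illustrate the concern, a plain-Python sketch of a uniqueness check on candidate key columns. The rows, the event names, and the `record_id` column are made up for illustration and are not taken from the real data.

```python
from collections import Counter
from typing import Any


def duplicated_keys(rows: list[dict[str, Any]], key_cols: list[str]) -> list[tuple]:
    """Return the key values that occur in more than one row (hypothetical helper)."""
    counts = Counter(tuple(row[col] for col in key_cols) for row in rows)
    return [key for key, n in counts.items() if n > 1]


# Made-up rows: the same event occurs once per participant.
rows = [
    {"record_id": 1, "event": "baseline"},
    {"record_id": 2, "event": "baseline"},
    {"record_id": 1, "event": "followup"},
]

print(duplicated_keys(rows, ["event"]))
print(duplicated_keys(rows, ["record_id", "event"]))
```

If `event` repeats across participants like this, it can only work as part of a composite key.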
Yea, I was thinking the same.
| ) | ||
| def raw_to_staged(raw_df: pl.DataFrame) -> list[pl.DataFrame]: |
Maybe organize this so that all the _fn functions are either above or below the normal functions?
| pl.lit("Copenhagen").alias("center"), | ||
| pl.lit(resource_name).alias("resource_name"), |
| pl.lit("Copenhagen").alias("center"), | |
| pl.lit(resource_name).alias("resource_name"), | |
| # Only used for creating the Parquet files. | |
| pl.lit("Copenhagen").alias("center"), | |
| pl.lit(resource_name).alias("resource_name"), |
| vas_cols = so.keep( | ||
| raw_df.columns, | ||
| lambda column: VAS_TIME_FIELD_PATTERN.match(column) is not None, | ||
| ) |
This would be better done with Polars itself rather than a filter. E.g. select() can take a pattern, or you could use exclude().
| vas_dfs = so.pairwise_fmap( | ||
| list(cols_grouped_by_time.items()), [raw_df], _create_df_for_time_group | ||
| ) | ||
| return pl.concat(vas_dfs, how="vertical") |
I think all of this would be better with a pivot https://docs.pola.rs/user-guide/transformations/pivot/
Description
This PR transforms raw data to staged data. For now, this includes special transformations only for VAS.
Merge after #89
Closes #73
This PR needs an in-depth review.
Checklist
just run-all