-
Notifications
You must be signed in to change notification settings - Fork 0
feat: ✨ raw data to staging #94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c559150
b4608bb
30214ad
627ee16
df75685
455a4ac
a7eba5f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| raw/** filter=lfs diff=lfs merge=lfs -text | ||
| staging/** filter=lfs diff=lfs merge=lfs -text | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was done automatically, not sure if it's the right way
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,3 +5,8 @@ extend-exclude = [ | |
| # Auto-generated by Flower | ||
| "resources" | ||
| ] | ||
|
|
||
| [default] | ||
|
|
||
| [default.extend-words] | ||
| vas = "vas" | ||
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,140 @@ | ||||||||||||
| import re | ||||||||||||
| from datetime import datetime | ||||||||||||
| from operator import itemgetter | ||||||||||||
| from pathlib import Path | ||||||||||||
| from typing import cast | ||||||||||||
|
|
||||||||||||
| import polars as pl | ||||||||||||
| import seedcase_soil as so | ||||||||||||
|
|
||||||||||||
| VAS_TIME_FIELD_PATTERN = re.compile( | ||||||||||||
| r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$" | ||||||||||||
| ) | ||||||||||||
|
Comment on lines
+10
to
+12
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can share this with the metadata transformation |
||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def load_raw_data() -> pl.DataFrame: | ||||||||||||
| """Loads the latest raw data from `raw/redcap/<timestamp>.csv.gz`.""" | ||||||||||||
| file_path = Path("raw") / "redcap" | ||||||||||||
| files = list(file_path.glob("*.csv.gz")) | ||||||||||||
| if not files: | ||||||||||||
| raise FileNotFoundError( | ||||||||||||
| f"No raw data files found in '{file_path}'. " | ||||||||||||
| "Have you run `just download-data`?" | ||||||||||||
| ) | ||||||||||||
|
Comment on lines
+19
to
+23
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, we can just do nothing without throwing an error.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I think these types of errors might not be necessary. And might be solved by using some type of "orchestrator", but that's for later. This is fine for now. |
||||||||||||
|
|
||||||||||||
| latest_file = max(files, key=lambda file: file.name) | ||||||||||||
| so.pretty_print(f"Loading data from '{latest_file}'.") | ||||||||||||
| return pl.read_csv(latest_file) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def _get_fields_by_resource() -> dict[str, list[str]]: | ||||||||||||
| """Gets a mapping from resource names to a list of field names in that resource.""" | ||||||||||||
| properties = so.read_properties(so.parse_source("datapackage.json")) | ||||||||||||
| return dict( | ||||||||||||
| so.fmap( | ||||||||||||
| properties["resources"], | ||||||||||||
| lambda resource: ( | ||||||||||||
| resource["name"], | ||||||||||||
| so.fmap(resource["schema"]["fields"], itemgetter("name")), | ||||||||||||
| ), | ||||||||||||
| ) | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def _select_with_base_cols( | ||||||||||||
| raw_df: pl.DataFrame, resource_name: str, cols: list[str] | ||||||||||||
| ) -> pl.DataFrame: | ||||||||||||
| """Selects columns and adds base columns common to all dataframes.""" | ||||||||||||
| return ( | ||||||||||||
| raw_df.select(["redcap_event_name"] + cols) | ||||||||||||
| .rename({"redcap_event_name": "event"}) | ||||||||||||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having looked at more of the data, I don't think
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I was thinking the same. |
||||||||||||
| .with_columns( | ||||||||||||
| pl.lit("Copenhagen").alias("center"), | ||||||||||||
| pl.lit(resource_name).alias("resource_name"), | ||||||||||||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll use this to write the parquet file then drop it
Comment on lines
+52
to
+53
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
| ) | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def raw_to_staged(raw_df: pl.DataFrame) -> list[pl.DataFrame]: | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe organize so either all the |
||||||||||||
| """Transforms the raw data into staged dataframes.""" | ||||||||||||
| resources = _get_fields_by_resource() | ||||||||||||
|
|
||||||||||||
| # Resources with special handling | ||||||||||||
| resources.pop("vas", None) | ||||||||||||
| dfs = [_create_vas_df(raw_df)] | ||||||||||||
|
|
||||||||||||
| # Resources without special handling | ||||||||||||
| return dfs + so.pairwise_fmap( | ||||||||||||
| list(resources.items()), [raw_df], _create_df_for_resource | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def _create_df_for_resource( | ||||||||||||
| resource_entry: tuple[str, list[str]], raw_df: pl.DataFrame | ||||||||||||
| ) -> pl.DataFrame: | ||||||||||||
| """Creates a dataframe for a resource.""" | ||||||||||||
| resource_name, field_names = resource_entry | ||||||||||||
|
|
||||||||||||
| # Remove fields not in the raw data | ||||||||||||
| field_names.remove("event") | ||||||||||||
| field_names.remove("center") | ||||||||||||
|
|
||||||||||||
| return _select_with_base_cols(raw_df, resource_name, field_names) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def _create_vas_df(raw_df: pl.DataFrame) -> pl.DataFrame: | ||||||||||||
| """Creates a dataframe for the VAS resource.""" | ||||||||||||
| vas_cols = so.keep( | ||||||||||||
| raw_df.columns, | ||||||||||||
| lambda column: VAS_TIME_FIELD_PATTERN.match(column) is not None, | ||||||||||||
| ) | ||||||||||||
|
Comment on lines
+87
to
+90
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be better by using Polars rather than a filter. E.g. |
||||||||||||
|
|
||||||||||||
| cols_grouped_by_time: dict[int, list[str]] = {} | ||||||||||||
| for col in vas_cols: | ||||||||||||
| match = cast(re.Match[str], VAS_TIME_FIELD_PATTERN.match(col)) | ||||||||||||
|
|
||||||||||||
| time = match.group("time") | ||||||||||||
| if time == "minus10": | ||||||||||||
| time = "-10" | ||||||||||||
|
|
||||||||||||
| cols_grouped_by_time.setdefault(int(time), []).append(col) | ||||||||||||
|
Comment on lines
+93
to
+100
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can rewrite this dictionary construction without a for loop if you want, but I think it will just be longer and more complicated |
||||||||||||
|
|
||||||||||||
| vas_dfs = so.pairwise_fmap( | ||||||||||||
| list(cols_grouped_by_time.items()), [raw_df], _create_df_for_time_group | ||||||||||||
| ) | ||||||||||||
| return pl.concat(vas_dfs, how="vertical") | ||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think all of this would be better with a pivot https://docs.pola.rs/user-guide/transformations/pivot/ |
||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def _create_df_for_time_group( | ||||||||||||
| time_group: tuple[int, list[str]], raw_df: pl.DataFrame | ||||||||||||
| ) -> pl.DataFrame: | ||||||||||||
| """Creates a dataframe for a group of VAS columns with the same time.""" | ||||||||||||
| time, cols = time_group | ||||||||||||
|
|
||||||||||||
| renamed_cols = { | ||||||||||||
| col: cast(re.Match[str], VAS_TIME_FIELD_PATTERN.match(col)).group("field_name") | ||||||||||||
| for col in cols | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| return ( | ||||||||||||
| _select_with_base_cols(raw_df, "vas", cols) | ||||||||||||
| .rename(renamed_cols) | ||||||||||||
| .with_columns(pl.lit(time).alias("minutes_from_meal")) | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| def write_staged_df(df: pl.DataFrame) -> None: | ||||||||||||
| """Writes the dataframe to `staging/redcap/<resource-name>/<timestamp>.parquet`.""" | ||||||||||||
| resource_name = df["resource_name"][0] | ||||||||||||
| timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S") | ||||||||||||
| file_path = Path("staging") / "redcap" / resource_name / f"{timestamp}.parquet" | ||||||||||||
| file_path.parent.mkdir(parents=True, exist_ok=True) | ||||||||||||
|
|
||||||||||||
| df.drop("resource_name").write_parquet(file_path) | ||||||||||||
| so.pretty_print(f"Wrote staged data to '{file_path}'.") | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| if __name__ == "__main__": | ||||||||||||
| raw = load_raw_data() | ||||||||||||
| for df in raw_to_staged(raw): | ||||||||||||
| write_staged_df(df) | ||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or we can track
*.parquetThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, is that a question?