Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
raw/** filter=lfs diff=lfs merge=lfs -text
staging/** filter=lfs diff=lfs merge=lfs -text
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we can track *.parquet

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, is that a question?

5 changes: 5 additions & 0 deletions .typos.toml
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done automatically, not sure if it's the right way

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ extend-exclude = [
# Auto-generated by Flower
"resources"
]

[default]

[default.extend-words]
vas = "vas"
4 changes: 4 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,7 @@ download-data-dict:
# Download data from REDCap
download-data:
uv run python scripts/redcap_data.py

# Stage raw data
stage-data:
uv run python scripts/stage_data.py
140 changes: 140 additions & 0 deletions scripts/stage_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import re
from datetime import datetime
from operator import itemgetter
from pathlib import Path
from typing import cast

import polars as pl
import seedcase_soil as so

# Matches raw VAS column names of the form `vas_<field_name>[_fasted]_<time>min`.
#
# Named groups:
# - `field_name`: the text between the `vas_` prefix and the time suffix. It is
#   matched lazily, so an optional trailing `_fasted` marker is consumed by the
#   unnamed optional group and is not part of `field_name`.
# - `time`: one of the listed time points. The negative time point is spelled
#   `minus10` in the column name rather than `-10`.
VAS_TIME_FIELD_PATTERN = re.compile(
    r"^vas_(?P<field_name>.+?)(_fasted)?_(?P<time>minus10|30|60|90|120|180|240)min$"
)
Comment on lines +10 to +12
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can share this with the metadata transformation



def load_raw_data() -> pl.DataFrame:
"""Loads the latest raw data from `raw/redcap/<timestamp>.csv.gz`."""
file_path = Path("raw") / "redcap"
files = list(file_path.glob("*.csv.gz"))
if not files:
raise FileNotFoundError(
f"No raw data files found in '{file_path}'. "
"Have you run `just download-data`?"
)
Comment on lines +19 to +23
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can just do nothing without throwing an error.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think these types of errors might not be necessary, and they might be solved by using some type of "orchestrator", but that's for later. This is fine for now.


latest_file = max(files, key=lambda file: file.name)
so.pretty_print(f"Loading data from '{latest_file}'.")
return pl.read_csv(latest_file)


def _get_fields_by_resource() -> dict[str, list[str]]:
    """Builds a lookup from each resource name to the names of its fields.

    The resources and their fields are read from the properties in
    `datapackage.json`.
    """

    def _to_entry(resource):
        # One `(resource name, field names)` pair, consumed by `dict()` below.
        name = resource["name"]
        field_names = so.fmap(resource["schema"]["fields"], itemgetter("name"))
        return (name, field_names)

    package_properties = so.read_properties(so.parse_source("datapackage.json"))
    entries = so.fmap(package_properties["resources"], _to_entry)
    return dict(entries)


def _select_with_base_cols(
raw_df: pl.DataFrame, resource_name: str, cols: list[str]
) -> pl.DataFrame:
"""Selects columns and adds base columns common to all dataframes."""
return (
raw_df.select(["redcap_event_name"] + cols)
.rename({"redcap_event_name": "event"})
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having looked at more of the data, I don't think event by itself can be the PK. Raised #95

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, I was thinking the same.

.with_columns(
pl.lit("Copenhagen").alias("center"),
pl.lit(resource_name).alias("resource_name"),
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll use this to write the parquet file then drop it

Comment on lines +52 to +53
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pl.lit("Copenhagen").alias("center"),
pl.lit(resource_name).alias("resource_name"),
# Only used for creating the Parquet files.
pl.lit("Copenhagen").alias("center"),
pl.lit(resource_name).alias("resource_name"),

)
)


def raw_to_staged(raw_df: pl.DataFrame) -> list[pl.DataFrame]:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe organize it so that all the `_fn` functions are either above or below the normal functions?

"""Transforms the raw data into staged dataframes."""
resources = _get_fields_by_resource()

# Resources with special handling
resources.pop("vas", None)
dfs = [_create_vas_df(raw_df)]

# Resources without special handling
return dfs + so.pairwise_fmap(
list(resources.items()), [raw_df], _create_df_for_resource
)


def _create_df_for_resource(
    resource_entry: tuple[str, list[str]], raw_df: pl.DataFrame
) -> pl.DataFrame:
    """Creates a dataframe for a resource.

    Args:
        resource_entry: A `(resource_name, field_names)` pair. The list of
            field names is not modified.
        raw_df: The raw data to select the resource's columns from.

    Returns:
        A dataframe with the resource's columns plus the base columns added
        by `_select_with_base_cols`.
    """
    resource_name, field_names = resource_entry

    # `event` and `center` are not columns in the raw data; they are added by
    # `_select_with_base_cols`. Build a new list rather than calling
    # `list.remove()`, which would mutate the caller's list in place and raise
    # `ValueError` if this entry were processed a second time.
    base_cols = ("event", "center")
    raw_field_names = [name for name in field_names if name not in base_cols]

    return _select_with_base_cols(raw_df, resource_name, raw_field_names)


def _create_vas_df(raw_df: pl.DataFrame) -> pl.DataFrame:
"""Creates a dataframe for the VAS resource."""
vas_cols = so.keep(
raw_df.columns,
lambda column: VAS_TIME_FIELD_PATTERN.match(column) is not None,
)
Comment on lines +87 to +90
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be better done with Polars rather than a filter, e.g. `select()` can take a pattern or an exclude.


cols_grouped_by_time: dict[int, list[str]] = {}
for col in vas_cols:
match = cast(re.Match[str], VAS_TIME_FIELD_PATTERN.match(col))

time = match.group("time")
if time == "minus10":
time = "-10"

cols_grouped_by_time.setdefault(int(time), []).append(col)
Comment on lines +93 to +100
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can rewrite this dictionary construction without a for loop if you want, but I think it will just be longer and more complicated


vas_dfs = so.pairwise_fmap(
list(cols_grouped_by_time.items()), [raw_df], _create_df_for_time_group
)
return pl.concat(vas_dfs, how="vertical")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all of this would be better with a pivot https://docs.pola.rs/user-guide/transformations/pivot/



def _create_df_for_time_group(
    time_group: tuple[int, list[str]], raw_df: pl.DataFrame
) -> pl.DataFrame:
    """Builds the VAS dataframe for all columns that share one time point.

    Args:
        time_group: A `(time, column_names)` pair, where every column name
            matches `VAS_TIME_FIELD_PATTERN`.
        raw_df: The raw data to select the columns from.

    Returns:
        The selected columns, renamed to their `field_name` group, with the
        time stored in a `minutes_from_meal` column.
    """
    minutes, raw_col_names = time_group

    # Map each raw column name to the `field_name` group of the pattern.
    new_name_by_col: dict[str, str] = {}
    for raw_col_name in raw_col_names:
        matched = cast(re.Match[str], VAS_TIME_FIELD_PATTERN.match(raw_col_name))
        new_name_by_col[raw_col_name] = matched.group("field_name")

    selected = _select_with_base_cols(raw_df, "vas", raw_col_names)
    renamed = selected.rename(new_name_by_col)
    return renamed.with_columns(pl.lit(minutes).alias("minutes_from_meal"))


def write_staged_df(df: pl.DataFrame) -> None:
    """Saves a dataframe as `staging/redcap/<resource-name>/<timestamp>.parquet`.

    The resource name is taken from the first value of the `resource_name`
    column. That column is dropped before the file is written.
    """
    # Resolve the target directory first; the file name is the current time.
    target_dir = Path("staging") / "redcap" / df["resource_name"][0]
    timestamp = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
    file_path = target_dir / f"{timestamp}.parquet"
    target_dir.mkdir(parents=True, exist_ok=True)

    staged = df.drop("resource_name")
    staged.write_parquet(file_path)
    so.pretty_print(f"Wrote staged data to '{file_path}'.")


if __name__ == "__main__":
    # Load the latest raw data, stage it, and write one file per dataframe.
    staged_dfs = raw_to_staged(load_raw_data())
    for staged_df in staged_dfs:
        write_staged_df(staged_df)