3 changes: 3 additions & 0 deletions pyproject.toml
@@ -20,6 +20,9 @@ dependencies = [
"mysqlclient",
"python_dotenv",
"xmltodict",
"pyarrow>=14.0",
"boto3>=1.34",
"python-multipart>=0.0.9",
]

[project.optional-dependencies]
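For context: `pyarrow` parses the uploaded Parquet files, `boto3` talks to the S3-compatible MinIO store, and `python-multipart` is what FastAPI/Starlette require to parse `multipart/form-data` bodies. A minimal sketch of the kind of upload endpoint these dependencies enable (the route path and handler are assumptions, not part of this diff):

```python
from fastapi import APIRouter, UploadFile

router = APIRouter()


@router.post("/datasets/upload")  # hypothetical route, not part of this diff
async def upload_dataset(file: UploadFile) -> dict[str, str]:
    # Reading the multipart body is what requires python-multipart at runtime.
    file_bytes = await file.read()
    return {"filename": file.filename or "", "size": str(len(file_bytes))}
```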
4 changes: 4 additions & 0 deletions src/config.py
@@ -79,3 +79,7 @@ def load_database_configuration(file: Path = _config_file) -> TomlTable:

def _load_configuration(file: Path = _config_file) -> TomlTable:
return tomllib.loads(file.read_text())


def load_minio_configuration(file: Path = _config_file) -> TomlTable:
return typing.cast("TomlTable", _load_configuration(file).get("minio", {}))
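For orientation, a minimal usage sketch of the new loader, assuming the default `config.toml` is in place:

```python
from config import load_minio_configuration

minio_cfg = load_minio_configuration()  # the [minio] table, or {} if absent
print(minio_cfg.get("endpoint_url"), minio_cfg.get("bucket"))
```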
8 changes: 8 additions & 0 deletions src/config.toml
@@ -22,3 +22,11 @@ database="openml"
[routing]
minio_url="http://minio:9000/"
server_url="http://php-api:80/"

[minio]
endpoint_url="http://minio:9000"
bucket="datasets"
# Credentials should be provided via environment variables:
# OPENML_MINIO_ACCESS_KEY and OPENML_MINIO_SECRET_KEY
access_key="minioadmin"
secret_key="minioadmin"
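The keys above are development defaults; in deployment the environment variables take precedence (see `src/core/storage.py` below). A minimal sketch of the override, with hypothetical credential values:

```python
import os

from core.storage import _minio_config

# Hypothetical deployment credentials; when set, they win over config.toml.
os.environ["OPENML_MINIO_ACCESS_KEY"] = "prod-access-key"
os.environ["OPENML_MINIO_SECRET_KEY"] = "prod-secret-key"

assert _minio_config()["access_key"] == "prod-access-key"
```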
77 changes: 77 additions & 0 deletions src/core/parquet.py
@@ -0,0 +1,77 @@
from __future__ import annotations

import hashlib
import io
from dataclasses import dataclass, field

import pyarrow as pa
import pyarrow.parquet as pq

from schemas.datasets.openml import FeatureType


def map_arrow_type(arrow_type: pa.DataType) -> FeatureType:
"""Map a PyArrow DataType to an OpenML FeatureType."""
    if (
        pa.types.is_floating(arrow_type)
        or pa.types.is_integer(arrow_type)
        or pa.types.is_decimal(arrow_type)
    ):
return FeatureType.NUMERIC
if pa.types.is_boolean(arrow_type) or pa.types.is_dictionary(arrow_type):
return FeatureType.NOMINAL
return FeatureType.STRING
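# Illustrative mappings:
#   map_arrow_type(pa.float64())                           -> FeatureType.NUMERIC
#   map_arrow_type(pa.dictionary(pa.int32(), pa.string())) -> FeatureType.NOMINAL
#   map_arrow_type(pa.timestamp("s"))                      -> FeatureType.STRING (fallback)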


@dataclass
class ColumnMeta:
index: int
name: str
data_type: FeatureType
number_of_missing_values: int


@dataclass
class ParquetMeta:
num_rows: int
num_columns: int
md5_checksum: str
columns: list[ColumnMeta] = field(default_factory=list)


def read_parquet_metadata(file_bytes: bytes) -> ParquetMeta:
"""Parse *file_bytes* as Parquet and extract schema / quality metadata.

Raises ``ValueError`` if the bytes are not a valid Parquet file.
"""
try:
buf = io.BytesIO(file_bytes)
pf = pq.ParquetFile(buf)
except Exception as exc:
msg = "File is not a valid Parquet file."
raise ValueError(msg) from exc

schema = pf.schema_arrow
num_rows = pf.metadata.num_rows
md5 = hashlib.md5(file_bytes, usedforsecurity=False).hexdigest() # noqa: S324

# Read full table once to count per-column nulls
table = pf.read()

columns: list[ColumnMeta] = []
for idx, col_name in enumerate(schema.names):
col = table.column(col_name)
null_count = col.null_count
columns.append(
ColumnMeta(
index=idx,
name=col_name,
data_type=map_arrow_type(schema.field(col_name).type),
number_of_missing_values=null_count,
)
)

return ParquetMeta(
num_rows=num_rows,
num_columns=len(columns),
md5_checksum=md5,
columns=columns,
)
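A quick usage sketch of the metadata reader; the file path here is hypothetical:

```python
from pathlib import Path

from core.parquet import read_parquet_metadata

file_bytes = Path("iris.pq").read_bytes()  # any valid Parquet bytes work
meta = read_parquet_metadata(file_bytes)
print(meta.num_rows, meta.num_columns, meta.md5_checksum)
for col in meta.columns:
    print(col.index, col.name, col.data_type, col.number_of_missing_values)
```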
59 changes: 59 additions & 0 deletions src/core/storage.py
@@ -0,0 +1,59 @@
from __future__ import annotations

import io
import logging
import os
from typing import TYPE_CHECKING

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from config import _config_file, _load_configuration

if TYPE_CHECKING:
from pathlib import Path

logger = logging.getLogger(__name__)

MINIO_ACCESS_KEY_ENV = "OPENML_MINIO_ACCESS_KEY"
MINIO_SECRET_KEY_ENV = "OPENML_MINIO_SECRET_KEY" # noqa: S105


def _minio_config(file: Path = _config_file) -> dict[str, str]:
cfg = _load_configuration(file).get("minio", {})
return {
"endpoint_url": cfg.get("endpoint_url", "http://minio:9000"),
"bucket": cfg.get("bucket", "datasets"),
"access_key": os.environ.get(MINIO_ACCESS_KEY_ENV, cfg.get("access_key", "minioadmin")),
"secret_key": os.environ.get(MINIO_SECRET_KEY_ENV, cfg.get("secret_key", "minioadmin")),
}


def _object_key(dataset_id: int) -> str:
"""Return the MinIO object key for a dataset, matching the existing URL pattern."""
ten_thousands_prefix = f"{dataset_id // 10_000:04d}"
padded_id = f"{dataset_id:04d}"
return f"datasets/{ten_thousands_prefix}/{padded_id}/dataset_{dataset_id}.pq"


def upload_to_minio(file_bytes: bytes, dataset_id: int) -> str:
"""Upload *file_bytes* to MinIO and return the object key.

Raises ``RuntimeError`` on upload failure so callers can convert to HTTP 500.
"""
cfg = _minio_config()
key = _object_key(dataset_id)
try:
client = boto3.client(
"s3",
endpoint_url=cfg["endpoint_url"],
aws_access_key_id=cfg["access_key"],
aws_secret_access_key=cfg["secret_key"],
)
client.upload_fileobj(io.BytesIO(file_bytes), cfg["bucket"], key)
logger.info("Uploaded dataset %d to MinIO at key '%s'", dataset_id, key)
except (BotoCoreError, ClientError) as exc:
msg = f"Failed to upload dataset {dataset_id} to MinIO: {exc}"
logger.exception(msg)
raise RuntimeError(msg) from exc
return key
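A minimal usage sketch, assuming MinIO is reachable at the configured endpoint and the target bucket exists:

```python
from pathlib import Path

from core.storage import upload_to_minio

file_bytes = Path("dataset_61.pq").read_bytes()  # hypothetical local file
try:
    key = upload_to_minio(file_bytes, dataset_id=61)
    print(f"stored at {key}")  # datasets/0000/0061/dataset_61.pq
except RuntimeError:
    ...  # an API-layer caller would convert this to HTTP 500
```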
157 changes: 157 additions & 0 deletions src/database/datasets.py
@@ -178,3 +178,160 @@ def remove_deactivated_status(dataset_id: int, connection: Connection) -> None:
),
parameters={"data": dataset_id},
)


def insert_file(
*,
file_name: str,
reference: str,
md5_hash: str,
connection: Connection,
) -> int:
"""Insert a row into the `file` table and return the new file id."""
connection.execute(
text(
"""
INSERT INTO file(`name`, `reference`, `md5_hash`)
VALUES (:name, :reference, :md5_hash)
""",
),
parameters={"name": file_name, "reference": reference, "md5_hash": md5_hash},
)
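    # LAST_INSERT_ID() is scoped to this connection, so concurrent inserts on
    # other connections cannot interfere with the id read back below.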
result = connection.execute(text("SELECT LAST_INSERT_ID()"))
(file_id,) = result.one()
return int(file_id)


def insert_dataset( # noqa: PLR0913
*,
name: str,
description: str,
format_: str,
file_id: int,
uploader: int,
visibility: str,
licence: str,
language: str,
default_target_attribute: str,
original_data_url: str,
paper_url: str,
collection_date: str,
citation: str,
md5_checksum: str,
connection: Connection,
) -> int:
"""Insert a row into the `dataset` table and return the new dataset id."""
connection.execute(
text(
"""
INSERT INTO dataset(
`name`, `description`, `format`, `file_id`, `uploader`,
`visibility`, `licence`, `language`,
`default_target_attribute`, `original_data_url`, `paper_url`,
`collection_date`, `citation`, `md5_checksum`,
`version`, `upload_date`
)
VALUES (
:name, :description, :format, :file_id, :uploader,
:visibility, :licence, :language,
:default_target_attribute, :original_data_url, :paper_url,
:collection_date, :citation, :md5_checksum,
1, NOW()
)
""",
),
parameters={
"name": name,
"description": description,
"format": format_,
"file_id": file_id,
"uploader": uploader,
"visibility": visibility,
"licence": licence,
"language": language,
"default_target_attribute": default_target_attribute,
"original_data_url": original_data_url,
"paper_url": paper_url,
"collection_date": collection_date,
"citation": citation,
"md5_checksum": md5_checksum,
},
)
result = connection.execute(text("SELECT LAST_INSERT_ID()"))
(dataset_id,) = result.one()
return int(dataset_id)


def insert_description(
*,
dataset_id: int,
description: str,
connection: Connection,
) -> None:
"""Insert the initial description into the `dataset_description` table."""
connection.execute(
text(
"""
INSERT INTO dataset_description(`did`, `description`, `version`)
VALUES (:did, :description, 1)
""",
),
parameters={"did": dataset_id, "description": description},
)


def insert_features(
*,
dataset_id: int,
features: list[dict[str, object]],
connection: Connection,
) -> None:
"""Bulk-insert feature rows into `data_feature`.

Each dict in *features* must have: index, name, data_type, is_target,
is_row_identifier, is_ignore, number_of_missing_values.
"""
if not features:
return
for feat in features:
connection.execute(
text(
"""
INSERT INTO data_feature(
`did`, `index`, `name`, `data_type`,
`is_target`, `is_row_identifier`, `is_ignore`,
`NumberOfMissingValues`
)
VALUES (
:did, :index, :name, :data_type,
:is_target, :is_row_identifier, :is_ignore,
:number_of_missing_values
)
""",
),
parameters={"did": dataset_id, **feat},
)


def insert_qualities(
*,
dataset_id: int,
qualities: list[dict[str, object]],
connection: Connection,
) -> None:
"""Insert quality rows into `data_quality`.

Each dict must have: quality (str), value (float | None).
"""
if not qualities:
return
for q in qualities:
connection.execute(
text(
"""
INSERT INTO data_quality(`data`, `quality`, `value`)
VALUES (:data, :quality, :value)
""",
),
parameters={"data": dataset_id, **q},
)
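Taken together, a hedged sketch of how these helpers might compose into one upload flow; the engine URL, the `reference` value, the call order, and all default field values below are assumptions, not part of this diff:

```python
from sqlalchemy import create_engine

from core.parquet import read_parquet_metadata
from core.storage import upload_to_minio
from database.datasets import (
    insert_dataset,
    insert_description,
    insert_features,
    insert_file,
)

# Placeholder URL; the real project builds its engine from config.toml.
engine = create_engine("mysql+mysqldb://root:ok@openml-database/openml")


def store_dataset(file_bytes: bytes, name: str, description: str, uploader: int) -> int:
    meta = read_parquet_metadata(file_bytes)  # ValueError if not valid Parquet
    with engine.begin() as connection:  # one transaction: all rows or none
        file_id = insert_file(
            file_name=f"{name}.pq",
            reference="minio",  # placeholder; the real reference value is not shown here
            md5_hash=meta.md5_checksum,
            connection=connection,
        )
        dataset_id = insert_dataset(
            name=name,
            description=description,
            format_="parquet",
            file_id=file_id,
            uploader=uploader,
            visibility="public",  # illustrative defaults from here on
            licence="CC0",
            language="English",
            default_target_attribute="",
            original_data_url="",
            paper_url="",
            collection_date="",
            citation="",
            md5_checksum=meta.md5_checksum,
            connection=connection,
        )
        insert_description(
            dataset_id=dataset_id, description=description, connection=connection
        )
        insert_features(
            dataset_id=dataset_id,
            features=[
                {
                    "index": col.index,
                    "name": col.name,
                    "data_type": col.data_type.value,  # assumes FeatureType is a str enum
                    "is_target": "false",  # assumes enum('true','false') columns
                    "is_row_identifier": "false",
                    "is_ignore": "false",
                    "number_of_missing_values": col.number_of_missing_values,
                }
                for col in meta.columns
            ],
            connection=connection,
        )
    upload_to_minio(file_bytes, dataset_id)  # after commit; RuntimeError -> HTTP 500
    return dataset_id
```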