Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions src/ml4t/data/storage/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@
import polars as pl
from filelock import FileLock

import os
import shutil
# ============================================================
# During module loading, the optimal atomic write function is determined and bound in one go.
# ============================================================
if os.name == "nt":
# Windows Exclusive: Securely Release Handle + Explicit Unlock
def _execute_atomic_replace(tmp_path: Path, target_path: Path) -> None:
if target_path.exists():
target_path.unlink() # Forcefully unlock Windows file lock
shutil.move(str(tmp_path), str(target_path))
else:
# Linux/macOS Exclusive: Retains the highest performance of native POSIX single-step atomic replacement
def _execute_atomic_replace(tmp_path: Path, target_path: Path) -> None:
tmp_path.replace(target_path)

# Type alias for partition granularity
PartitionGranularityType = Literal["year", "month", "day", "hour"]

Expand Down Expand Up @@ -180,8 +196,13 @@ def _atomic_write(self, df: pl.DataFrame, target_path: Path) -> None:
# Write with compression
df.write_parquet(tmp_path, compression=self.config.compression or "zstd")

# Atomic rename
tmp_path.replace(target_path)
# At this point, the with block has ended, and the handle has been completely closed!
try:
# Directly call the optimal function bound during module loading that corresponds to the current system (zero runtime branch determination).
_execute_atomic_replace(tmp_path, target_path)
finally:
if tmp_path.exists():
tmp_path.unlink()

def _update_metadata(self, key: str, metadata: dict[str, Any]) -> None:
"""Update metadata for a key.
Expand Down Expand Up @@ -213,7 +234,14 @@ def _write_metadata_file(self, path: Path, metadata: dict[str, Any]) -> None:
) as tmp_file:
json.dump(metadata, tmp_file, indent=2, default=str)
tmp_path = Path(tmp_file.name)
tmp_path.replace(path)

# At this point, the with block has ended, and the handle has been completely closed!
try:
# Directly call the optimal function bound during module loading that corresponds to the current system (zero runtime branch determination).
_execute_atomic_replace(tmp_path, path)
finally:
if tmp_path.exists():
tmp_path.unlink()

def _ensure_lazy(self, data: pl.DataFrame | pl.LazyFrame) -> pl.LazyFrame:
"""Ensure data is a LazyFrame for efficient processing.
Expand Down
94 changes: 94 additions & 0 deletions tests/test_windows_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""
Windows Compatibility Test Suite for ml4t-data.
Validates that both data and metadata atomic writes work seamlessly
without raising WinError 32 on Windows environments.
"""

import shutil
import polars as pl
from pathlib import Path
from datetime import datetime
import pytest

from ml4t.data.storage import HiveStorage, StorageConfig


@pytest.fixture
def test_storage_root():
"""Fixture to manage a clean temporary test storage directory."""
path = Path.cwd() / "test_ml4t_storage_output"
if path.exists():
shutil.rmtree(path)
path.mkdir(parents=True, exist_ok=True)

yield path

# Cleanup after test completes
if path.exists():
shutil.rmtree(path)


@pytest.fixture
def mock_ohlcv_df():
"""Fixture to generate standard Polars DataFrame matching ml4t-data schema."""
dates = pl.date_range(
start=datetime(2026, 1, 1),
end=datetime(2026, 1, 5),
interval="1d",
eager=True
)
return pl.DataFrame({
"timestamp": dates,
"open": [100.0, 101.0, 102.0, 101.5, 103.0],
"high": [102.0, 103.0, 103.5, 102.5, 104.0],
"low": [99.0, 100.5, 101.0, 100.0, 102.0],
"close": [101.5, 102.5, 102.2, 101.8, 103.5],
"volume": [1000, 1200, 1100, 1300, 1400]
})


def test_hive_storage_windows_atomic_write(test_storage_root, mock_ohlcv_df):
"""
Test that HiveStorage can execute sequential duplicate writes
onto both Parquet data and JSON metadata without triggering
NTFS file-locking conflicts (WinError 32).
"""
# Initialize Storage Config matching ml4t specs
config = StorageConfig(
base_path=str(test_storage_root),
partition_granularity="month"
)
storage = HiveStorage(config)

logic_path = "mock_provider/daily/TEST_SYMBOL"

# ---- PHASE 1: Initial Write (Creates Data and Metadata) ----
try:
storage.write(mock_ohlcv_df, logic_path)
except PermissionError as e:
pytest.fail(f"Initial write failed due to premature Windows file-locking: {e}")

# ---- PHASE 2: Duplicate Overwrite (The core regression test) ----
# On unpatched Windows, this step instantly crashes with WinError 32
try:
storage.write(mock_ohlcv_df, logic_path)
except PermissionError as e:
pytest.fail(f"Duplicate overwrite failed! Windows WinError 32 deadlock detected: {e}")

# ---- PHASE 3: Physical Structure Assertions ----
metadata_dir = test_storage_root / ".metadata"
expected_json = metadata_dir / "mock_provider_daily_TEST_SYMBOL.json"
assert expected_json.exists(), "Metadata JSON file was not generated correctly!"

parquet_files = list(test_storage_root.rglob("*.parquet"))
assert len(parquet_files) > 0, "No partitioned Parquet files were found!"

# ---- PHASE 4: Data Load Verification (Aligned with native .read() API) ----
# ml4t-data returns a Polars LazyFrame, we need to collect() it to get a DataFrame
lazy_df = storage.read(logic_path)
assert lazy_df is not None, "Loaded LazyFrame is None!"

# 核心修复:通过 collect() 转换为 DataFrame 后再读取长度
loaded_df = lazy_df.collect()
assert len(loaded_df) == 5, f"Loaded DataFrame row count mismatch: expected 5, got {len(loaded_df)}"