From 892be964e0eb3afc92d3fcef483a41281bdc7e26 Mon Sep 17 00:00:00 2001 From: Ivor Bosloper Date: Tue, 19 May 2026 23:36:27 +0200 Subject: [PATCH 1/6] PerFileBaseConverter: per-file migration and merging the result, decreasing memory requirements for large data sets --- CHANGELOG.md | 3 +- fiboa_cli/conversion/per_file.py | 295 +++++++++++++++++++++++++++++++ fiboa_cli/datasets/es.py | 4 +- pyproject.toml | 2 +- 4 files changed, 300 insertions(+), 4 deletions(-) create mode 100644 fiboa_cli/conversion/per_file.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 788d21aa..24139d47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Converter for Spain (whole), based on the FEGA 2025+ data - Update fr-converter to support 2021/2022 files +- PerFileBaseConverter: per-file migration and merging the result, decreasing memory requirements for large data sets ## [v0.21.0] - 2026-02-16 @@ -20,7 +21,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Fix the column additions of the determination fields in the AI4SF converter - Add HCAT to datasets where possible - Updated years & variants for at_crop, be_vlg, es_an, es_cl, es_pv, ie, pt, se -- Extend create_stac, include include fiboa data +- Extend create_stac, include fiboa data - Publish command; skip hidden files, generate better texts - Fix to vecorel: converter.license and provider should be string - Added a Dockerfile to simplify working with fiboa diff --git a/fiboa_cli/conversion/per_file.py b/fiboa_cli/conversion/per_file.py new file mode 100644 index 00000000..349b7a55 --- /dev/null +++ b/fiboa_cli/conversion/per_file.py @@ -0,0 +1,295 @@ +import json +import os +from typing import Optional + +import numpy as np +import pyarrow as pa +import pyarrow.parquet as pq + +from .fiboa_converter import FiboaBaseConverter + +GEO_META_KEY = b"geo" +DEFAULT_BATCH_SIZE = 64_000 + + +# This converter is experimental, use with caution. +# Use this primarily for datasets that are too large to be processed by the default converter +class PerFileBaseConverter(FiboaBaseConverter): + def convert( + self, + output_file, + cache=None, + input_files=None, + variant=None, + compression=None, + compression_level: Optional[int] = None, + geoparquet_version=None, + original_geometries=False, + **kwargs, + ) -> str: + dirname, filename = os.path.split(output_file) + filename, ext = os.path.splitext(filename) + if input_files is not None and isinstance(input_files, dict) and len(input_files) > 0: + self.warning("Using user provided input file(s) instead of the pre-defined file(s)") + urls = input_files + else: + urls = self.get_urls() + if urls is None: + raise ValueError("No input files provided") + + # Single-source: the per-file pipeline degenerates to plain convert. + if len(urls) <= 1: + return super().convert( + output_file=output_file, + cache=cache, + input_files=urls, + variant=variant, + compression=compression, + compression_level=compression_level, + geoparquet_version=geoparquet_version, + original_geometries=original_geometries, + **kwargs, + ) + + # Multi-source: convert each URI to its own GeoParquet part, then merge. + part_files = [] + for index, (uri, target) in enumerate(urls.items()): + part = os.path.join(dirname, f"{filename}_{index}{ext}") + self.info(f"Converting source {index + 1}/{len(urls)}: {uri}") + super().convert( + output_file=part, + cache=cache, + input_files={uri: target}, + variant=variant, + compression=compression, + compression_level=compression_level, + geoparquet_version=geoparquet_version, + original_geometries=original_geometries, + **kwargs, + ) + part_files.append(part) + self.merge_files(output_file, part_files, compression=compression or "zstd") + return output_file + + def merge_files( + self, + output_file: str, + paths: list, + batch_size: int = DEFAULT_BATCH_SIZE, + compression: str = "zstd", + compression_level: Optional[int] = None, + cleanup_parts: bool = False, + ) -> str: + """ + Merge a list of GeoParquet files into a single GeoParquet, globally + sorted by Hilbert distance. Streams via pyarrow row groups so peak + memory is roughly O(batch_size * k). + + Assumes each input file was produced by the standard convert pipeline, + which already sorts rows by Hilbert distance against the CRS's total + bounds (see ``vecorel_cli.vecorel.hilbert.hilbert_sort_geodataframe``). + Because every input shares the same Hilbert reference grid (derived + from the CRS, not from per-file extents), no pre-sort is required — + we just merge the already-sorted runs. + """ + if not paths: + raise ValueError("No paths to merge") + paths = [str(p) for p in paths] + + base_pf = pq.ParquetFile(paths[0]) + base_schema = base_pf.schema_arrow + base_meta = base_schema.metadata or {} + if GEO_META_KEY not in base_meta: + raise ValueError(f"{paths[0]} has no 'geo' metadata; not a GeoParquet?") + base_geo = json.loads(base_meta[GEO_META_KEY]) + primary_col = base_geo["primary_column"] + primary_col_meta = base_geo["columns"][primary_col] + crs = primary_col_meta.get("crs") + + # Validate schemas + CRS, collect per-file bboxes / geometry_types for + # the merged geo metadata. + bboxes: list = [] + geom_types: set = set() + if primary_col_meta.get("bbox") is not None: + bboxes.append(primary_col_meta["bbox"]) + geom_types.update(primary_col_meta.get("geometry_types") or []) + for path in paths[1:]: + pf = pq.ParquetFile(path) + sch = pf.schema_arrow + if not sch.equals(base_schema, check_metadata=False): + raise ValueError( + f"Schema mismatch: {path} differs from {paths[0]}.\n" + f" Expected: {base_schema}\n" + f" Got: {sch}" + ) + geo = json.loads((sch.metadata or {})[GEO_META_KEY]) + col = geo["columns"][primary_col] + if col.get("crs") != crs: + raise ValueError( + f"CRS mismatch: {path} has crs={col.get('crs')!r}, expected {crs!r}" + ) + if col.get("bbox") is not None: + bboxes.append(col["bbox"]) + geom_types.update(col.get("geometry_types") or []) + + merged_bbox = None + if bboxes: + merged_bbox = ( + min(b[0] for b in bboxes), + min(b[1] for b in bboxes), + max(b[2] for b in bboxes), + max(b[3] for b in bboxes), + ) + + # Same Hilbert reference grid that the upstream sort used. + from vecorel_cli.vecorel.hilbert import crs_total_bounds + + total_bounds = crs_total_bounds(crs) + + self.info(f"Streaming merge -> {output_file} (Hilbert ref bounds = {total_bounds})") + _streaming_merge( + paths, + output_file, + primary_col, + total_bounds, + merged_bbox, + sorted(geom_types), + batch_size, + compression, + compression_level, + ) + + if cleanup_parts: + for path in paths: + try: + os.remove(path) + except OSError: + self.warning(f"Could not remove part file {path}") + + return output_file + + +# ---------- helpers ---------- + + +def _bounds_array_for_table(table: pa.Table, primary_col: str) -> np.ndarray: + """Return an (N, 4) float64 array of [xmin, ymin, xmax, ymax] per feature. + + Uses the GeoParquet 1.1.0 covering ``bbox`` struct column when present + (zero-decode); otherwise falls back to decoding WKB. + """ + if "bbox" in table.column_names and pa.types.is_struct(table.column("bbox").type): + arr = table.column("bbox").combine_chunks() + return np.column_stack( + [ + arr.field("xmin").to_numpy(zero_copy_only=False), + arr.field("ymin").to_numpy(zero_copy_only=False), + arr.field("xmax").to_numpy(zero_copy_only=False), + arr.field("ymax").to_numpy(zero_copy_only=False), + ] + ).astype(np.float64, copy=False) + import shapely + + wkb_list = table.column(primary_col).combine_chunks().to_pylist() + geoms = shapely.from_wkb(wkb_list) + return shapely.bounds(geoms) + + +def _hilbert_keys_for_table(table: pa.Table, primary_col: str, total_bounds) -> np.ndarray: + from vecorel_cli.vecorel.hilbert import hilbert_distances_from_bounds + + bounds = _bounds_array_for_table(table, primary_col) + return hilbert_distances_from_bounds(bounds, total_bounds) + + +def _build_output_schema(input_schema: pa.Schema, merged_bbox, geom_types) -> pa.Schema: + """Patch the geo metadata: merged bbox + union of geometry_types. Other + schema metadata and field metadata are preserved unchanged.""" + meta = dict(input_schema.metadata or {}) + geo = json.loads(meta[GEO_META_KEY]) + primary_col = geo["primary_column"] + if merged_bbox is not None: + geo["columns"][primary_col]["bbox"] = [float(v) for v in merged_bbox] + if geom_types: + geo["columns"][primary_col]["geometry_types"] = list(geom_types) + meta[GEO_META_KEY] = json.dumps(geo).encode("utf-8") + return input_schema.with_metadata(meta) + + +def _streaming_merge( + paths: list, + output_file: str, + primary_col: str, + total_bounds, + merged_bbox, + geom_types, + batch_size: int, + compression: str, + compression_level: Optional[int], +) -> None: + pq_files = [pq.ParquetFile(p) for p in paths] + in_schema = pq_files[0].schema_arrow + out_schema = _build_output_schema(in_schema, merged_bbox, geom_types) + + iters = [pf.iter_batches(batch_size=batch_size) for pf in pq_files] + heads: list = [None] * len(paths) + hilberts: list = [None] * len(paths) + + def refill(i): + # Skip any empty batches; mark the iterator exhausted only when next() raises. + while True: + try: + batch = next(iters[i]) + except StopIteration: + heads[i] = None + hilberts[i] = None + return + if batch.num_rows == 0: + continue + tbl = pa.Table.from_batches([batch]) + heads[i] = tbl + hilberts[i] = _hilbert_keys_for_table(tbl, primary_col, total_bounds) + return + + for i in range(len(paths)): + refill(i) + + write_kwargs = {"compression": compression} + if compression_level is not None: + write_kwargs["compression_level"] = compression_level + writer = pq.ParquetWriter(output_file, out_schema, **write_kwargs) + + try: + while any(h is not None for h in heads): + active = [i for i, h in enumerate(heads) if h is not None] + # The horizon is the smallest "current max" Hilbert across active heads. + # Every row with hilbert <= horizon is emit-safe in this round, because + # no still-pending row from any other file can possibly be less than it. + horizon = min(hilberts[i][-1] for i in active) + + chunks = [] + chunk_h = [] + for i in active: + h = hilberts[i] + cut = int(np.searchsorted(h, horizon, side="right")) + if cut == 0: + continue + chunks.append(heads[i].slice(0, cut)) + chunk_h.append(h[:cut]) + if cut == heads[i].num_rows: + refill(i) + else: + heads[i] = heads[i].slice(cut) + hilberts[i] = h[cut:] + + if not chunks: + # Defensive: shouldn't happen because at least the file defining the + # horizon will contribute its full current batch. + break + + combined = pa.concat_tables(chunks) + combined_h = np.concatenate(chunk_h) + order = np.argsort(combined_h, kind="stable") + writer.write_table(combined.take(pa.array(order))) + finally: + writer.close() diff --git a/fiboa_cli/datasets/es.py b/fiboa_cli/datasets/es.py index 2352d444..5ecd23ba 100644 --- a/fiboa_cli/datasets/es.py +++ b/fiboa_cli/datasets/es.py @@ -3,10 +3,10 @@ import requests from vecorel_cli.vecorel.extensions import ADMIN_DIVISION -from ..conversion.fiboa_converter import FiboaBaseConverter +from ..conversion.per_file import PerFileBaseConverter -class Converter(FiboaBaseConverter): +class Converter(PerFileBaseConverter): id = "es" short_name = "Spain" title = "Spain Declared Crops (Cultivos Declarados SIGPAC)" diff --git a/pyproject.toml b/pyproject.toml index 735559ce..22cbda67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ classifiers = [ ] requires-python = ">=3.11" dependencies = [ - "vecorel-cli==0.2.15", + "vecorel-cli==0.2.16", "spdx-license-list==3.27.0", ] From 06e47ea1f3fdd616ca48889737510c1685a71c7a Mon Sep 17 00:00:00 2001 From: Ivor Bosloper Date: Wed, 20 May 2026 20:24:25 +0200 Subject: [PATCH 2/6] Update ES Provinces list --- fiboa_cli/datasets/data-files/es_cl_prv.csv | 47 +++++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/fiboa_cli/datasets/data-files/es_cl_prv.csv b/fiboa_cli/datasets/data-files/es_cl_prv.csv index c05865db..4b94d7b9 100644 --- a/fiboa_cli/datasets/data-files/es_cl_prv.csv +++ b/fiboa_cli/datasets/data-files/es_cl_prv.csv @@ -1,10 +1,51 @@ code,province,filename +01,Álava,ALAVA +02,Albacete,ALBACETE +03,Alicante,ALICANTE +04,Almería,ALMERIA 05,Ávila,AVILA +06,Badajoz,BADAJOZ +07,Illes Balears,ILLES BALEARS +08,Barcelona,BARCELONA +09,Burgos,BURGOS +10,Cáceres,CACERES +11,Cádiz,CADIZ +12,Castellón,CASTELLON +13,Ciudad Real,CIUDAD REAL +14,Córdoba,CORDOBA +15,A Coruña,A CORUÑA +16,Cuenca,CUENCA +17,Girona,GIRONA +18,Granada,GRANADA +19,Guadalajara,GUADALAJARA +20,Guipúzcoa,GUIPUZCOA +21,Huelva,HUELVA +22,Huesca,HUESCA +23,Jaén,JAEN +24,León,LEON +25,Lleida,LLEIDA +26,La Rioja,LA RIOJA +27,Lugo,LUGO +28,Madrid,MADRID +29,Málaga,MALAGA +30,Murcia,MURCIA +31,Navarra,NAVARRA +32,Ourense,OURENSE +33,Asturias,ASTURIAS 34,Palencia,PALENCIA +35,Las Palmas,LAS PALMAS +36,Pontevedra,PONTEVEDRA 37,Salamanca,SALAMANCA +38,Santa Cruz de Tenerife,SANTA CRUZ DE TENERIFE +39,Cantabria,CANTABRIA 40,Segovia,SEGOVIA -42,Soria,SORIA, +41,Sevilla,SEVILLA +42,Soria,SORIA +43,Tarragona,TARRAGONA +44,Teruel,TERUEL +45,Toledo,TOLEDO +46,Valencia,VALENCIA 47,Valladolid,VALLADOLID +48,Vizcaya,VIZCAYA 49,Zamora,ZAMORA -24,León,LEON -09,Burgos,BURGOS +50,Zaragoza,ZARAGOZA From 13aa258367ab4f3b067160769f6272fb97577a1a Mon Sep 17 00:00:00 2001 From: Ivor Bosloper Date: Wed, 20 May 2026 20:24:45 +0200 Subject: [PATCH 3/6] Fix hilbert-sort bug --- fiboa_cli/conversion/per_file.py | 78 ++++++++++++++++++++++++++++---- fiboa_cli/datasets/es.py | 27 ++++------- 2 files changed, 80 insertions(+), 25 deletions(-) diff --git a/fiboa_cli/conversion/per_file.py b/fiboa_cli/conversion/per_file.py index 349b7a55..bd29f0f6 100644 --- a/fiboa_cli/conversion/per_file.py +++ b/fiboa_cli/conversion/per_file.py @@ -54,7 +54,13 @@ def convert( # Multi-source: convert each URI to its own GeoParquet part, then merge. part_files = [] for index, (uri, target) in enumerate(urls.items()): - part = os.path.join(dirname, f"{filename}_{index}{ext}") + part = os.path.join(dirname, f"{filename}_{index}_part{ext}") + part_files.append(part) + if os.path.exists(part): + self.info( + f"Skipping existing file {part}: {uri} -> {output_file} (part {index + 1}/{len(urls)})" + ) + continue self.info(f"Converting source {index + 1}/{len(urls)}: {uri}") super().convert( output_file=part, @@ -67,7 +73,6 @@ def convert( original_geometries=original_geometries, **kwargs, ) - part_files.append(part) self.merge_files(output_file, part_files, compression=compression or "zstd") return output_file @@ -85,12 +90,13 @@ def merge_files( sorted by Hilbert distance. Streams via pyarrow row groups so peak memory is roughly O(batch_size * k). - Assumes each input file was produced by the standard convert pipeline, - which already sorts rows by Hilbert distance against the CRS's total - bounds (see ``vecorel_cli.vecorel.hilbert.hilbert_sort_geodataframe``). - Because every input shares the same Hilbert reference grid (derived - from the CRS, not from per-file extents), no pre-sort is required — - we just merge the already-sorted runs. + Each input file is expected to be sorted by Hilbert distance against + the CRS's total bounds (see ``vecorel_cli.vecorel.hilbert``). If a + part file is *not* in Hilbert order it is sorted in place before the + streaming merge — this guards against pre-existing part files that + were produced by an older vecorel-cli (which sorted by WKB lex order + instead of Hilbert) and would otherwise silently drop rows in the + streaming merge (``np.searchsorted`` requires a sorted input). """ if not paths: raise ValueError("No paths to merge") @@ -146,7 +152,24 @@ def merge_files( total_bounds = crs_total_bounds(crs) + # Verify each part is Hilbert-sorted; sort in place if not. With a + # vecorel-cli that already Hilbert-sorts, this is a fast no-op read. + self.info(f"Verifying Hilbert order of {len(paths)} part file(s)") + n_resorted = 0 + for path in paths: + if _ensure_hilbert_sorted( + path, primary_col, total_bounds, compression, compression_level + ): + n_resorted += 1 + self.warning( + f" {path}: was not Hilbert-sorted, re-sorted in place. " + "(Bump vecorel-cli to skip this rewrite next time.)" + ) + if n_resorted: + self.warning(f"Re-sorted {n_resorted}/{len(paths)} part file(s) before merging.") + self.info(f"Streaming merge -> {output_file} (Hilbert ref bounds = {total_bounds})") + expected_rows = sum(pq.ParquetFile(p).metadata.num_rows for p in paths) _streaming_merge( paths, output_file, @@ -158,6 +181,14 @@ def merge_files( compression, compression_level, ) + actual_rows = pq.ParquetFile(output_file).metadata.num_rows + if actual_rows != expected_rows: + raise RuntimeError( + f"Streaming merge dropped rows: expected {expected_rows:,} " + f"(sum of inputs), wrote {actual_rows:,} to {output_file}. " + "This is a bug — inputs were verified Hilbert-sorted before merge." + ) + self.info(f"Merged {actual_rows:,} rows into {output_file}") if cleanup_parts: for path in paths: @@ -202,6 +233,37 @@ def _hilbert_keys_for_table(table: pa.Table, primary_col: str, total_bounds) -> return hilbert_distances_from_bounds(bounds, total_bounds) +def _ensure_hilbert_sorted( + path: str, + primary_col: str, + total_bounds, + compression: str, + compression_level: Optional[int], +) -> bool: + """If ``path`` is already Hilbert-sorted against ``total_bounds``, leave it + untouched and return False. Otherwise sort it in place and return True. + + The whole file is loaded into memory once; for the per-file converter this + is bounded by a single source partition (much smaller than the merged + dataset). Schema metadata (``geo``, collection JSON, etc.) is preserved. + """ + pf = pq.ParquetFile(path) + table = pf.read() + hilberts = _hilbert_keys_for_table(table, primary_col, total_bounds) + # NB: hilberts is uint64; never use np.diff for monotonicity here — uint + # underflow makes any descent wrap to a huge positive and fool the check. + if hilberts.size <= 1 or bool(np.all(hilberts[1:] >= hilberts[:-1])): + return False + order = np.argsort(hilberts, kind="stable") + sorted_table = table.take(pa.array(order)) + sorted_table = sorted_table.replace_schema_metadata(pf.schema_arrow.metadata) + write_kwargs = {"compression": compression} + if compression_level is not None: + write_kwargs["compression_level"] = compression_level + pq.write_table(sorted_table, path, **write_kwargs) + return True + + def _build_output_schema(input_schema: pa.Schema, merged_bbox, geom_types) -> pa.Schema: """Patch the geo metadata: merged bbox + union of geometry_types. Other schema metadata and field metadata are preserved unchanged.""" diff --git a/fiboa_cli/datasets/es.py b/fiboa_cli/datasets/es.py index 5ecd23ba..4870d93d 100644 --- a/fiboa_cli/datasets/es.py +++ b/fiboa_cli/datasets/es.py @@ -1,12 +1,15 @@ import re import requests +from vecorel_cli.conversion.admin import AdminConverterMixin from vecorel_cli.vecorel.extensions import ADMIN_DIVISION +from fiboa_cli.datasets.commons.hcat import AddHCATMixin + from ..conversion.per_file import PerFileBaseConverter -class Converter(PerFileBaseConverter): +class Converter(AdminConverterMixin, AddHCATMixin, PerFileBaseConverter): id = "es" short_name = "Spain" title = "Spain Declared Crops (Cultivos Declarados SIGPAC)" @@ -25,15 +28,17 @@ class Converter(PerFileBaseConverter): variants = {"2025": "2025"} + # FEGA declared-crop codelist (PARC_PRODUCTO) — separate from the SIGPAC land-use list. + # Reference list shipped inside each provincial GPKG as the `cod_producto` layer. + ec_mapping_csv = "https://fiboa.org/code/es/es.csv" + columns = { "geometry": "geometry", "id": "id", - "provincia": "admin_province_code", - "municipio": "admin_municipality_code", + "provincia": "admin:subdivision_code", "dn_surface": "metrics:area", "parc_producto": "crop:code", "parc_sistexp": "irrigation_system", - "parc_supcult": "cultivation_surface", } area_is_in_ha = False @@ -43,27 +48,15 @@ class Converter(PerFileBaseConverter): ADMIN_DIVISION, } - column_additions = { - "admin:country_code": "ES", - # FEGA declared-crop codelist (PARC_PRODUCTO) — separate from the SIGPAC land-use list. - # Reference list shipped inside each provincial GPKG as the `cod_producto` layer. - "crop:code_list": "https://fiboa.org/code/es/cultivos_declarados/parc_producto.csv", - } - column_migrations = { - # crop:code must be a string per the crop extension; parc_producto is an integer. - "parc_producto": lambda col: col.astype("Int64").astype(str), - # admin_*_code are strings; zero-pad province to 2 digits (INE convention). + "parc_producto": lambda col: col.astype("Int64").fillna(0).astype(str), "provincia": lambda col: col.astype("Int64").astype(str).str.zfill(2), - "municipio": lambda col: col.astype("Int64").astype(str), } missing_schemas = { "properties": { - "admin_province_code": {"type": "string"}, "admin_municipality_code": {"type": "string"}, "irrigation_system": {"type": "string"}, - "cultivation_surface": {"type": "int32"}, } } From cf85e8143af3110c06593f407ae2be0571e0b13c Mon Sep 17 00:00:00 2001 From: Ivor Bosloper Date: Wed, 20 May 2026 20:25:06 +0200 Subject: [PATCH 4/6] Fail early for debugging --- fiboa_cli/datasets/commons/hcat.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fiboa_cli/datasets/commons/hcat.py b/fiboa_cli/datasets/commons/hcat.py index 5a94d219..dad4e448 100644 --- a/fiboa_cli/datasets/commons/hcat.py +++ b/fiboa_cli/datasets/commons/hcat.py @@ -35,8 +35,8 @@ def __init__(self, *args, **kwargs): def convert(self, *args, **kwargs): self.mapping_file = kwargs.get("mapping_file") if not self.mapping_file: - assert self.ec_mapping_csv is not None, ( - "Specify ec_mapping_csv in Converter, e.g. find them at https://github.com/maja601/EuroCrops/tree/main/csvs/country_mappings" + assert isinstance(self.ec_mapping_csv, str), ( + "Specify proper ec_mapping_csv in Converter, e.g. find them at https://github.com/maja601/EuroCrops/tree/main/csvs/country_mappings" ) return super().convert(*args, **kwargs) From cea4333a6f155770df4e99f599936e60216ad2ce Mon Sep 17 00:00:00 2001 From: Ivor Bosloper Date: Wed, 20 May 2026 20:25:28 +0200 Subject: [PATCH 5/6] Don't upload part files --- fiboa_cli/publish.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fiboa_cli/publish.py b/fiboa_cli/publish.py index 122cc4da..81d8fa29 100644 --- a/fiboa_cli/publish.py +++ b/fiboa_cli/publish.py @@ -441,4 +441,6 @@ def upload_to_aws(self, target): self.info("Uploading to aws") self.check_command("aws") - self.exc(f"aws s3 sync --exclude '.*' {target} {self.s3_upload_path}") + self.exc( + f"aws s3 sync --exclude '.*' --exclude '*_part.parquet' {target} {self.s3_upload_path}" + ) From 4ba30cac8ee87dfeb92b407d128eadcd9332c9fb Mon Sep 17 00:00:00 2001 From: Ivor Bosloper Date: Wed, 20 May 2026 20:52:38 +0200 Subject: [PATCH 6/6] Keep geo_parquet version in PerFileConverter --- fiboa_cli/conversion/per_file.py | 39 ++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/fiboa_cli/conversion/per_file.py b/fiboa_cli/conversion/per_file.py index bd29f0f6..9b9e86b0 100644 --- a/fiboa_cli/conversion/per_file.py +++ b/fiboa_cli/conversion/per_file.py @@ -73,7 +73,13 @@ def convert( original_geometries=original_geometries, **kwargs, ) - self.merge_files(output_file, part_files, compression=compression or "zstd") + self.merge_files( + output_file, + part_files, + compression=compression or "zstd", + compression_level=compression_level, + geoparquet_version=geoparquet_version, + ) return output_file def merge_files( @@ -83,6 +89,7 @@ def merge_files( batch_size: int = DEFAULT_BATCH_SIZE, compression: str = "zstd", compression_level: Optional[int] = None, + geoparquet_version: Optional[str] = None, cleanup_parts: bool = False, ) -> str: """ @@ -97,7 +104,19 @@ def merge_files( were produced by an older vecorel-cli (which sorted by WKB lex order instead of Hilbert) and would otherwise silently drop rows in the streaming merge (``np.searchsorted`` requires a sorted input). + + ``geoparquet_version`` (``"1.0.0"`` / ``"1.1.0"`` / ``None``) sets the + ``version`` field of the merged file's ``geo`` metadata. When ``None`` + (default), the value declared by the input files is preserved unchanged. """ + if geoparquet_version is not None: + from vecorel_cli.const import GEOPARQUET_VERSIONS + + if geoparquet_version not in GEOPARQUET_VERSIONS: + raise ValueError( + f"Invalid geoparquet_version {geoparquet_version!r}; " + f"expected one of {GEOPARQUET_VERSIONS}" + ) if not paths: raise ValueError("No paths to merge") paths = [str(p) for p in paths] @@ -180,6 +199,7 @@ def merge_files( batch_size, compression, compression_level, + geoparquet_version, ) actual_rows = pq.ParquetFile(output_file).metadata.num_rows if actual_rows != expected_rows: @@ -264,9 +284,15 @@ def _ensure_hilbert_sorted( return True -def _build_output_schema(input_schema: pa.Schema, merged_bbox, geom_types) -> pa.Schema: - """Patch the geo metadata: merged bbox + union of geometry_types. Other - schema metadata and field metadata are preserved unchanged.""" +def _build_output_schema( + input_schema: pa.Schema, + merged_bbox, + geom_types, + geoparquet_version: Optional[str] = None, +) -> pa.Schema: + """Patch the geo metadata: merged bbox + union of geometry_types, and + optionally overwrite the GeoParquet ``version`` field. Other schema + metadata and field metadata are preserved unchanged.""" meta = dict(input_schema.metadata or {}) geo = json.loads(meta[GEO_META_KEY]) primary_col = geo["primary_column"] @@ -274,6 +300,8 @@ def _build_output_schema(input_schema: pa.Schema, merged_bbox, geom_types) -> pa geo["columns"][primary_col]["bbox"] = [float(v) for v in merged_bbox] if geom_types: geo["columns"][primary_col]["geometry_types"] = list(geom_types) + if geoparquet_version is not None: + geo["version"] = geoparquet_version meta[GEO_META_KEY] = json.dumps(geo).encode("utf-8") return input_schema.with_metadata(meta) @@ -288,10 +316,11 @@ def _streaming_merge( batch_size: int, compression: str, compression_level: Optional[int], + geoparquet_version: Optional[str] = None, ) -> None: pq_files = [pq.ParquetFile(p) for p in paths] in_schema = pq_files[0].schema_arrow - out_schema = _build_output_schema(in_schema, merged_bbox, geom_types) + out_schema = _build_output_schema(in_schema, merged_bbox, geom_types, geoparquet_version) iters = [pf.iter_batches(batch_size=batch_size) for pf in pq_files] heads: list = [None] * len(paths)