diff --git a/docs/source/publishing/ogcapi-features.rst b/docs/source/publishing/ogcapi-features.rst index df40d27bd..8a38e0b98 100644 --- a/docs/source/publishing/ogcapi-features.rst +++ b/docs/source/publishing/ogcapi-features.rst @@ -585,7 +585,7 @@ To publish a GeoParquet file (with a geometry column) the geopandas package is a - type: feature name: Parquet data: - source: ./tests/data/parquet/random.parquet + source: ./tests/data/parquet/naive/random.parquet id_field: id time_field: time x_field: @@ -595,11 +595,34 @@ To publish a GeoParquet file (with a geometry column) the geopandas package is a - minlat - maxlat -For GeoParquet data, the `x_field` and `y_field` must be specified in the provider definition, +For older versions of parquet data that don't comply with GeoParquet v1.1, the `x_field` and `y_field` must be specified in the provider definition, and they must be arrays of two column names that contain the x and y coordinates of the bounding box of each geometry. If the geometries in the data are all points, the `x_field` and `y_field` can be strings instead of arrays and refer to a single column each. +.. code-block:: yaml + + providers: + - type: feature + name: Parquet + id_field: id + data: + source: ./tests/data/parquet/geoparquet1.1/nyc_subset_overture.parquet + batch_size: 10000 + batch_readahead: 2 + + +For GeoParquet data which complies with spec version 1.1, all geometry metadata will be automatically +detected. + +Note that for any version of parquet, you may optionally specify ``batch_size`` and ``batch_readahead`` in the ``data`` section of the parquet provider config. +``batch_size`` controls how many rows are fetched per batch. Large batch sizes speed up data processing, but add more I/O overhead, such as increased latency when fetching data from an object store. If not defined it will +default to 20,000 rows. + +``batch_readahead`` controls how many batches are buffered in memory. If not specified it will default to 2. 
+Since OGC API Features payloads are often paginated and fairly small, it generally makes sense to specify a small number to avoid reading too many batches ahead of time, especially when fetching from an object store. + + .. _PostgreSQL: PostgreSQL diff --git a/pygeoapi/provider/parquet.py b/pygeoapi/provider/parquet.py index 8d69e9940..8413963e0 100644 --- a/pygeoapi/provider/parquet.py +++ b/pygeoapi/provider/parquet.py @@ -108,7 +108,8 @@ def __init__(self, provider_def): name: Parquet data: source: s3://example.com/parquet_directory/ - + batch_size: 10000 + batch_readahead: 2 id_field: gml_id @@ -121,6 +122,23 @@ def __init__(self, provider_def): # Source url is required self.source = self.data.get('source') + # When iterating over a dataset, the batch size + # controls how many records are read at a time; + # a larger batch size can reduce latency for large/complex + # requests at the cost of more memory usage + # and potentially overfetching; + # More information on batching can be found here: + # https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.scanner # noqa + # This value can be reduced to decrease network transfer + # if fetching data from an object store + self.batch_size = self.data.get('batch_size', 20_000) + + # batch_readahead is the number of batches to prefetch; + # This adds extra memory but can reduce latency for large + # or complicated queries; in an OGC API Features context, + # it generally makes sense to have some buffering but keep it + # low since most responses are small + self.batch_readahead = self.data.get('batch_readahead', 2) if not self.source: msg = 'Need explicit "source" attr in data' \ ' field of provider config' @@ -136,7 +154,8 @@ def __init__(self, provider_def): self.fs = None # Build pyarrow dataset pointing to the data - self.ds = pyarrow.dataset.dataset(self.source, filesystem=self.fs) + self.ds: pyarrow.dataset.Dataset = \ + pyarrow.dataset.dataset(self.source, 
filesystem=self.fs) if not self.id_field: LOGGER.info( @@ -231,6 +250,11 @@ def _read_parquet(self, return_scanner=False, **kwargs): :returns: generator of RecordBatch with the queried values """ scanner = self.ds.scanner( + batch_size=self.batch_size, + # default batch readahead is 16 which is generally + # far too high in a server context; we can safely set it + # to 2 which allows for queueing without excessive reads + batch_readahead=self.batch_readahead, use_threads=True, **kwargs ) @@ -573,7 +597,9 @@ def _response_feature_hits(self, filter): try: scanner = pyarrow.dataset.Scanner.from_dataset( - self.ds, filter=filter + self.ds, filter=filter, + batch_size=self.batch_size, + batch_readahead=self.batch_readahead ) return { 'type': 'FeatureCollection',