Merged
27 changes: 25 additions & 2 deletions docs/source/publishing/ogcapi-features.rst
@@ -585,7 +585,7 @@ To publish a GeoParquet file (with a geometry column) the geopandas package is a
- type: feature
name: Parquet
data:
source: ./tests/data/parquet/random.parquet
source: ./tests/data/parquet/naive/random.parquet
id_field: id
time_field: time
x_field:
@@ -595,11 +595,34 @@ To publish a GeoParquet file (with a geometry column) the geopandas package is a
- minlat
- maxlat

For GeoParquet data, the `x_field` and `y_field` must be specified in the provider definition,
For older Parquet data that does not comply with GeoParquet v1.1, the `x_field` and `y_field` must be specified in the provider definition,
and they must be arrays of two column names that contain the x and y coordinates of the
bounding box of each geometry. If the geometries in the data are all points, the `x_field` and `y_field`
can be strings instead of arrays and refer to a single column each.
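
For the point-data case, a provider definition might look like the following sketch (the `lon` and `lat` column names are hypothetical, for illustration only):

```yaml
providers:
    - type: feature
      name: Parquet
      data:
          source: ./tests/data/parquet/naive/random.parquet
          # geometries are all points, so a single column per axis suffices
          x_field: lon
          y_field: lat
      id_field: id
```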

.. code-block:: yaml

providers:
- type: feature
name: Parquet
id_field: id
data:
source: ./tests/data/parquet/geoparquet1.1/nyc_subset_overture.parquet
batch_size: 10000
batch_readahead: 2


For GeoParquet data that complies with spec version 1.1, all geometry metadata will be detected
automatically.

Note that for any version of Parquet, you may optionally specify ``batch_size`` and ``batch_readahead`` in the ``data`` section of the parquet provider config.
``batch_size`` controls how many rows are fetched per batch. Larger batch sizes speed up data processing, but add I/O cost per batch, such as increased latency when fetching data from an object store. If not defined, it defaults to 20,000 rows.

``batch_readahead`` controls how many batches are buffered in memory. If not specified, it defaults to 2.
Since OGC API Features payloads are often paginated and fairly small, it generally makes sense to keep this value small to avoid reading batches ahead of time that will never be returned, especially when fetching from an object store.


.. _PostgreSQL:

PostgreSQL
32 changes: 29 additions & 3 deletions pygeoapi/provider/parquet.py
@@ -108,7 +108,8 @@ def __init__(self, provider_def):
name: Parquet
data:
source: s3://example.com/parquet_directory/

batch_size: 10000
batch_readahead: 2
id_field: gml_id


@@ -121,6 +122,23 @@ def __init__(self, provider_def):

# Source url is required
self.source = self.data.get('source')
# When iterating over a dataset, the batch size
# controls how many records are read at a time;
# a larger batch size can reduce latency for large/complex
# requests at the cost of more memory usage
# and potentially overfetching;
# More information on batching can be found here:
# https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.scanner # noqa
# This value can be reduced to decrease network transfer
# if fetching data from an object store
self.batch_size = self.data.get('batch_size', 20_000)

# batch_readahead is the number of batches to prefetch;
# This adds extra memory but can reduce latency for large
# or complicated queries; in an OGC API Features context,
# it generally makes sense to have some buffering but keep it
# low since most responses are small
self.batch_readahead = self.data.get('batch_readahead', 2)
if not self.source:
msg = 'Need explicit "source" attr in data' \
' field of provider config'
@@ -136,7 +154,8 @@ def __init__(self, provider_def):
self.fs = None

# Build pyarrow dataset pointing to the data
self.ds = pyarrow.dataset.dataset(self.source, filesystem=self.fs)
self.ds: pyarrow.dataset.Dataset = \
pyarrow.dataset.dataset(self.source, filesystem=self.fs)

if not self.id_field:
LOGGER.info(
@@ -231,6 +250,11 @@ def _read_parquet(self, return_scanner=False, **kwargs):
:returns: generator of RecordBatch with the queried values
"""
scanner = self.ds.scanner(
batch_size=self.batch_size,
# default batch readahead is 16 which is generally
# far too high in a server context; we can safely set it
# to 2 which allows for queueing without excessive reads
batch_readahead=self.batch_readahead,
use_threads=True,
**kwargs
)
@@ -573,7 +597,9 @@ def _response_feature_hits(self, filter):

try:
scanner = pyarrow.dataset.Scanner.from_dataset(
self.ds, filter=filter
self.ds, filter=filter,
batch_size=self.batch_size,
batch_readahead=self.batch_readahead
)
return {
'type': 'FeatureCollection',