2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']

steps:
- uses: actions/checkout@v4
40 changes: 40 additions & 0 deletions README.md
@@ -59,6 +59,46 @@ It also implements the `close()` method, as suggested by the PEP-249
specification, to support situations where the cursor is wrapped in a
`contextlib.closing()`.

### Storing results in cloud storage

For large query results, you can store them directly in cloud storage
instead of retrieving them over the connection. This is useful when
results are too large to transfer efficiently, or when you want to
process them later with other tools.

```python
from wherobots.db import connect, Store, StorageFormat
from wherobots.db.region import Region
from wherobots.db.runtime import Runtime

with connect(
        api_key='...',
        runtime=Runtime.TINY,
        region=Region.AWS_US_WEST_2) as conn:
    curr = conn.cursor()

    # Store results with a presigned URL for easy download
    curr.execute(
        "SELECT * FROM wherobots_open_data.overture.places LIMIT 1000",
        store=Store.for_download()
    )
    store_result = curr.get_store_result()
    print(f"Results stored at: {store_result.result_uri}")
    print(f"Size: {store_result.size} bytes")
```
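
Once the query completes, the stored file can be fetched and processed like any
other artifact. A minimal sketch of that follow-up step, assuming `result_uri`
is the presigned HTTPS URL produced by `Store.for_download()` and that
`pyarrow` (or another Parquet engine for pandas) is installed:

```python
import urllib.request

import pandas

# Download the single Parquet file via the presigned URL (assumption: the URL
# is directly fetchable over HTTPS), then load it into a DataFrame locally.
local_path, _ = urllib.request.urlretrieve(store_result.result_uri, "places.parquet")
df = pandas.read_parquet(local_path)
print(df.head())
```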

The `Store` class supports the following options:

* `format`: output format - `StorageFormat.PARQUET` (default),
`StorageFormat.CSV`, or `StorageFormat.GEOJSON`
* `single`: if `True`, write results to a single file instead of
multiple partitioned files (default: `True`)
* `generate_presigned_url`: if `True`, generate a presigned URL for
downloading results (default: `False`)

Use `Store.for_download()` as a convenient shorthand for storing results
as a single Parquet file with a presigned URL.
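
If the `for_download()` defaults don't fit, a `Store` can be configured
explicitly. A minimal sketch, assuming the options above map directly to
keyword arguments of the `Store` constructor (the exact signature may differ):

```python
from wherobots.db import Store, StorageFormat

# Hypothetical explicit configuration: partitioned GeoJSON output in cloud
# storage, without a presigned URL. The keyword names mirror the options
# listed above and are assumptions, not a confirmed constructor signature.
store = Store(
    format=StorageFormat.GEOJSON,
    single=False,
    generate_presigned_url=False,
)
curr.execute(
    "SELECT * FROM wherobots_open_data.overture.places LIMIT 1000",
    store=store,
)
store_result = curr.get_store_result()
```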

### Runtime and region selection

You can choose the Wherobots runtime you want to use using the `runtime`
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,9 +1,9 @@
[project]
name = "wherobots-python-dbapi"
version = "0.22.0"
version = "0.23.0"
description = "Python DB-API driver for Wherobots DB"
authors = [{ name = "Maxime Petazzoni", email = "max@wherobots.com" }]
requires-python = ">=3.8, <4"
requires-python = ">=3.10, <4"
readme = "README.md"
license = "Apache-2.0"
dependencies = [
42 changes: 34 additions & 8 deletions tests/smoke.py
@@ -16,6 +16,8 @@
from wherobots.db.region import Region
from wherobots.db.runtime import Runtime
from wherobots.db.session_type import SessionType
from wherobots.db.models import Store, StoreResult


if __name__ == "__main__":
parser = argparse.ArgumentParser()
@@ -24,6 +26,8 @@
parser.add_argument("--region", help="Region to connect to (ie. aws-us-west-2)")
parser.add_argument("--runtime", help="Runtime type (ie. tiny)")
parser.add_argument("--version", help="Runtime version (ie. latest)")
parser.add_argument("--force-new", action="store_true")
parser.add_argument("--store", action="store_true")
parser.add_argument(
"--session-type",
help="Type of session to create",
@@ -64,6 +68,7 @@
api_key = None
token = None
headers = None
store = None

if args.api_key_file:
with open(args.api_key_file) as f:
@@ -75,6 +80,10 @@
token = f.read().strip()
headers = {"Authorization": f"Bearer {token}"}

if args.store:
store = Store.for_download()
logging.info("Will request results to be stored in cloud storage.")

if args.ws_url:
conn_func = functools.partial(connect_direct, uri=args.ws_url, headers=headers)
else:
@@ -88,23 +97,40 @@
runtime=Runtime(args.runtime) if args.runtime else Runtime.MICRO,
region=Region(args.region) if args.region else Region.AWS_US_WEST_2,
version=args.version,
force_new=args.force_new,
session_type=SessionType(args.session_type),
)

def render(results: pandas.DataFrame) -> None:
table = Table()
def render_df(df: pandas.DataFrame) -> Table:
table = Table(show_header=True)
table.add_column("#")
for column in results.columns:
for column in df.columns:
table.add_column(column, max_width=args.wide, no_wrap=True)
for row in results.itertuples(name=None):
for row in df.itertuples(name=None):
r = [str(x) for x in row]
table.add_row(*r)
Console().print(table)
return table

def render_stored(sr: StoreResult) -> Table:
table = Table(show_header=True)
table.add_column("URI")
table.add_column("Size", justify="right")
table.add_row(sr.result_uri, str(sr.size))
return table

def render(results: pandas.DataFrame | StoreResult) -> None:
if isinstance(results, StoreResult):
Console().print(render_stored(results))
else:
Console().print(render_df(results))

def execute(conn: Connection, sql: str) -> pandas.DataFrame:
def execute(conn: Connection, sql: str) -> pandas.DataFrame | StoreResult:
with conn.cursor() as cursor:
cursor.execute(sql)
return cursor.fetchall()
cursor.execute(sql, store=store)
if args.store:
return cursor.get_store_result()
else:
return cursor.fetchall()

try:
with conn_func() as conn: