# yekta-sdk

Python SDK for the Yekta data catalog — credential vending, catalog browsing, schema tracking, and access control.
Yekta is a credential broker first, data catalog second. The SDK gives data engineers temporary, scoped credentials for their existing tools (DuckDB, Spark, Polars, pandas) without ever exposing raw storage keys.
## Installation

```bash
pip install yekta-sdk          # Core (httpx + pydantic)
pip install yekta-sdk[duckdb]  # + DuckDB helpers
pip install yekta-sdk[polars]  # + Polars/Delta helpers
pip install yekta-sdk[pandas]  # + pandas/fsspec helpers
pip install yekta-sdk[all]     # Everything

# Or with uv
uv pip install yekta-sdk
uv add yekta-sdk[duckdb]
```

- Browse resources, preview data, view schemas, manage access control.
- Get short-lived, scoped storage tokens for DuckDB, Spark, Polars, pandas, or any Azure/S3/GCS tool (sketched below).
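A minimal end-to-end sketch of the vending flow, stitched from the APIs documented below (the connection UUID and table path are placeholders):

```python
import duckdb

from yekta import CredentialVendor

# Exchange an API key for a short-lived, scoped SAS token.
vendor = CredentialVendor.from_env()  # reads SEPAR_URL, SEPAR_API_KEY
cred = vendor.vend(
    connection_id="your-connection-uuid",
    resource_path="dev/bronze/my-table",
    operation="read",
)

# Hand the token to DuckDB; no raw storage keys ever touch your code.
conn = duckdb.connect()
conn.execute(cred.duckdb_secret())
df = conn.sql(f"SELECT * FROM delta_scan('{cred.az_uri()}') LIMIT 10").df()
```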
## Authentication

```python
from yekta import SeparAuth, YektaClient

# Option 1: Login with email/password
auth = SeparAuth("http://localhost:8080")
tokens = auth.login_token("user@example.com", "password")
client = YektaClient("http://localhost:8081", token=tokens.access_token)

# Option 2: Direct token
client = YektaClient("http://localhost:8081", token="your-jwt-token")

# Option 3: From environment variables
client = YektaClient.from_env()  # reads YEKTA_URL, YEKTA_TOKEN
```

`SeparAuth` methods:

| Method | Description |
|---|---|
| `login(email, password)` | Login via `/auth/login` |
| `login_token(email, password)` | Login via `/auth/token` (`grant_type=password`) |
| `set_token(token)` | Set token directly |
| `whoami() → UserInfo` | Validate token, get user info |
```python
info = auth.whoami()
info.user_id       # "019c1808-..."
info.email         # "user@example.com"
info.display_name  # "Alice Chen"
info.tenant_id     # "tenant-uuid"
info.workspace_id  # "workspace-uuid"
info.roles         # ["admin", "read", "write"]
```

## Credential vending

The core feature. Get temporary, scoped credentials for your existing tools.
```python
from yekta import CredentialVendor

vendor = CredentialVendor(
    separ_url="http://localhost:8080",
    api_key="your-api-key",
)
# Or from env: vendor = CredentialVendor.from_env()

cred = vendor.vend(
    connection_id="your-connection-uuid",
    resource_path="dev/bronze/my-table",
    operation="read",    # "read" or "read_write"
    scope="container",   # "container" (DuckDB/Spark) or "directory" (file listing)
    ttl_seconds=3600,    # 1 hour (default)
)
```

The returned `VendedCredential`:

```python
cred.token          # SAS token string
cred.cloud          # "azure" (today), "aws"/"gcs" (future)
cred.account        # Storage account name
cred.container      # Container/bucket name
cred.resource_path  # Scoped path
cred.expires_at     # datetime (UTC)
cred.expires_at_ms  # Unix epoch milliseconds
cred.is_expired     # bool — check before use
```

Every adapter is cloud-agnostic — the same method works for Azure today and AWS/GCS in the future.
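For example, downstream code can stay cloud-neutral by dispatching on `cred.cloud`; a sketch, with only the Azure branch exercised today and the AWS branch assuming future support:

```python
def tool_uri(cred) -> str:
    """Pick the URI scheme that matches the credential's backend."""
    if cred.cloud == "azure":
        return cred.abfss_uri()
    if cred.cloud == "aws":
        return cred.s3_uri()  # assumption: available once AWS support lands
    raise NotImplementedError(f"unsupported cloud: {cred.cloud}")
```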
### DuckDB

```python
import duckdb

# Option A: Manual setup
conn = duckdb.connect()
conn.execute(cred.duckdb_secret())  # CREATE SECRET with SAS token
df = conn.sql(f"SELECT * FROM delta_scan('{cred.az_uri()}') LIMIT 100").df()

# Option B: Pre-configured connection
conn = vendor.duckdb_connection(connection_id="...", resource_path="dev/bronze/my-table")
df = conn.sql("SELECT * FROM delta_scan('az://container/path') LIMIT 100").df()
```

### Spark

```python
for key, value in cred.spark_config().items():
    spark.conf.set(key, value)

df = spark.read.format("delta").load(cred.abfss_uri())
```
### Polars

```python
import polars as pl

df = pl.read_delta(
    cred.abfss_uri(),
    storage_options=cred.storage_options(),
)
```

### pandas

```python
import pandas as pd

df = pd.read_parquet(
    cred.abfss_uri(),
    storage_options=cred.storage_options(),
)
```

### Other tools

```python
# Use the connection string directly
conn_str = cred.connection_string()
# "AccountName=myaccount;SharedAccessSignature=sv=2022-11-02&sr=c&sp=rl&..."
```

URI helpers:

```python
cred.abfss_uri()          # abfss://container@account.dfs.core.windows.net/path
cred.az_uri()             # az://container/path (DuckDB format)
cred.s3_uri()             # s3://container/path (future)
cred.connection_string()  # AccountName=...;SharedAccessSignature=...
```

## Catalog

```python
from yekta import YektaClient

client = YektaClient.from_env()
```

```python
# List
resources = client.ls("/dev/bronze")
resources = client.ls("/", format="delta", limit=50)
for r in resources:
    print(f"{r.path} | {r.format} | {r.storage_location}")
```

```python
# Get
r = client.get("/dev/bronze/my-table")
r.path # "/dev/bronze/my-table"
r.format # "delta"
r.storage_location # "az://container/path/"
r.connection_id # "connection-uuid"
r.workspace_id # "workspace-uuid"
r.description # "My table description"
r.tags # {"team": "data-eng", "env": "dev"}
r.created_at # "2026-01-15T..."
```

```python
# Create
r = client.create(
    path="/dev/bronze/new-table",
    storage_location="az://container/dev/bronze/new-table/",
    format="delta",
    description="New table from pipeline",
    tags={"team": "data-eng"},
    connection_id="connection-uuid",
)

# Update
r = client.update("/dev/bronze/my-table", description="Updated description")
r = client.update("/dev/bronze/my-table", tags={"env": "staging"})

# Delete
client.delete("/dev/bronze/my-table")
client.delete("/dev/bronze", cascade=True)  # Delete all under /dev/bronze

# Count
n = client.count("/dev/bronze")  # 42
```

### Storage operations

All storage operations use SAS tokens under the hood (no raw credentials).
```python
# List files
files = client.files("/dev/bronze/my-table")
for f in files:
    print(f"{f.name} | {f.size_human} | {f.last_modified}")
```

```python
# As raw data
preview = client.preview("/dev/bronze/my-table", rows=100)
preview.columns    # [{"name": "id", "type": "INTEGER"}, ...]
preview.rows       # [[1, "alice"], [2, "bob"], ...]
preview.row_count  # 100

# As pandas DataFrame
df = client.preview_df("/dev/bronze/my-table", rows=100)
df.head()
```

```python
# DESCRIBE columns
columns = client.describe("/dev/bronze/my-table")
for col in columns:
    print(f"{col.column_name}: {col.column_type} (null={col.null})")

# SUMMARIZE statistics
stats = client.summarize("/dev/bronze/my-table")
for col in stats:
    print(f"{col.column_name}: min={col.min}, max={col.max}, unique≈{col.approx_unique}")
```

```python
# Full metadata
meta = client.metadata("/dev/bronze/my-table")
meta.row_count              # 613711
meta.file_count             # 2
meta.compressed_size_human  # "72.9 MB"
meta.format                 # "delta"
meta.version                # 1
meta.partition_columns      # ["etl_date"]
meta.compression_codecs     # ["SNAPPY"]
meta.compute_time_ms        # 1234

# Storage stats
stats = client.stats("/dev/bronze/my-table")
# {"file_count": 5, "total_size": 123456789, "total_size_human": "117.7 MB"}
```

### Schema tracking

```python
# Current schema
schema = client.schema("/dev/bronze/my-table")
schema.version  # 3
schema.columns  # [Column(name="id", data_type="INTEGER", nullable=False), ...]
for col in schema.columns:
    print(f" {col.name}: {col.data_type} {'NULL' if col.nullable else 'NOT NULL'}")

# Version history
versions = client.schema_history("/dev/bronze/my-table")
for v in versions:
    print(f" v{v.version} | {v.change_level} | {v.created_at}")

# A specific version
old_schema = client.schema_version("/dev/bronze/my-table", version=1)

# Diff between versions
diff = client.schema_diff("/dev/bronze/my-table", from_version=1, to_version=3)
for col in diff.added:
    print(f" + {col.name}: {col.data_type}")
for col in diff.removed:
    print(f" - {col.name}: {col.data_type}")
for change in diff.changed:
    print(f" ~ {change}")
```
### Aliases

```python
# List
aliases = client.aliases()
for a in aliases:
    print(f" @{a.name} → {a.target_path}")

# Get
a = client.alias("patents")
print(f"@{a.name} → {a.target_path}")

# Create
client.create_alias("patents", "/dev/bronze/my-patents", description="Patent data")

# Delete
client.delete_alias("patents")
```

### Resolve

```python
result = client.resolve("@patents")
result["path"]              # "/dev/bronze/my-patents"
result["storage_location"]  # "az://container/path/"
result["format"]            # "delta"

# With DuckDB query
result = client.resolve("/dev/bronze/my-table", include_query=True)
result["duckdb_query"]      # "SELECT * FROM delta_scan('az://...')"
```
## Access control

Fine-grained per-user, per-path permissions on catalog paths.

```python
# List grants
grants = client.grants()
grants = client.grants(path="/dev/bronze")
grants = client.grants(principal_id="user-uuid")
for g in grants:
    print(f" {g.path_prefix} → {g.principal_id}: {g.permission}")
```
```python
# Permission levels: "read", "read_write", "admin", "deny"
g = client.grant(
    path="/dev/bronze",
    principal_id="user-uuid",
    permission="read",      # Can view only
    principal_type="user",  # or "group"
)
print(f"Grant created: {g.id}")

# Revoke
client.revoke(grant_id="grant-uuid")
```

| Permission | Storage Access | UI Access |
|---|---|---|
| `read` | Preview, describe, list files | View only |
| `read_write` | Above + create/modify resources | Edit |
| `admin` | Full control + manage grants | Full |
| `deny` | Blocked (overrides workspace role) | None |
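For example, `deny` wins over any workspace role; a sketch where the path and UUID are placeholders and the preview call runs as the denied user:

```python
from yekta import AccessDeniedError

# Block one user from a sensitive subtree, whatever their workspace role.
client.grant(
    path="/dev/bronze/pii",
    principal_id="user-uuid",
    permission="deny",
    principal_type="user",
)

# As that user, storage operations under the subtree now fail fast.
try:
    client.preview("/dev/bronze/pii/customers", rows=10)
except AccessDeniedError:
    print("deny overrides the workspace role")
```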
## Refresh jobs

```python
# Trigger refresh
job = client.refresh("/dev/bronze/my-table")
print(f"Job {job.id}: {job.status}")

# List jobs
jobs = client.jobs(path="/dev/bronze/my-table", limit=10)

# Get job status
job = client.job("job-uuid")
print(f"{job.status} — {job.error or 'OK'}")

# Queue stats
q = client.queue()
print(f"Pending: {q.pending}, Running: {q.running}, Failed: {q.failed_today}")
```
```python
# List
policies = client.policies()

# Create (refresh every hour)
p = client.create_policy("/dev/bronze/my-table", interval_seconds=3600)

# Delete
client.delete_policy("policy-uuid")
```

```python
# Cache
stats = client.cache_stats()
print(f"Metadata: {stats.metadata_count}, Samples: {stats.sample_count}")
client.cache_clear()
```

## Error handling

```python
from yekta import (
    YektaError,              # Base exception
    AuthenticationError,     # 401 — invalid/expired token
    AccessDeniedError,       # 403 — insufficient permissions
    ResourceNotFoundError,   # 404 — resource doesn't exist
    StorageEmptyError,       # 404 — storage path has no data
    StorageError,            # 502 — storage access failed
    ValidationError,         # 400/422 — invalid input
    CredentialExpiredError,  # Vended credential expired
)

try:
    client.get("/nonexistent/path")
except ResourceNotFoundError as e:
    print(f"Not found: {e.message}")
except AccessDeniedError:
    print("No permission")
except YektaError as e:
    print(f"Error ({e.status_code}): {e.message}")
```
The client is also a context manager:

```python
with YektaClient.from_env() as client:
    resources = client.ls("/")
# client.close() called automatically
```

## Environment variables

| Variable | Default | Description |
|---|---|---|
| `YEKTA_URL` | `http://localhost:8081` | Yekta catalog API URL |
| `YEKTA_TOKEN` | — | JWT token for authentication |
| `SEPAR_URL` | `http://localhost:8080` | Separ auth/credential API URL |
| `SEPAR_API_KEY` | — | API key for service-to-service auth |
## API reference

### YektaClient

| Method | Returns | Description |
|---|---|---|
| `ls(path, format, limit)` | `list[Resource]` | List resources |
| `get(path)` | `Resource` | Get resource details |
| `create(path, storage_location, format, ...)` | `Resource` | Register resource |
| `update(path, **kwargs)` | `Resource` | Update resource |
| `delete(path, cascade)` | `None` | Delete resource(s) |
| `count(path)` | `int` | Count resources under path |
| `resolve(ref, include_query)` | `dict` | Resolve path or @alias |
| `files(path, limit)` | `list[FileInfo]` | List storage files |
| `preview(path, rows)` | `PreviewData` | Preview sample rows |
| `preview_df(path, rows)` | `DataFrame` | Preview as pandas DataFrame |
| `describe(path)` | `list[DescribeColumn]` | DESCRIBE columns |
| `summarize(path)` | `list[SummarizeColumn]` | SUMMARIZE statistics |
| `metadata(path)` | `ResourceMetadata` | Full metadata |
| `stats(path)` | `dict` | Storage stats |
| `schema(path)` | `SchemaInfo` | Current schema |
| `schema_history(path, limit)` | `list[SchemaVersion]` | Version history |
| `schema_version(path, version)` | `SchemaInfo` | Specific version |
| `schema_diff(path, from_v, to_v)` | `SchemaDiff` | Diff between versions |
| `aliases()` | `list[Alias]` | List aliases |
| `alias(name)` | `Alias` | Get alias |
| `create_alias(name, target, desc)` | `Alias` | Create alias |
| `delete_alias(name)` | `None` | Delete alias |
| `grants(path, principal_id)` | `list[Grant]` | List grants |
| `grant(path, principal_id, permission)` | `Grant` | Create grant |
| `revoke(grant_id)` | `None` | Delete grant |
| `refresh(path)` | `RefreshJob` | Trigger refresh |
| `jobs(path, limit)` | `list[RefreshJob]` | List jobs |
| `job(job_id)` | `RefreshJob` | Get job |
| `queue()` | `QueueStats` | Queue stats |
| `policies()` | `list[RefreshPolicy]` | List policies |
| `create_policy(path, interval)` | `RefreshPolicy` | Create policy |
| `delete_policy(id)` | `None` | Delete policy |
| `cache_stats()` | `CacheStats` | Cache info |
| `cache_clear()` | `None` | Clear cache |
### CredentialVendor

| Method | Returns | Description |
|---|---|---|
| `vend(connection_id, resource_path, operation, scope, ttl)` | `VendedCredential` | Get temporary credential |
| `duckdb_connection(connection_id, resource_path)` | `duckdb.Connection` | Pre-configured DuckDB |
### VendedCredential

| Method / Property | Returns | Description |
|---|---|---|
| `.token` | `str` | Raw SAS/STS token |
| `.cloud` | `str` | `"azure"`, `"aws"`, `"gcs"` |
| `.account` | `str` | Storage account |
| `.container` | `str` | Container/bucket |
| `.is_expired` | `bool` | Check expiry |
| `.duckdb_secret(name)` | `str` | CREATE SECRET SQL |
| `.spark_config()` | `dict` | Spark hadoop config |
| `.storage_options()` | `dict` | fsspec/polars/pandas options |
| `.connection_string()` | `str` | Azure connection string |
| `.abfss_uri()` | `str` | ADLS Gen2 URI |
| `.az_uri()` | `str` | DuckDB-style URI |
| `.s3_uri()` | `str` | S3 URI (future) |
## License

Apache-2.0