yekta-sdk

Python SDK for the Yekta data catalog — credential vending, catalog browsing, schema tracking, and access control.

Yekta is a credential broker first, data catalog second. The SDK gives data engineers temporary, scoped credentials for their existing tools (DuckDB, Spark, Polars, pandas) without ever exposing raw storage keys.

Install

pip install yekta-sdk                # Core (httpx + pydantic)
pip install yekta-sdk[duckdb]        # + DuckDB helpers
pip install yekta-sdk[polars]        # + Polars/Delta helpers
pip install yekta-sdk[pandas]        # + pandas/fsspec helpers
pip install yekta-sdk[all]           # Everything

# Or with uv
uv pip install yekta-sdk
uv add yekta-sdk[duckdb]

Two Entry Points

1. YektaClient — Catalog operations (talks to Yekta API)

Browse resources, preview data, view schemas, manage access control.

2. CredentialVendor — Temporary credentials (talks to Separ API)

Get short-lived, scoped storage tokens for DuckDB, Spark, Polars, pandas, or any Azure/S3/GCS tool.


Authentication

from yekta import SeparAuth, YektaClient

# Option 1: Login with email/password
auth = SeparAuth("http://localhost:8080")
tokens = auth.login_token("user@example.com", "password")
client = YektaClient("http://localhost:8081", token=tokens.access_token)

# Option 2: Direct token
client = YektaClient("http://localhost:8081", token="your-jwt-token")

# Option 3: From environment variables
client = YektaClient.from_env()  # reads YEKTA_URL, YEKTA_TOKEN

SeparAuth

| Method | Description |
|---|---|
| `login(email, password)` | Login via `/auth/login` |
| `login_token(email, password)` | Login via `/auth/token` (`grant_type=password`) |
| `set_token(token)` | Set a token directly |
| `whoami()` → `UserInfo` | Validate the token and return user info |

UserInfo

info = auth.whoami()
info.user_id        # "019c1808-..."
info.email          # "user@example.com"
info.display_name   # "Alice Chen"
info.tenant_id      # "tenant-uuid"
info.workspace_id   # "workspace-uuid"
info.roles          # ["admin", "read", "write"]

Credential Vending

The core feature. Get temporary, scoped credentials for your existing tools.

from yekta import CredentialVendor

vendor = CredentialVendor(
    separ_url="http://localhost:8080",
    api_key="your-api-key",
)
# Or from env: vendor = CredentialVendor.from_env()

cred = vendor.vend(
    connection_id="your-connection-uuid",
    resource_path="dev/bronze/my-table",
    operation="read",           # "read" or "read_write"
    scope="container",          # "container" (DuckDB/Spark) or "directory" (file listing)
    ttl_seconds=3600,           # 1 hour (default)
)

VendedCredential

cred.token              # SAS token string
cred.cloud              # "azure" (today), "aws"/"gcs" (future)
cred.account            # Storage account name
cred.container          # Container/bucket name
cred.resource_path      # Scoped path
cred.expires_at         # datetime (UTC)
cred.expires_at_ms      # Unix epoch milliseconds
cred.is_expired         # bool — check before use
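
Since `is_expired` is cheap to check, a long-running job can cache vended credentials per resource path and only re-vend after expiry. A minimal sketch — the `CredentialCache` helper and `FakeCred` stand-in below are illustrative, not part of the SDK; with the real SDK you would pass `vendor.vend` as the vending function:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Stand-in for the expiry surface of VendedCredential (illustrative only).
@dataclass
class FakeCred:
    expires_at: datetime

    @property
    def is_expired(self) -> bool:
        return datetime.now(timezone.utc) >= self.expires_at

class CredentialCache:
    """Cache one credential per resource path; re-vend only after expiry."""

    def __init__(self, vend_fn):
        self._vend = vend_fn
        self._cache = {}

    def get(self, resource_path, **vend_kwargs):
        cred = self._cache.get(resource_path)
        if cred is None or cred.is_expired:
            cred = self._vend(resource_path=resource_path, **vend_kwargs)
            self._cache[resource_path] = cred
        return cred

# Demo with the stand-in vending function.
calls = 0
def fake_vend(resource_path, **kwargs):
    global calls
    calls += 1
    return FakeCred(expires_at=datetime.now(timezone.utc) + timedelta(hours=1))

cache = CredentialCache(fake_vend)
cache.get("dev/bronze/my-table")
cache.get("dev/bronze/my-table")
print(calls)  # → 1: the second call reused the cached credential
```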

Tool Adapters

Every adapter is cloud-agnostic — the same method works for Azure today and AWS/GCS in the future.

DuckDB

import duckdb

# Option A: Manual setup
conn = duckdb.connect()
conn.execute(cred.duckdb_secret())  # CREATE SECRET with SAS token
df = conn.sql(f"SELECT * FROM delta_scan('{cred.az_uri()}') LIMIT 100").df()

# Option B: Pre-configured connection
conn = vendor.duckdb_connection(connection_id="...", resource_path="dev/bronze/my-table")
df = conn.sql("SELECT * FROM delta_scan('az://container/path') LIMIT 100").df()

Spark

for key, value in cred.spark_config().items():
    spark.conf.set(key, value)

df = spark.read.format("delta").load(cred.abfss_uri())

Polars

import polars as pl

df = pl.read_delta(
    cred.abfss_uri(),
    storage_options=cred.storage_options(),
)

pandas

import pandas as pd

df = pd.read_parquet(
    cred.abfss_uri(),
    storage_options=cred.storage_options(),
)

Raw Azure SDK

# Use the connection string directly
conn_str = cred.connection_string()
# "AccountName=myaccount;SharedAccessSignature=sv=2022-11-02&sr=c&sp=rl&..."

URI Helpers

cred.abfss_uri()           # abfss://container@account.dfs.core.windows.net/path
cred.az_uri()              # az://container/path  (DuckDB format)
cred.s3_uri()              # s3://container/path  (future)
cred.connection_string()   # AccountName=...;SharedAccessSignature=...

Resource Management

from yekta import YektaClient

client = YektaClient.from_env()

List Resources

resources = client.ls("/dev/bronze")
resources = client.ls("/", format="delta", limit=50)

for r in resources:
    print(f"{r.path} | {r.format} | {r.storage_location}")

Get Resource

r = client.get("/dev/bronze/my-table")
r.path                  # "/dev/bronze/my-table"
r.format                # "delta"
r.storage_location      # "az://container/path/"
r.connection_id         # "connection-uuid"
r.workspace_id          # "workspace-uuid"
r.description           # "My table description"
r.tags                  # {"team": "data-eng", "env": "dev"}
r.created_at            # "2026-01-15T..."

Create Resource

r = client.create(
    path="/dev/bronze/new-table",
    storage_location="az://container/dev/bronze/new-table/",
    format="delta",
    description="New table from pipeline",
    tags={"team": "data-eng"},
    connection_id="connection-uuid",
)

Update Resource

r = client.update("/dev/bronze/my-table", description="Updated description")
r = client.update("/dev/bronze/my-table", tags={"env": "staging"})

Delete Resource

client.delete("/dev/bronze/my-table")
client.delete("/dev/bronze", cascade=True)  # Delete all under /dev/bronze

Count Resources

n = client.count("/dev/bronze")  # 42

Storage Operations

All storage operations use SAS tokens under the hood (no raw credentials).

List Files

files = client.files("/dev/bronze/my-table")

for f in files:
    print(f"{f.name} | {f.size_human} | {f.last_modified}")

Preview Data

# As raw data
preview = client.preview("/dev/bronze/my-table", rows=100)
preview.columns     # [{"name": "id", "type": "INTEGER"}, ...]
preview.rows        # [[1, "alice"], [2, "bob"], ...]
preview.row_count   # 100

# As pandas DataFrame
df = client.preview_df("/dev/bronze/my-table", rows=100)
df.head()

Describe (Column Types)

columns = client.describe("/dev/bronze/my-table")

for col in columns:
    print(f"{col.column_name}: {col.column_type} (null={col.null})")

Summarize (Statistics)

stats = client.summarize("/dev/bronze/my-table")

for col in stats:
    print(f"{col.column_name}: min={col.min}, max={col.max}, unique≈{col.approx_unique}")

Metadata (Comprehensive)

meta = client.metadata("/dev/bronze/my-table")
meta.row_count              # 613711
meta.file_count             # 2
meta.compressed_size_human  # "72.9 MB"
meta.format                 # "delta"
meta.version                # 1
meta.partition_columns      # ["etl_date"]
meta.compression_codecs     # ["SNAPPY"]
meta.compute_time_ms        # 1234

Storage Stats

stats = client.stats("/dev/bronze/my-table")
# {"file_count": 5, "total_size": 123456789, "total_size_human": "117.7 MB"}

Schema

Current Schema

schema = client.schema("/dev/bronze/my-table")
schema.version    # 3
schema.columns    # [Column(name="id", data_type="INTEGER", nullable=False), ...]

for col in schema.columns:
    print(f"  {col.name}: {col.data_type} {'NULL' if col.nullable else 'NOT NULL'}")

Schema History

versions = client.schema_history("/dev/bronze/my-table")

for v in versions:
    print(f"  v{v.version} | {v.change_level} | {v.created_at}")

Schema Version

old_schema = client.schema_version("/dev/bronze/my-table", version=1)

Schema Diff

diff = client.schema_diff("/dev/bronze/my-table", from_version=1, to_version=3)

for col in diff.added:
    print(f"  + {col.name}: {col.data_type}")
for col in diff.removed:
    print(f"  - {col.name}: {col.data_type}")
for change in diff.changed:
    print(f"  ~ {change}")

Aliases

# List
aliases = client.aliases()
for a in aliases:
    print(f"  @{a.name} → {a.target_path}")

# Get
a = client.alias("patents")
print(f"@{a.name} → {a.target_path}")

# Create
client.create_alias("patents", "/dev/bronze/my-patents", description="Patent data")

# Delete
client.delete_alias("patents")

Resolve

result = client.resolve("@patents")
result["path"]              # "/dev/bronze/my-patents"
result["storage_location"]  # "az://container/path/"
result["format"]            # "delta"

# With DuckDB query
result = client.resolve("/dev/bronze/my-table", include_query=True)
result["duckdb_query"]      # "SELECT * FROM delta_scan('az://...')"

Access Control (Grants)

Fine-grained per-user, per-path permissions on catalog paths.

List Grants

grants = client.grants()
grants = client.grants(path="/dev/bronze")
grants = client.grants(principal_id="user-uuid")

for g in grants:
    print(f"  {g.path_prefix} | {g.principal_id}: {g.permission}")

Create Grant

# Permission levels: "read", "read_write", "admin", "deny"
g = client.grant(
    path="/dev/bronze",
    principal_id="user-uuid",
    permission="read",           # Can view only
    principal_type="user",       # or "group"
)
print(f"Grant created: {g.id}")

Revoke Grant

client.revoke(grant_id="grant-uuid")

Permission Levels

| Permission | Storage Access | UI Access |
|---|---|---|
| `read` | Preview, describe, list files | View only |
| `read_write` | Above + create/modify resources | Edit |
| `admin` | Full control + manage grants | Full |
| `deny` | Blocked (overrides workspace role) | None |
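
Since grants carry a `path_prefix` and `deny` overrides other access, resolution is plausibly prefix-based. The sketch below shows one such scheme under assumed semantics (any matching `deny` blocks, otherwise the longest matching prefix wins) — this is illustrative, not the server's actual algorithm:

```python
# Assumed semantics: a "deny" on any matching prefix blocks access;
# otherwise the grant with the longest matching path prefix wins.
def effective_permission(grants, path):
    matching = [g for g in grants if path.startswith(g["path_prefix"])]
    if any(g["permission"] == "deny" for g in matching):
        return "deny"
    if not matching:
        return None  # fall back to the workspace role
    return max(matching, key=lambda g: len(g["path_prefix"]))["permission"]

grants = [
    {"path_prefix": "/dev", "permission": "read"},
    {"path_prefix": "/dev/bronze", "permission": "read_write"},
]
print(effective_permission(grants, "/dev/bronze/my-table"))  # → read_write
print(effective_permission(grants, "/dev/silver/other"))     # → read
```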

Refresh Jobs

# Trigger refresh
job = client.refresh("/dev/bronze/my-table")
print(f"Job {job.id}: {job.status}")

# List jobs
jobs = client.jobs(path="/dev/bronze/my-table", limit=10)

# Get job status
job = client.job("job-uuid")
print(f"{job.status} | {job.error or 'OK'}")

# Queue stats
q = client.queue()
print(f"Pending: {q.pending}, Running: {q.running}, Failed: {q.failed_today}")
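
Since `refresh()` returns immediately, a caller that needs the result can poll the job until it reaches a terminal status. A small sketch — the helper and the terminal status names (`"succeeded"`, `"failed"`) are assumptions, not documented SDK behavior; with the real client, `get_job` would be `client.job`:

```python
import time

def wait_for_job(get_job, job_id, timeout_s=300.0, poll_s=2.0):
    """Poll a refresh job until it reaches an (assumed) terminal status."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        job = get_job(job_id)
        if job.status in ("succeeded", "failed"):
            return job
        time.sleep(poll_s)
    raise TimeoutError(f"job {job_id} did not finish within {timeout_s}s")

# Demo with a stub that finishes on the third poll.
class _Job:
    def __init__(self, status, error=None):
        self.status, self.error = status, error

_statuses = iter(["pending", "running", "succeeded"])
job = wait_for_job(lambda _id: _Job(next(_statuses)), "job-uuid", poll_s=0.0)
print(job.status)  # → succeeded
```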

Refresh Policies

# List
policies = client.policies()

# Create (refresh every hour)
p = client.create_policy("/dev/bronze/my-table", interval_seconds=3600)

# Delete
client.delete_policy("policy-uuid")

Cache Management

stats = client.cache_stats()
print(f"Metadata: {stats.metadata_count}, Samples: {stats.sample_count}")

client.cache_clear()

Error Handling

from yekta import (
    YektaError,              # Base exception
    AuthenticationError,     # 401 — invalid/expired token
    AccessDeniedError,       # 403 — insufficient permissions
    ResourceNotFoundError,   # 404 — resource doesn't exist
    StorageEmptyError,       # 404 — storage path has no data
    StorageError,            # 502 — storage access failed
    ValidationError,         # 400/422 — invalid input
    CredentialExpiredError,  # Vended credential expired
)

try:
    client.get("/nonexistent/path")
except ResourceNotFoundError as e:
    print(f"Not found: {e.message}")
except AccessDeniedError:
    print("No permission")
except YektaError as e:
    print(f"Error ({e.status_code}): {e.message}")

Context Manager

with YektaClient.from_env() as client:
    resources = client.ls("/")
    # client.close() called automatically

Environment Variables

| Variable | Default | Description |
|---|---|---|
| `YEKTA_URL` | `http://localhost:8081` | Yekta catalog API URL |
| `YEKTA_TOKEN` | — | JWT token for authentication |
| `SEPAR_URL` | `http://localhost:8080` | Separ auth/credential API URL |
| `SEPAR_API_KEY` | — | API key for service-to-service auth |

Full API Reference

YektaClient

| Method | Returns | Description |
|---|---|---|
| `ls(path, format, limit)` | `list[Resource]` | List resources |
| `get(path)` | `Resource` | Get resource details |
| `create(path, storage_location, format, ...)` | `Resource` | Register resource |
| `update(path, **kwargs)` | `Resource` | Update resource |
| `delete(path, cascade)` | `None` | Delete resource(s) |
| `count(path)` | `int` | Count resources under path |
| `resolve(ref, include_query)` | `dict` | Resolve path or @alias |
| `files(path, limit)` | `list[FileInfo]` | List storage files |
| `preview(path, rows)` | `PreviewData` | Preview sample rows |
| `preview_df(path, rows)` | `DataFrame` | Preview as pandas DataFrame |
| `describe(path)` | `list[DescribeColumn]` | DESCRIBE columns |
| `summarize(path)` | `list[SummarizeColumn]` | SUMMARIZE statistics |
| `metadata(path)` | `ResourceMetadata` | Full metadata |
| `stats(path)` | `dict` | Storage stats |
| `schema(path)` | `SchemaInfo` | Current schema |
| `schema_history(path, limit)` | `list[SchemaVersion]` | Version history |
| `schema_version(path, version)` | `SchemaInfo` | Specific version |
| `schema_diff(path, from_v, to_v)` | `SchemaDiff` | Diff between versions |
| `aliases()` | `list[Alias]` | List aliases |
| `alias(name)` | `Alias` | Get alias |
| `create_alias(name, target, desc)` | `Alias` | Create alias |
| `delete_alias(name)` | `None` | Delete alias |
| `grants(path, principal_id)` | `list[Grant]` | List grants |
| `grant(path, principal_id, permission)` | `Grant` | Create grant |
| `revoke(grant_id)` | `None` | Delete grant |
| `refresh(path)` | `RefreshJob` | Trigger refresh |
| `jobs(path, limit)` | `list[RefreshJob]` | List jobs |
| `job(job_id)` | `RefreshJob` | Get job |
| `queue()` | `QueueStats` | Queue stats |
| `policies()` | `list[RefreshPolicy]` | List policies |
| `create_policy(path, interval)` | `RefreshPolicy` | Create policy |
| `delete_policy(id)` | `None` | Delete policy |
| `cache_stats()` | `CacheStats` | Cache info |
| `cache_clear()` | `None` | Clear cache |

CredentialVendor

| Method | Returns | Description |
|---|---|---|
| `vend(connection_id, resource_path, operation, scope, ttl)` | `VendedCredential` | Get temporary credential |
| `duckdb_connection(connection_id, resource_path)` | `duckdb.Connection` | Pre-configured DuckDB connection |

VendedCredential

| Method / Property | Returns | Description |
|---|---|---|
| `.token` | `str` | Raw SAS/STS token |
| `.cloud` | `str` | `"azure"`, `"aws"`, `"gcs"` |
| `.account` | `str` | Storage account |
| `.container` | `str` | Container/bucket |
| `.is_expired` | `bool` | Check expiry |
| `.duckdb_secret(name)` | `str` | CREATE SECRET SQL |
| `.spark_config()` | `dict` | Spark hadoop config |
| `.storage_options()` | `dict` | fsspec/polars/pandas options |
| `.connection_string()` | `str` | Azure connection string |
| `.abfss_uri()` | `str` | ADLS Gen2 URI |
| `.az_uri()` | `str` | DuckDB-style URI |
| `.s3_uri()` | `str` | S3 URI (future) |

License

Apache-2.0
