Skip to content

Commit 6126883

Browse files
backends now fully support branching
1 parent 5a57190 commit 6126883

31 files changed

Lines changed: 867 additions & 104 deletions

docs/authentication.md

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,36 @@ For programmatic access (CI/CD, ETL, automation), use **Service Users**.
7979

8080
## 🐍 Connecting with PyIceberg
8181

82-
### Option A: Manual Token (Access Token)
82+
### Option A: Standard OAuth2 Credential Flow (Recommended)
83+
Use a Service User's `client_id` (UUID) and `client_secret` (API Key) for automated authentication.
84+
85+
```python
86+
from pyiceberg.catalog import load_catalog
87+
88+
catalog = load_catalog(
89+
"pangolin",
90+
**{
91+
"uri": "http://localhost:8080/v1/default/",
92+
"type": "rest",
93+
"credential": "<service_user_uuid>:<api_key>",
94+
"oauth2-server-uri": "http://localhost:8080/v1/default/v1/oauth/tokens",
95+
"scope": "catalog"
96+
}
97+
)
98+
```
99+
100+
> [!NOTE]
101+
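> The `oauth2-server-uri` must be the full URL of the token endpoint (for example `http://localhost:8080/v1/default/v1/oauth/tokens`), not just the server's base address. Pangolin accepts token endpoint paths starting with either `/v1/...` or `/api/v1/iceberg/...`.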
102+
103+
### Option B: Manual Token (Access Token)
83104
If you have a JWT from a previous login:
84105
```python
85106
from pyiceberg.catalog import load_catalog
86107

87108
catalog = load_catalog(
88109
"pangolin",
89110
**{
90-
"uri": "http://localhost:8080",
91-
"prefix": "analytics",
111+
"uri": "http://localhost:8080/v1/default/",
92112
"token": "YOUR_JWT_TOKEN",
93113
}
94114
)

docs/features/pyiceberg_testing.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,20 @@ from pyiceberg.catalog import load_catalog
153153
catalog = load_catalog(
154154
"pangolin",
155155
**{
156-
"uri": "http://localhost:8080/api/v1/iceberg/default",
156+
"uri": "http://localhost:8080/v1/default/",
157157
"type": "rest",
158-
"credential": "<service_user_id>:<service_user_api_key>",
159-
"oauth2-server-uri": "http://localhost:8080/v1/rest/v1/oauth/tokens",
158+
"credential": "<service_user_uuid>:<service_user_api_key>",
159+
"oauth2-server-uri": "http://localhost:8080/v1/default/v1/oauth/tokens",
160+
"scope": "catalog"
160161
}
161162
)
162163
```
163164

165+
> [!NOTE]
166+
> * **URI Suffix**: Ensure your `uri` ends with a trailing slash `/`.
167+
> * **Auth Endpoint**: `oauth2-server-uri` must be the full path: `{base_url}/v1/{catalog_name}/v1/oauth/tokens`. Pangolin also supports the `/api/v1/iceberg/{catalog_name}/v1/oauth/tokens` prefix for compatibility.
168+
> * **Credential Format**: Must be `client_id:client_secret` (UUID:API_Key).
169+
164170
### Option 2: Using an Existing Token (Temporary)
165171

166172
If you have a JWT token from the UI or another login flow, you can use it directly.

docs/features/service_users.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ Administrators can provision and rotate keys directly from the terminal.
1313
```bash
1414
pangolin-admin create-service-user \
1515
--name "etl-pipeline" \
16-
--role "TenantUser" \
17-
--expires-in-days 365
16+
--role "TenantUser"
1817
```
1918

19+
> [!NOTE]
20+
> Record the **Service User ID (UUID)** from the command output; it is the `client_id` you will need when authenticating. If the CLI does not display the ID, retrieve it after creation through the Management UI or directly through the API.
21+
2022
**Rotate API Key:**
2123
```bash
2224
pangolin-admin rotate-service-user-key --id <service-user-uuid>
@@ -47,9 +49,10 @@ from pyiceberg.catalog import load_catalog
4749
catalog = load_catalog(
4850
"pangolin",
4951
**{
50-
"uri": "https://api.pangolin.io/api/v1/iceberg/default",
52+
"uri": "http://localhost:8080/v1/default/",
5153
"credential": "<service_user_uuid>:<api_key>",
52-
"oauth2-server-uri": "https://api.pangolin.io/v1/rest/v1/oauth/tokens",
54+
"oauth2-server-uri": "http://localhost:8080/v1/default/v1/oauth/tokens",
55+
"scope": "catalog",
5356
"type": "rest",
5457
}
5558
)

pangolin/pangolin_api/src/iceberg/oauth.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,29 @@ use crate::auth::Claims;
1919
use crate::error::ApiError;
2020
use crate::iceberg::AppState;
2121

22-
#[derive(Deserialize)]
22+
use utoipa::ToSchema;
23+
24+
#[derive(Deserialize, ToSchema)]
2325
pub struct OAuthTokenRequest {
26+
#[schema(example = "client_credentials")]
2427
grant_type: String,
28+
#[schema(example = "a1b2c3d4-e5f6-7890-1234-567890abcdef")]
2529
client_id: String,
30+
#[schema(example = "pgl_...")]
2631
client_secret: String,
32+
#[schema(example = "catalog")]
2733
scope: Option<String>,
2834
}
2935

30-
#[derive(Serialize)]
36+
#[derive(Serialize, ToSchema)]
3137
pub struct OAuthTokenResponse {
38+
#[schema(example = "eyJhbGciOiJIUzI1Ni...")]
3239
access_token: String,
40+
#[schema(example = "Bearer")]
3341
token_type: String,
42+
#[schema(example = 3600)]
3443
expires_in: u64,
44+
#[schema(example = "urn:ietf:params:oauth:token-type:access_token")]
3545
issued_token_type: String,
3646
}
3747

@@ -45,6 +55,18 @@ pub struct OAuthTokenResponse {
4555
/// - `client_secret` -> Service User API Key
4656
///
4757
/// If valid, it returns a standard Pangolin JWT signed by the server key.
58+
#[utoipa::path(
59+
post,
60+
path = "/v1/{prefix}/v1/oauth/tokens",
61+
operation_id = "oauth_token",
62+
request_body(content = OAuthTokenRequest, content_type = "application/x-www-form-urlencoded"),
63+
responses(
64+
(status = 200, description = "Token issued successfully", body = OAuthTokenResponse),
65+
(status = 400, description = "Invalid grant_type or format"),
66+
(status = 401, description = "Invalid client credentials")
67+
),
68+
tag = "Authentication"
69+
)]
4870
pub async fn handle_oauth_token(
4971
State(store): State<AppState>,
5072
Form(payload): Form<OAuthTokenRequest>,

pangolin/pangolin_api/src/iceberg/tables.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -336,13 +336,15 @@ pub async fn create_table(
336336
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to write metadata").into_response();
337337
}
338338

339-
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::legacy_new(
339+
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::success(
340340
tenant_id,
341+
Some(session.user_id),
341342
session.username.clone(),
342-
"create_table".to_string(),
343+
pangolin_core::audit::AuditAction::CreateTable,
344+
pangolin_core::audit::ResourceType::Table,
345+
Some(asset.id),
343346
format!("{}/{}/{}", catalog_name, ns_name, tbl_name),
344-
Some(location.clone())
345-
)).await;
347+
).with_metadata(serde_json::json!({ "location": location.clone() }))).await;
346348

347349
let credentials = match store.get_catalog(tenant_id, catalog_name.clone()).await {
348350
Ok(Some(c)) => {
@@ -667,13 +669,15 @@ pub async fn update_table(
667669

668670
match store.update_metadata_location(tenant_id, &catalog_name, Some(branch.clone()), namespace_parts.clone(), table_name.clone(), current_metadata_location.clone(), new_metadata_location.clone()).await {
669671
Ok(_) => {
670-
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::legacy_new(
672+
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::success(
671673
tenant_id,
674+
Some(session.user_id),
672675
session.username.clone(),
673-
"update_table".to_string(),
676+
pangolin_core::audit::AuditAction::UpdateTable,
677+
pangolin_core::audit::ResourceType::Table,
678+
Some(asset.id),
674679
format!("{}/{}/{}", catalog_name, namespace, table_name),
675-
Some(new_metadata_location.clone())
676-
)).await;
680+
).with_metadata(serde_json::json!({ "new_metadata_location": new_metadata_location.clone() }))).await;
677681

678682
return (StatusCode::OK, Json(TableResponse::new(
679683
Some(new_metadata_location.clone()),
@@ -740,12 +744,14 @@ pub async fn rename_table(
740744

741745
match store.rename_asset(tenant_id, &catalog_name, branch, source_ns.clone(), source_name.clone(), dest_ns.clone(), dest_name.clone()).await {
742746
Ok(_) => {
743-
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::legacy_new(
747+
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::success(
744748
tenant_id,
749+
Some(session.user_id),
745750
session.username.clone(),
746-
"rename_table".to_string(),
751+
pangolin_core::audit::AuditAction::RenameTable,
752+
pangolin_core::audit::ResourceType::Table,
753+
None, // Cannot determine asset ID easily without lookup
747754
format!("{}/{}.{} -> {}.{}", catalog_name, source_ns.join("."), source_name, dest_ns.join("."), dest_name),
748-
None
749755
)).await;
750756

751757
StatusCode::NO_CONTENT.into_response()

pangolin/pangolin_api/src/merge_handlers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,8 @@ pub async fn complete_merge(
253253
match store.merge_branch(
254254
operation.tenant_id,
255255
&operation.catalog_name,
256-
operation.source_branch.clone(),
257256
operation.target_branch.clone(),
257+
operation.source_branch.clone(),
258258
).await {
259259
Ok(_) => {
260260
// TODO: merge_branch should return the commit ID

pangolin/pangolin_api/src/openapi.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ use pangolin_core::business_metadata::{BusinessMetadata, AccessRequest, RequestS
180180
iceberg::tables::perform_maintenance,
181181
iceberg::tables::table_exists,
182182
iceberg::namespaces::list_namespaces_tree,
183+
184+
// Iceberg OAuth
185+
iceberg::oauth::handle_oauth_token,
183186
184187
// Signing / Credential Vending
185188
signing_handlers::get_table_credentials,
@@ -219,6 +222,9 @@ use pangolin_core::business_metadata::{BusinessMetadata, AccessRequest, RequestS
219222
service_user_handlers::CreateServiceUserRequest, service_user_handlers::UpdateServiceUserRequest,
220223
oauth_handlers::OAuthCallback, oauth_handlers::AuthorizeParams,
221224
225+
// Iceberg OAuth types
226+
iceberg::oauth::OAuthTokenRequest, iceberg::oauth::OAuthTokenResponse,
227+
222228
// Branch/Tag/Merge types
223229
pangolin_handlers::CreateBranchRequest, pangolin_handlers::ListBranchParams, pangolin_handlers::MergeBranchRequest, pangolin_handlers::BranchResponse,
224230
merge_handlers::ResolveConflictRequest, merge_handlers::MergeOperationResponse,

pangolin/pangolin_api/src/pangolin_handlers.rs

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pub async fn create_branch(
137137

138138
let scope = PermissionScope::Catalog { catalog_id: catalog.id };
139139

140+
// Check permission
140141
match crate::authz::check_permission(&store, &session, &required_action, &scope).await {
141142
Ok(true) => (),
142143
Ok(false) => return (StatusCode::FORBIDDEN, "Forbidden").into_response(),
@@ -150,15 +151,13 @@ pub async fn create_branch(
150151
// 2. If assets are specified, copy them from `from_branch`.
151152

152153
let mut branch_assets = vec![];
154+
tracing::info!("create_branch called for branch: {}, from: {}", payload.name, from_branch);
155+
156+
// ...
153157

154158
if let Some(assets_to_copy) = &payload.assets {
159+
tracing::info!("Explicit asset list provided: {} assets", assets_to_copy.len());
155160
for asset_name in assets_to_copy {
156-
// We need the namespace for the asset.
157-
// The current API `CreateBranchRequest` only lists asset names, but assets are scoped by namespace.
158-
// This is a limitation. The user request said "specified tables".
159-
// Assuming for now that `assets` contains "namespace.table" strings or we need to change the request to be more structured.
160-
// Let's assume "namespace.table" format for simplicity in this MVP.
161-
162161
let parts: Vec<&str> = asset_name.split('.').collect();
163162
if parts.len() < 2 {
164163
continue; // Skip invalid format
@@ -168,18 +167,38 @@ pub async fn create_branch(
168167

169168
// Get asset from source branch
170169
if let Ok(Some(asset)) = store.get_asset(tenant_id, catalog_name, Some(from_branch.to_string()), namespace_parts.clone(), table_name.clone()).await {
171-
// Create asset in new branch
172-
if let Ok(_) = store.create_asset(tenant_id, catalog_name, Some(payload.name.clone()), namespace_parts, asset).await {
170+
// Create asset in new branch with NEW ID to avoid unique constraint violation
171+
let mut new_asset = asset.clone();
172+
new_asset.id = uuid::Uuid::new_v4();
173+
if let Ok(_) = store.create_asset(tenant_id, catalog_name, Some(payload.name.clone()), namespace_parts, new_asset).await {
173174
branch_assets.push(asset_name.clone());
174175
}
175176
}
176177
}
177178
} else {
178-
// If no assets specified, maybe we copy ALL assets?
179-
// Or create an empty branch?
180-
// The user said "a branch shouldn't apply to the whole catalog but only to specified tables".
181-
// This implies if you don't specify tables, you might get an empty branch or it's an error?
182-
// Let's assume empty branch if None.
179+
tracing::info!("No explicit assets. Auto-propagating from {}", from_branch);
180+
// ...
181+
if let Ok(namespaces) = store.list_namespaces(tenant_id, catalog_name, None).await {
182+
tracing::info!("Found {} namespaces", namespaces.len());
183+
for ns in namespaces {
184+
if let Ok(assets) = store.list_assets(tenant_id, catalog_name, Some(from_branch.to_string()), ns.name.clone()).await {
185+
tracing::info!("Found {} assets in namespace {:?} on branch {}", assets.len(), ns.name, from_branch);
186+
for asset in assets {
187+
let mut new_asset = asset.clone();
188+
new_asset.id = uuid::Uuid::new_v4();
189+
match store.create_asset(tenant_id, catalog_name, Some(payload.name.clone()), ns.name.clone(), new_asset.clone()).await {
190+
Ok(_) => {
191+
tracing::info!("Copied asset {} to branch {}", asset.name, payload.name);
192+
branch_assets.push(format!("{}.{}", ns.name.join("."), asset.name));
193+
},
194+
Err(e) => tracing::error!("Failed to copy asset {}: {}", asset.name, e),
195+
}
196+
}
197+
} else {
198+
tracing::warn!("Failed to list assets in namespace {:?}", ns.name);
199+
}
200+
}
201+
}
183202
}
184203

185204
let branch = Branch {
@@ -334,7 +353,7 @@ pub async fn merge_branch(
334353
}
335354

336355
// No conflicts - proceed with merge
337-
match store.merge_branch(tenant_id, catalog_name, payload.source_branch.clone(), payload.target_branch.clone()).await {
356+
match store.merge_branch(tenant_id, catalog_name, payload.target_branch.clone(), payload.source_branch.clone()).await {
338357
Ok(_) => {
339358
// Complete the merge operation
340359
let commit_id = uuid::Uuid::new_v4(); // In real implementation, get actual commit ID
@@ -349,11 +368,13 @@ pub async fn merge_branch(
349368
None
350369
)).await;
351370

352-
(StatusCode::OK, Json(serde_json::json!({
353-
"status": "merged",
354-
"operation_id": operation.id,
355-
"commit_id": commit_id
356-
}))).into_response()
371+
// Return full updated operation object to satisfy SDK Pydantic model
372+
let mut result_op = operation.clone();
373+
result_op.status = pangolin_core::model::MergeStatus::Completed;
374+
result_op.result_commit_id = Some(commit_id);
375+
result_op.completed_at = Some(chrono::Utc::now());
376+
377+
(StatusCode::OK, Json(result_op)).into_response()
357378
},
358379
Err(e) => {
359380
let _ = store.abort_merge_operation(operation.id).await;
@@ -720,6 +741,17 @@ pub async fn create_catalog(
720741
tracing::warn!("Failed to create default namespace for catalog {}: {}", catalog.name, e);
721742
}
722743

744+
// Create 'main' branch
745+
let main_branch = pangolin_core::model::Branch {
746+
name: "main".to_string(),
747+
head_commit_id: None,
748+
branch_type: pangolin_core::model::BranchType::Ingest,
749+
assets: vec![],
750+
};
751+
if let Err(e) = store.create_branch(tenant_id, &catalog.name, main_branch).await {
752+
tracing::warn!("Failed to create main branch for catalog {}: {}", catalog.name, e);
753+
}
754+
723755
// Audit Log
724756
let _ = store.log_audit_event(tenant_id, pangolin_core::audit::AuditLogEntry::legacy_new(
725757
tenant_id,

pangolin/pangolin_store/migrations/20251221000000_fix_asset_branching.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ ALTER TABLE assets ALTER COLUMN branch_name SET NOT NULL;
99
-- Drop and recreate PK to include branch_name
1010
ALTER TABLE assets DROP CONSTRAINT assets_pkey;
1111
ALTER TABLE assets ADD PRIMARY KEY (tenant_id, catalog_name, branch_name, namespace_path, name);
12+

pangolin/pangolin_store/migrations/sqlite/001_initial.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ CREATE INDEX IF NOT EXISTS idx_namespaces_tenant_catalog ON namespaces(tenant_id
6565

6666
-- Assets (Added ID)
6767
CREATE TABLE IF NOT EXISTS assets (
68-
id TEXT PRIMARY KEY,
68+
id TEXT,
6969
tenant_id TEXT NOT NULL,
7070
catalog_name TEXT NOT NULL,
7171
namespace_path TEXT NOT NULL, -- JSON array

0 commit comments

Comments
 (0)