-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathverify_catalog.py
More file actions
139 lines (120 loc) · 4.24 KB
/
verify_catalog.py
File metadata and controls
139 lines (120 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import time
import requests
import subprocess
import sys
import os
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, IntegerType
def run_server(startup_timeout: float = 15.0) -> subprocess.Popen:
    """Start the catalog server with uvicorn and wait until it accepts connections.

    The server is launched as ``python -m uvicorn main:app --port 8001`` with
    ``CATALOG_WAREHOUSE`` set to ``/tmp/warehouse`` in the child's environment.

    Args:
        startup_timeout: Maximum number of seconds to wait for the server
            port to accept TCP connections.

    Returns:
        The running server process. The caller is responsible for stopping it.

    Raises:
        RuntimeError: If the server process exits during startup, or if the
            port does not accept connections within ``startup_timeout`` seconds.
    """
    import socket  # local import: only needed for the readiness probe

    host = "127.0.0.1"
    port = 8001

    env = os.environ.copy()
    env["CATALOG_WAREHOUSE"] = "/tmp/warehouse"
    proc = subprocess.Popen(
        [sys.executable, "-m", "uvicorn", "main:app", "--port", str(port)],
        # Nothing ever reads the server's output, so discard it. An unread
        # PIPE can fill up and block the child process.
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=env,
    )

    # Poll the port instead of sleeping a fixed time: returns as soon as the
    # server is reachable and fails loudly when it never comes up.
    deadline = time.monotonic() + startup_timeout
    while True:
        exit_code = proc.poll()
        if exit_code is not None:
            raise RuntimeError(
                f"Server exited during startup with code {exit_code}"
            )
        try:
            with socket.create_connection((host, port), timeout=0.5):
                pass
        except OSError:
            if time.monotonic() >= deadline:
                break
            time.sleep(0.1)
        else:
            return proc

    # Startup timed out: do not leave an orphaned child behind.
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    raise RuntimeError(
        f"Server did not accept connections on {host}:{port} "
        f"within {startup_timeout} seconds"
    )
def verify(
    base_url: str = "http://127.0.0.1:8001",
    warehouse: str = "/tmp/warehouse",
    timeout: float = 10.0,
) -> None:
    """Run the multi-catalog verification against a running catalog server.

    Steps, in order:
      1. Create namespace ``ns1`` and table ``ns1.table1`` through catalog ``cat1``.
      2. Check that ``ns1`` is not listed by catalog ``cat2``, then create
         namespace ``ns2`` in ``cat2``.
      3. POST catalog properties to ``/v1/cat1/config/properties`` and check
         that a subsequent GET returns the same endpoint and warehouse.
      4. POST a different value under the same metadata key for ``cat1`` and
         ``cat2`` and check that each catalog returns its own value.

    Failures in steps 1-2 and non-200 responses in step 3 are printed and the
    run continues. Mismatched property or metadata values raise.

    Args:
        base_url: Root URL of the catalog server.
        warehouse: Warehouse location passed to ``load_catalog``.
        timeout: Per-request timeout in seconds for the direct HTTP calls.

    Raises:
        AssertionError: If fetched properties or metadata do not match what
            was written.
        requests.RequestException: If a direct HTTP call fails or times out,
            or a metadata read returns an error status.
    """

    def connect(prefix):
        # The catalog name and the REST prefix are the same string.
        return load_catalog(
            prefix,
            **{
                "uri": base_url,
                "warehouse": warehouse,
                "prefix": prefix,
            },
        )

    def require(condition, message):
        # Explicit raise instead of `assert`, so the check survives `python -O`.
        if not condition:
            raise AssertionError(message)

    print("Starting verification...")

    # 1. Connect to Catalog 1 (default)
    print("\n--- Testing Catalog 1 (default) ---")
    cat1 = connect("cat1")
    ns1 = "ns1"
    # Creation failures are reported but do not abort the run.
    try:
        cat1.create_namespace(ns1)
    except Exception as e:
        print(f"⚠️ Namespace creation failed in cat1: {e}")
    else:
        print(f"✅ Created namespace '{ns1}' in cat1")

    try:
        cat1.create_table(
            f"{ns1}.table1",
            schema=Schema(NestedField(1, "x", IntegerType(), required=True)),
        )
    except Exception as e:
        print(f"⚠️ Table creation failed in cat1: {e}")
    else:
        print(f"✅ Created table '{ns1}.table1' in cat1")

    # 2. Connect to Catalog 2 (cat2) and verify isolation
    print("\n--- Testing Catalog 2 (cat2) ---")
    cat2 = connect("cat2")
    try:
        namespaces = cat2.list_namespaces()
    except Exception as e:
        print(f"❌ Isolation check failed: {e}")
    else:
        print(f"ℹ️ Namespaces in cat2: {namespaces}")
        # Compare on the first element of each returned namespace entry.
        top_level = {n[0] for n in namespaces if n}
        if ns1 in top_level:
            print(f"❌ Isolation check failed: '{ns1}' is visible in cat2")
        else:
            print(f"✅ Verified isolation: '{ns1}' not in cat2")

    # Create distinct content in cat2
    ns2 = "ns2"
    try:
        cat2.create_namespace(ns2)
    except Exception as e:
        print(f"⚠️ Namespace creation failed in cat2: {e}")
    else:
        print(f"✅ Created namespace '{ns2}' in cat2")

    # 3. Catalog Properties (IO Config & Warehouse)
    print("\n--- Testing Catalog Properties ---")
    props_url = f"{base_url}/v1/cat1/config/properties"
    io_config = {
        "s3.endpoint": "http://minio:9000",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
        "warehouse": "s3://special-warehouse",
    }

    # Set properties
    resp = requests.post(props_url, json=io_config, timeout=timeout)
    if resp.status_code != 200:
        print(f"❌ Failed to set catalog properties: {resp.text}")
    else:
        print(f"✅ Set catalog properties: {resp.json()}")

    # Get properties
    resp = requests.get(props_url, timeout=timeout)
    if resp.status_code != 200:
        print(f"❌ Failed to get catalog properties: {resp.text}")
    else:
        fetched_props = resp.json()
        for prop in ("s3.endpoint", "warehouse"):
            actual = fetched_props.get(prop)
            require(
                actual == io_config[prop],
                f"Property '{prop}' is {actual!r}, expected {io_config[prop]!r}",
            )
        print("✅ Verified catalog properties persisted")

    # 4. Custom Metadata Isolation
    print("\n--- Testing Custom Metadata Isolation ---")
    key = "meta_key"
    expected = {
        "cat1": {"catalog": "cat1"},
        "cat2": {"catalog": "cat2"},
    }
    urls = {name: f"{base_url}/v1/{name}/ext/metadata/{key}" for name in expected}

    for name, value in expected.items():
        print(f"POST {urls[name]}")
        posted = requests.post(urls[name], json=value, timeout=timeout)
        print(f"Status: {posted.status_code}, Body: {posted.text}")

    # Read back only after both writes, so a write to one catalog that
    # overwrote the other's value would be detected.
    fetched = {}
    for name, url in urls.items():
        got = requests.get(url, timeout=timeout)
        got.raise_for_status()
        fetched[name] = got.json()
    for name, body in fetched.items():
        print(f"ℹ️ {name.capitalize()} Metadata: {body}")

    for name, value in expected.items():
        body = fetched[name]
        require(
            isinstance(body, dict) and body.get("value") == value,
            f"Metadata for {name} is {body!r}, expected value {value!r}",
        )
    print("✅ Verified custom metadata isolation")
# Script entry point: start the server, run the verification, and always
# shut the server down again, even when verification raises.
if __name__ == "__main__":
    print("Launching server...")
    server_proc = run_server()
    try:
        verify()
    finally:
        # terminate() only sends the signal. Wait for the child to exit so it
        # is reaped and "Server stopped." is accurate, and fall back to kill()
        # if it does not shut down in time.
        server_proc.terminate()
        try:
            server_proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            server_proc.kill()
            server_proc.wait()
        print("Server stopped.")