 import pyarrow as pa
 import pytest
 from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.partitioning import PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
+from pyiceberg.table.sorting import SortField, SortOrder
+from pyiceberg.transforms import DayTransform, IdentityTransform
 from pyiceberg.types import (
     DoubleType,
     NestedField,
     StringType,
+    TimestampType,
 )
 
 from helpers.cluster import ClickHouseCluster
@@ -25,30 +29,90 @@
 CATALOG_NAME = "demo"
 WAREHOUSE_NAME = "warehouse"
 
-DEFAULT_CREATE_TABLE = "CREATE TABLE {}.`{}.{}`\n(\n    `id` Nullable(Float64),\n    `data` Nullable(String)\n)\nENGINE = Iceberg('http://minio:9000/warehouse-rest/data/', 'minio', '[HIDDEN]')\n"
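+# Default schema used by the create_table() helper below.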
+DEFAULT_SCHEMA = Schema(
+    NestedField(
+        field_id=1, name="datetime", field_type=TimestampType(), required=False
+    ),
+    NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
+    NestedField(field_id=3, name="bid", field_type=DoubleType(), required=False),
+    NestedField(field_id=4, name="ask", field_type=DoubleType(), required=False),
+    NestedField(
+        field_id=5,
+        name="details",
+        field_type=StringType(),  # Simplified from struct for compatibility
+        required=False,
+    ),
+)
+
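+# Partition new tables by day of the 'datetime' field (source_id=1).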
+DEFAULT_PARTITION_SPEC = PartitionSpec(
+    PartitionField(
+        source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
+    )
+)
+
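+# Sort rows by the 'symbol' field (source_id=2) within each data file.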
+DEFAULT_SORT_ORDER = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
+
+
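+# Create an Iceberg table via the catalog, defaulting to the schema, partitioning,
+# and sort order defined above, with uncompressed metadata.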
+def create_table(
+    catalog,
+    namespace,
+    table,
+    schema=DEFAULT_SCHEMA,
+    partition_spec=DEFAULT_PARTITION_SPEC,
+    sort_order=DEFAULT_SORT_ORDER,
+):
+    return catalog.create_table(
+        identifier=f"{namespace}.{table}",
+        schema=schema,
+        partition_spec=partition_spec,
+        sort_order=sort_order,
+        properties={"write.metadata.compression-codec": "none"},
+    )
+
+
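+# (Re)create the ClickHouse DataLakeCatalog database backed by the REST catalog
+# and verify that the secret key is masked in its SHOW CREATE output.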
+def create_clickhouse_iceberg_database(
+    started_cluster, node, name, additional_settings={}
+):
+    settings = {
+        "catalog_type": "rest",
+        "warehouse": "warehouse",
+        "storage_endpoint": "http://minio:9000/warehouse-rest",
+    }
+
+    settings.update(additional_settings)
 
+    node.query(
+        f"""
+DROP DATABASE IF EXISTS {name};
+SET allow_experimental_database_iceberg=true;
+CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}')
+SETTINGS {",".join((k + "=" + repr(v) for k, v in settings.items()))}
+    """
+    )
+    show_result = node.query(f"SHOW CREATE DATABASE {name}")
+    assert minio_secret_key not in show_result
+    assert "HIDDEN" in show_result
 
 
 def load_catalog_impl(started_cluster):
     minio_ip = started_cluster.get_instance_ip('minio')
     s3_endpoint = f"http://{minio_ip}:9000"
-
-    # Add minio hostname to /etc/hosts to resolve DNS issues
+
+    # Add minio hostname mapping so PyIceberg can resolve 'minio' hostnames in table metadata
     import subprocess
     try:
-        # Add hostname mapping so PyIceberg can resolve 'minio' hostnames in table metadata
         subprocess.run(['bash', '-c', f'echo "{minio_ip} minio" >> /etc/hosts'], check=True)
-        endpoint_for_catalog = "http://minio:9000"
-    except Exception:
-        endpoint_for_catalog = s3_endpoint
+        print(f"Added minio hostname mapping: {minio_ip} minio")
+    except Exception as e:
+        print(f"Failed to add hostname mapping: {e}")
 
     return RestCatalog(
         name="my_catalog",
         warehouse=WAREHOUSE_NAME,
         uri=BASE_URL_LOCAL,
         token="dummy",
         **{
-            "s3.endpoint": endpoint_for_catalog,
+            "s3.endpoint": s3_endpoint,
             "s3.access-key-id": minio_access_key,
             "s3.secret-access-key": minio_secret_key,
             "s3.region": "us-east-1",
@@ -60,13 +124,6 @@ def load_catalog_impl(started_cluster):
 @pytest.fixture(scope="module")
 def started_cluster():
     try:
-        # Install s3fs if not available
-        try:
-            import s3fs
-        except ImportError:
-            import subprocess
-            subprocess.check_call(["pip", "install", "s3fs==2024.12.0"])
-
         cluster = ClickHouseCluster(__file__)
         cluster.add_instance(
             "node1",
@@ -107,7 +164,7 @@ def test_list_tables(started_cluster):
 
     create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
 
-    tables_list = ""
+    tables_list = []
     schema = Schema(
         NestedField(field_id=1, name="id", field_type=DoubleType(), required=False),
         NestedField(field_id=2, name="data", field_type=StringType(), required=False),
@@ -119,26 +176,22 @@ def test_list_tables(started_cluster):
             schema=schema,
             properties={"write.metadata.compression-codec": "none"},
         )
-        if len(tables_list) > 0:
-            tables_list += "\n"
-        tables_list += f"{namespace_1}.{table}"
+        tables_list.append(f"{namespace_1}.{table}")
 
     for table in namespace_2_tables:
         catalog.create_table(
             (namespace_2, table),
             schema=schema,
             properties={"write.metadata.compression-codec": "none"},
         )
-        if len(tables_list) > 0:
-            tables_list += "\n"
-        tables_list += f"{namespace_2}.{table}"
+        tables_list.append(f"{namespace_2}.{table}")
 
     # Verify tables were created via PyIceberg
     assert len(catalog.list_tables((namespace_1,))) == 2
     assert len(catalog.list_tables((namespace_2,))) == 2
 
     assert (
-        tables_list
+        "\n".join(sorted(tables_list))
         == node.query(
             f"SELECT name FROM system.tables WHERE database = '{CATALOG_NAME}' and name ILIKE '{namespace_prefix}%' ORDER BY name"
         ).strip()
@@ -154,13 +207,11 @@ def test_select(started_cluster):
 
     test_ref = f"test_select_{uuid.uuid4().hex[:8]}"
     test_namespace = (f"{test_ref}_namespace",)
-    existing_namespaces = catalog.list_namespaces()
-
-    if test_namespace not in existing_namespaces:
-        catalog.create_namespace(test_namespace)
+
+    catalog.create_namespace(test_namespace)
 
     test_table_name = f"{test_ref}_table"
-    test_table_identifier = test_namespace + (test_table_name,)
+    test_table_identifier = (test_namespace[0], test_table_name)
 
     try:
         existing_tables = catalog.list_tables(namespace=test_namespace)
@@ -217,33 +268,6 @@ def test_select(started_cluster):
     assert csv_compare(result, expected), f"got\n{result}\nwant\n{expected}"
 
 
-
-
-
-def create_clickhouse_iceberg_database(
-    started_cluster, node, name, additional_settings={}
-):
-    settings = {
-        "catalog_type": "rest",
-        "warehouse": "warehouse",
-        "storage_endpoint": "http://minio:9000/warehouse-rest",
-    }
-
-    settings.update(additional_settings)
-
-    node.query(
-        f"""
-DROP DATABASE IF EXISTS {name};
-SET allow_experimental_database_iceberg=true;
-CREATE DATABASE {name} ENGINE = DataLakeCatalog('{BASE_URL}', 'minio', '{minio_secret_key}')
-SETTINGS {",".join((k + "=" + repr(v) for k, v in settings.items()))}
-    """
-    )
-    show_result = node.query(f"SHOW CREATE DATABASE {name}")
-    assert minio_secret_key not in show_result
-    assert "HIDDEN" in show_result
-
-
 def test_hide_sensitive_info(started_cluster):
     node = started_cluster.instances["node1"]
 
@@ -254,16 +278,14 @@ def test_hide_sensitive_info(started_cluster):
     namespace = (root_namespace,)
     catalog = load_catalog_impl(started_cluster)
 
-    existing_namespaces = catalog.list_namespaces()
-    if namespace not in existing_namespaces:
-        catalog.create_namespace(namespace)
+    catalog.create_namespace(namespace)
 
     schema = Schema(
         NestedField(field_id=1, name="id", field_type=DoubleType(), required=False),
         NestedField(field_id=2, name="data", field_type=StringType(), required=False),
     )
     catalog.create_table(
-        namespace + (table_name,),
+        (namespace[0], table_name),
         schema=schema,
         properties={"write.metadata.compression-codec": "none"},
     )
@@ -289,51 +311,37 @@ def test_hide_sensitive_info(started_cluster):
     assert minio_secret_key not in show_result
 
 def test_tables_with_same_location(started_cluster):
-
     node = started_cluster.instances["node1"]
 
-    test_ref = f"test_tables_with_same_location_{uuid.uuid4().hex[:8]}"
-    namespace = (f"{test_ref}_namespace",)
+    test_ref = f"test_tables_with_same_location_{uuid.uuid4()}"
+    namespace = f"{test_ref}_namespace"
     catalog = load_catalog_impl(started_cluster)
 
     table_name = f"{test_ref}_table"
     table_name_2 = f"{test_ref}_table_2"
 
-    existing_namespaces = catalog.list_namespaces()
-    if namespace not in existing_namespaces:
-        catalog.create_namespace(namespace)
-
-    schema = Schema(
-        NestedField(field_id=1, name="id", field_type=DoubleType(), required=False),
-        NestedField(field_id=2, name="symbol", field_type=StringType(), required=False),
-    )
-    table = catalog.create_table(
-        namespace + (table_name,),
-        schema=schema,
-        properties={"write.metadata.compression-codec": "none"},
-    )
-    table_2 = catalog.create_table(
-        namespace + (table_name_2,),
-        schema=schema,
-        properties={"write.metadata.compression-codec": "none"},
-    )
-
-    df1 = pd.DataFrame({"id": [1.0, 2.0, 3.0], "symbol": ["aaa", "aaa", "aaa"]})
-    df2 = pd.DataFrame({"id": [1.0, 2.0, 3.0], "symbol": ["bbb", "bbb", "bbb"]})
+    catalog.create_namespace((namespace,))
+    table = create_table(catalog, namespace, table_name)
+    table_2 = create_table(catalog, namespace, table_name_2)
+
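+    # Build one sample row matching DEFAULT_SCHEMA, with random bid/ask values.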
+    def record(key):
+        return {
+            "datetime": datetime.now(),
+            "symbol": str(key),
+            "bid": round(random.uniform(100, 200), 2),
+            "ask": round(random.uniform(200, 300), 2),
+            "details": "created_by Alice Smith",  # Simplified from nested dict
+        }
 
-    table.append(pa.Table.from_pandas(df1))
-    table_2.append(pa.Table.from_pandas(df2))
+    data = [record('aaa') for _ in range(3)]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
 
-    scan_result_1 = table.scan().to_pandas()
-    scan_result_2 = table_2.scan().to_pandas()
-    assert len(scan_result_1) == 3
-    assert len(scan_result_2) == 3
+    data = [record('bbb') for _ in range(3)]
+    df = pa.Table.from_pylist(data)
+    table_2.append(df)
 
     create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
 
-    assert 'aaa\naaa\naaa' == node.query(
-        f"SELECT symbol FROM {CATALOG_NAME}.`{namespace[0]}.{table_name}`"
-    ).strip()
-    assert 'bbb\nbbb\nbbb' == node.query(
-        f"SELECT symbol FROM {CATALOG_NAME}.`{namespace[0]}.{table_name_2}`"
-    ).strip()
+    assert 'aaa\naaa\naaa' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name}`").strip()
+    assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip()