diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml
index 7ba5d22..225f912 100644
--- a/.github/workflows/python-app.yml
+++ b/.github/workflows/python-app.yml
@@ -38,4 +38,7 @@ jobs:
         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
     - name: Test with pytest
       run: |
-        pytest
+        # run tests independently to enable tests using spark session
+        for f in tests/test_*.py; do
+          pytest -q "$f" || exit 1
+        done
diff --git a/.gitignore b/.gitignore
index f51da02..0840f28 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,6 +51,14 @@ coverage.xml
 .pytest_cache/
 cover/
 
+# Iceberg & Delta runtime
+outputjson/
+outputparquet/
+outputtext/
+iceberg-warehouse/
+spark-warehouse/
+metastore_db/
+
 # Translations
 *.mo
 *.pot
diff --git a/src/jsoniq/jars/rumbledb-2.0.0.jar b/src/jsoniq/jars/rumbledb-2.0.0.jar
new file mode 100644
index 0000000..afcebd8
Binary files /dev/null and b/src/jsoniq/jars/rumbledb-2.0.0.jar differ
diff --git a/src/jsoniq/jars/rumbledb-2.0.8.jar b/src/jsoniq/jars/rumbledb-2.0.8.jar
index fc11788..8314d51 100644
Binary files a/src/jsoniq/jars/rumbledb-2.0.8.jar and b/src/jsoniq/jars/rumbledb-2.0.8.jar differ
diff --git a/src/jsoniq/session.py b/src/jsoniq/session.py
index fb2aa7c..ec5e9f0 100644
--- a/src/jsoniq/session.py
+++ b/src/jsoniq/session.py
@@ -74,6 +74,10 @@ def __init__(self):
             sys.stderr.write("[Error] Could not determine Java version. Please ensure Java is installed and JAVA_HOME is properly set.\n")
             sys.exit(43)
         self._sparkbuilder = SparkSession.builder.config("spark.jars", jar_path_str)
+        self._appendable_keys = {
+            "spark.jars.packages",
+            "spark.sql.extensions",
+        }
 
     def getOrCreate(self):
         if RumbleSession._rumbleSession is None:
@@ -122,16 +126,60 @@ def config(self, key=None, value=None, conf=None, *, map=None):
         self._sparkbuilder = self._sparkbuilder.config(key=key, value=value, conf=conf, map=map)
         return self;
 
+    def _append_config(self, key, value):
+        if key not in self._appendable_keys:
+            raise ValueError(f"{key} is not an appendable Spark config key.")
+        current = self._sparkbuilder._options.get(key)
+        if current:
+            value = current + "," + value
+        self._sparkbuilder = self._sparkbuilder.config(key=key, value=value)
+        return self;
+
     def withDelta(self):
+        self._append_config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0")
+        self._append_config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         self._sparkbuilder = self._sparkbuilder \
-            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
-            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
-            .config("spark.jars.packages", "io.delta:delta-spark_2.13:4.0.0")
+            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         return self;
+
+    def withIceberg(self, catalog_names=None):
+        """
+        Configure Iceberg catalog(s).
 
-    def withMongo(self):
+        - If no catalogs are provided (None or empty), the session catalog (spark_catalog)
+          is configured for Iceberg.
+        - If catalogs are provided, table names must be fully qualified with the catalog
+          (<catalog>.<namespace>.<table>). No implicit default is applied.
+        - Each configured catalog uses its own warehouse directory under
+          ./iceberg-warehouse/<catalog_name>.
+        - These are the default settings for the Iceberg catalog, which can be overridden if needed.
+        """
+        self._append_config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0")
+        self._append_config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+        if catalog_names is None:
+            catalog_names = []
+        if not isinstance(catalog_names, (list, tuple, set)):
+            raise ValueError("catalog_names must be a list, tuple, or set of strings.")
+        catalog_names = list(catalog_names)
+        if len(catalog_names) == 0:
+            catalog_names = ["spark_catalog"]
+            catalog_class = "org.apache.iceberg.spark.SparkSessionCatalog"
+        else:
+            catalog_class = "org.apache.iceberg.spark.SparkCatalog"
+        for catalog_name in catalog_names:
+            if not isinstance(catalog_name, str) or not catalog_name:
+                raise ValueError("catalog_names must contain non-empty strings.")
+            warehouse = f"./iceberg-warehouse/{catalog_name}"
+            self._sparkbuilder = self._sparkbuilder \
+                .config(f"spark.sql.catalog.{catalog_name}", catalog_class) \
+                .config(f"spark.sql.catalog.{catalog_name}.type", "hadoop") \
+                .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse)
         self._sparkbuilder = self._sparkbuilder \
-            .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:10.5.0")
+            .config("spark.sql.iceberg.check-ordering", "false")
+        return self;
+
+    def withMongo(self):
+        self._append_config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:10.5.0")
         return self;
 
     def __getattr__(self, name):
@@ -248,4 +296,4 @@ def jsoniq(self, str, **kwargs):
         return seq;
 
     def __getattr__(self, item):
-        return getattr(self._sparksession, item)
\ No newline at end of file
+        return getattr(self._sparksession, item)
diff --git a/tests/test_catalogs_update.py b/tests/test_catalogs_update.py
new file mode 100644
index 0000000..703fa9b
--- /dev/null
+++ b/tests/test_catalogs_update.py
@@ -0,0 +1,168 @@
+from jsoniq import RumbleSession
+from unittest import TestCase
+import uuid
+
+
+class TestCatalogsUpdate(TestCase):
+    """
+    Default Spark session catalog for Delta + custom Iceberg catalogs.
+    - Delta uses spark_catalog.
+    - Iceberg uses named catalogs (e.g., iceberg, ice_b, ice_one).
+    """
+    @classmethod
+    def setUpClass(cls):
+        RumbleSession._rumbleSession = None
+        RumbleSession._builder = RumbleSession.Builder()
+        cls.rumble = (
+            RumbleSession.builder
+            .withDelta()
+            .withIceberg(["iceberg", "ice_b", "ice_one"])
+            .getOrCreate()
+        )
+
+    def _create_insert_count(self, rumble, create_query, insert_query, count_query):
+        rumble.jsoniq(create_query).applyPUL()
+        rumble.jsoniq(insert_query).applyPUL()
+        count_value = rumble.jsoniq(count_query).json()
+        self.assertEqual(count_value, (2,))
+
+    def _assert_query_fails(self, rumble, query):
+        with self.assertRaises(Exception):
+            rumble.jsoniq(query).json()
+
+    @staticmethod
+    def _cleanup_warehouses():
+        import os
+        import shutil
+
+        for dirname in ("spark-warehouse", "iceberg-warehouse"):
+            path = os.path.join(os.getcwd(), dirname)
+            shutil.rmtree(path, ignore_errors=True)
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            if cls.rumble is not None:
+                cls.rumble._sparksession.stop()
+        finally:
+            RumbleSession._rumbleSession = None
+            cls._cleanup_warehouses()
+
+    def test_default_catalogs(self):
+        """
+        Delta uses spark_catalog and Iceberg uses the named catalog "iceberg".
+        Also verifies that cross-catalog reads are rejected.
+        """
+        suffix = uuid.uuid4().hex
+        delta_table = f"default.delta_default_{suffix}"
+        iceberg_table = f"iceberg.default.iceberg_default_{suffix}"
+
+        self._create_insert_count(
+            self.rumble,
+            f'create collection table("{delta_table}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection table("{delta_table}")',
+            f'count(table("{delta_table}"))'
+        )
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("{iceberg_table}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_table}")',
+            f'count(iceberg-table("{iceberg_table}"))'
+        )
+        self._assert_query_fails(
+            self.rumble,
+            f'iceberg-table("ice_b.{iceberg_table.split(".", 1)[1]}")'
+        )
+
+    def test_single_custom_catalogs(self):
+        """
+        Iceberg on a single custom catalog (ice_one).
+        Ensures unqualified access does not resolve to this catalog.
+        """
+        suffix = uuid.uuid4().hex
+        iceberg_table = f"ice_one.default.ice_single_{suffix}"
+
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("{iceberg_table}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_table}")',
+            f'count(iceberg-table("{iceberg_table}"))'
+        )
+        self._assert_query_fails(
+            self.rumble,
+            f'iceberg-table("{iceberg_table.split(".", 1)[1]}")'
+        )
+
+    def test_multiple_catalogs(self):
+        """
+        Iceberg on multiple catalogs (iceberg + ice_b).
+        Verifies isolation by asserting cross-catalog reads fail.
+        """
+        suffix = uuid.uuid4().hex
+        iceberg_default_table = f"iceberg.default.iceberg_multi_default_{suffix}"
+        iceberg_custom_table = f"ice_b.default.iceberg_multi_{suffix}"
+
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("{iceberg_default_table}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_default_table}")',
+            f'count(iceberg-table("{iceberg_default_table}"))'
+        )
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("{iceberg_custom_table}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_custom_table}")',
+            f'count(iceberg-table("{iceberg_custom_table}"))'
+        )
+        self._assert_query_fails(
+            self.rumble,
+            f'iceberg-table("ice_b.{iceberg_default_table.split(".", 1)[1]}")'
+        )
+        self._assert_query_fails(
+            self.rumble,
+            f'iceberg-table("{iceberg_custom_table.split(".", 1)[1]}")'
+        )
+
+    def test_resolution_order(self):
+        """
+        Matches Iceberg's catalog/namespace resolution order for spark.table().
+        Ensures unqualified access fails when spark_catalog is not Iceberg.
+        """
+        suffix = uuid.uuid4().hex
+        table_name = f"iceberg.default.iceberg_res_{suffix}"
+        short_name = f"iceberg_res_{suffix}"
+        multi_ns_table = f"iceberg.ns1.ns2.iceberg_res_{suffix}_ns"
+
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("{table_name}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("{table_name}")',
+            f'count(iceberg-table("{table_name}"))'
+        )
+
+        # catalog.table -> catalog.currentNamespace.table
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("iceberg.{short_name}_2") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("iceberg.{short_name}_2")',
+            f'count(iceberg-table("iceberg.{short_name}_2"))'
+        )
+
+        # catalog.namespace1.namespace2.table -> catalog.namespace1.namespace2.table
+        self._create_insert_count(
+            self.rumble,
+            f'create collection iceberg-table("{multi_ns_table}") with {{"k": 1}}',
+            f'insert {{"k": 2}} last into collection iceberg-table("{multi_ns_table}")',
+            f'count(iceberg-table("{multi_ns_table}"))'
+        )
+
+        # namespace.table (current catalog) should fail because spark_catalog is not Iceberg here.
+        self._assert_query_fails(
+            self.rumble,
+            f'iceberg-table("default.{short_name}")'
+        )
+        # table (current catalog + namespace) should also fail for the same reason.
+        self._assert_query_fails(
+            self.rumble,
+            f'iceberg-table("{short_name}")'
+        )
diff --git a/tests/test_iceberg_default_catalog.py b/tests/test_iceberg_default_catalog.py
new file mode 100644
index 0000000..372bbaf
--- /dev/null
+++ b/tests/test_iceberg_default_catalog.py
@@ -0,0 +1,52 @@
+from jsoniq import RumbleSession
+from unittest import TestCase
+import uuid
+
+
+class TestIcebergDefaultCatalog(TestCase):
+    """
+    Iceberg uses the session catalog (spark_catalog).
+    - Delta custom catalogs are not tested here (to be added later).
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        RumbleSession._rumbleSession = None
+        RumbleSession._builder = RumbleSession.Builder()
+        cls.rumble = RumbleSession.builder.withIceberg().getOrCreate()
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            cls.rumble._sparksession.stop()
+        finally:
+            RumbleSession._rumbleSession = None
+            cls._cleanup_warehouses()
+
+    @staticmethod
+    def _cleanup_warehouses():
+        import os
+        import shutil
+
+        for dirname in ("spark-warehouse", "iceberg-warehouse"):
+            path = os.path.join(os.getcwd(), dirname)
+            shutil.rmtree(path, ignore_errors=True)
+
+    def test_default_catalog(self):
+        """
+        Iceberg using spark_catalog with a default namespace.
+        This test runs in its own session to avoid Delta/spark_catalog conflicts.
+        """
+        suffix = uuid.uuid4().hex
+        iceberg_table = f"default.iceberg_default_session_{suffix}"
+
+        self.rumble.jsoniq(
+            f'create collection iceberg-table("{iceberg_table}") with {{"k": 1}}'
+        ).applyPUL()
+        self.rumble.jsoniq(
+            f'insert {{"k": 2}} last into collection iceberg-table("{iceberg_table}")'
+        ).applyPUL()
+        count_value = self.rumble.jsoniq(
+            f'count(iceberg-table("{iceberg_table}"))'
+        ).json()
+        self.assertEqual(count_value, (2,))