From 1da80bd7c95375312270b5d94373b90f15e24831 Mon Sep 17 00:00:00 2001
From: Tim Utegenov
Date: Thu, 29 Jan 2026 17:47:28 -0800
Subject: [PATCH 1/3] Add support for dp-spark-pip magic to allow synchronous installation of packages in spark

---
 google/cloud/dataproc_spark_connect/magics.py |  79 ++++++++++++
 tests/unit/test_magics.py                     | 122 ++++++++++++++++++
 2 files changed, 201 insertions(+)
 create mode 100644 google/cloud/dataproc_spark_connect/magics.py
 create mode 100644 tests/unit/test_magics.py

diff --git a/google/cloud/dataproc_spark_connect/magics.py b/google/cloud/dataproc_spark_connect/magics.py
new file mode 100644
index 0000000..31886ec
--- /dev/null
+++ b/google/cloud/dataproc_spark_connect/magics.py
@@ -0,0 +1,79 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Dataproc magic implementations."""
+
+import shlex
+from IPython.core.magic import (Magics, magics_class, line_magic)
+from pyspark.sql import SparkSession
+from google.cloud.dataproc_spark_connect import DataprocSparkSession
+
+
+@magics_class
+class DataprocMagics(Magics):
+
+    def __init__(
+        self,
+        shell,
+        **kwargs,
+    ):
+        super().__init__(shell, **kwargs)
+
+    def _parse_command(self, args):
+        if not args or args[0] != "install":
+            print("Usage: %dp_spark_pip install ...")
+            return
+
+        # filter out 'install' and the flags (not currently supported)
+        packages = [pkg for pkg in args[1:] if not pkg.startswith("-")]
+        return packages
+
+    @line_magic
+    def dp_spark_pip(self, line):
+        """
+        Custom magic to install pip packages as Spark Connect artifacts.
+        Usage: %dp_spark_pip install pandas numpy
+        """
+        try:
+            packages = self._parse_command(shlex.split(line))
+
+            if not packages:
+                print("No packages specified.")
+                return
+
+            sessions = [
+                obj
+                for obj in self.shell.user_ns.values()
+                if isinstance(obj, DataprocSparkSession)
+            ]
+
+            if not sessions:
+                print(
+                    "No active Spark Sessions found. Please create one first."
+                )
+                return
+
+            print(f"Installing packages: {packages}")
+            for session in sessions:
+                for package in packages:
+                    session.addArtifacts(package, pypi=True)
+
+            print("Packages successfully added as artifacts.")
+        except Exception as e:
+            print(f"Failed to add artifacts: {e}")
+
+
+# To register the magic
+def load_ipython_extension(ipython):
+    ipython.register_magics(DataprocMagics)
diff --git a/tests/unit/test_magics.py b/tests/unit/test_magics.py
new file mode 100644
index 0000000..79aa190
--- /dev/null
+++ b/tests/unit/test_magics.py
@@ -0,0 +1,122 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import io
+import unittest
+from contextlib import redirect_stdout
+from unittest import mock
+
+from google.cloud.dataproc_spark_connect import DataprocSparkSession
+from google.cloud.dataproc_spark_connect.magics import DataprocMagics
+from IPython.core.interactiveshell import InteractiveShell
+from traitlets.config import Config
+
+
+class DataprocMagicsTest(unittest.TestCase):
+
+    def setUp(self):
+        self.shell = mock.create_autospec(InteractiveShell, instance=True)
+        self.shell.user_ns = {}
+        self.shell.config = Config()
+        self.magics = DataprocMagics(shell=self.shell)
+
+    def test_parse_command_valid(self):
+        packages = self.magics._parse_command(["install", "pandas", "numpy"])
+        self.assertEqual(packages, ["pandas", "numpy"])
+
+    def test_parse_command_with_flags(self):
+        packages = self.magics._parse_command(
+            ["install", "-U", "pandas", "--upgrade", "numpy"]
+        )
+        self.assertEqual(packages, ["pandas", "numpy"])
+
+    def test_parse_command_no_install(self):
+        packages = self.magics._parse_command(["other", "pandas"])
+        self.assertIsNone(packages)
+
+    def test_dp_spark_pip_invalid_command(self):
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("foo bar")
+        output = f.getvalue()
+        self.assertIn("Usage: %dp_spark_pip install", output)
+        self.assertIn("No packages specified", output)
+
+    def test_dp_spark_pip_no_session(self):
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("install pandas")
+        self.assertIn("No active Spark Sessions found", f.getvalue())
+
+    def test_dp_spark_pip_no_packages_specified(self):
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("install")
+        self.assertIn("No packages specified", f.getvalue())
+
+    def test_dp_spark_pip_install_packages_single_session(self):
+        mock_session = mock.Mock(spec=DataprocSparkSession)
+        self.shell.user_ns["spark"] = mock_session
+
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("install pandas numpy")
+
+        mock_session.addArtifacts.assert_has_calls([
+            mock.call("pandas", pypi=True),
+            mock.call("numpy", pypi=True),
+        ])
+        self.assertEqual(mock_session.addArtifacts.call_count, 2)
+        self.assertIn("Packages successfully added as artifacts.", f.getvalue())
+
+    def test_dp_spark_pip_install_packages_multiple_sessions(self):
+        mock_session1 = mock.Mock(spec=DataprocSparkSession)
+        mock_session2 = mock.Mock(spec=DataprocSparkSession)
+        self.shell.user_ns["spark1"] = mock_session1
+        self.shell.user_ns["spark2"] = mock_session2
+        self.shell.user_ns["not_a_session"] = 5
+
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("install pandas")
+
+        mock_session1.addArtifacts.assert_called_once_with("pandas", pypi=True)
+        mock_session2.addArtifacts.assert_called_once_with("pandas", pypi=True)
+        self.assertIn("Packages successfully added as artifacts.", f.getvalue())
+
+    def test_dp_spark_pip_add_artifacts_fails(self):
+        mock_session = mock.Mock(spec=DataprocSparkSession)
+        mock_session.addArtifacts.side_effect = Exception("Failed")
+        self.shell.user_ns["spark"] = mock_session
+
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("install pandas")
+
+        mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True)
+        self.assertIn("Failed to add artifacts: Failed", f.getvalue())
+
+    def test_dp_spark_pip_with_flags(self):
+        mock_session = mock.Mock(spec=DataprocSparkSession)
+        self.shell.user_ns["spark"] = mock_session
+
+        f = io.StringIO()
+        with redirect_stdout(f):
+            self.magics.dp_spark_pip("install -U pandas")
+
+        mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True)
+        self.assertIn("Packages successfully added as artifacts.", f.getvalue())
+
+if __name__ == "__main__":
+    unittest.main()

From b41533fadba8e45d038b1fe98c49951c70f9da48 Mon Sep 17 00:00:00 2001
From: Tim Utegenov
Date: Wed, 18 Feb 2026 12:41:15 -0800
Subject: [PATCH 2/3] Add support for %dpip magic to install packages on Spark session

---
 google/cloud/dataproc_magics/__init__.py      |  19 ++
 .../magics.py                                 |  12 +-
 tests/integration/dataproc_magics/__init__.py |   0
 .../dataproc_magics/test_magics.py            | 198 ++++++++++++++++++
 tests/unit/dataproc_magics/__init__.py        |   0
 .../unit/{ => dataproc_magics}/test_magics.py |  43 ++--
 6 files changed, 243 insertions(+), 29 deletions(-)
 create mode 100644 google/cloud/dataproc_magics/__init__.py
 rename google/cloud/{dataproc_spark_connect => dataproc_magics}/magics.py (87%)
 create mode 100644 tests/integration/dataproc_magics/__init__.py
 create mode 100644 tests/integration/dataproc_magics/test_magics.py
 create mode 100644 tests/unit/dataproc_magics/__init__.py
 rename tests/unit/{ => dataproc_magics}/test_magics.py (78%)

diff --git a/google/cloud/dataproc_magics/__init__.py b/google/cloud/dataproc_magics/__init__.py
new file mode 100644
index 0000000..a348eb8
--- /dev/null
+++ b/google/cloud/dataproc_magics/__init__.py
@@ -0,0 +1,19 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .magics import DataprocMagics
+
+
+def load_ipython_extension(ipython):
+    ipython.register_magics(DataprocMagics)
diff --git a/google/cloud/dataproc_spark_connect/magics.py b/google/cloud/dataproc_magics/magics.py
similarity index 87%
rename from google/cloud/dataproc_spark_connect/magics.py
rename to google/cloud/dataproc_magics/magics.py
index 31886ec..579e9e3 100644
--- a/google/cloud/dataproc_spark_connect/magics.py
+++ b/google/cloud/dataproc_magics/magics.py
@@ -16,7 +16,6 @@
 
 import shlex
 from IPython.core.magic import (Magics, magics_class, line_magic)
-from pyspark.sql import SparkSession
 from google.cloud.dataproc_spark_connect import DataprocSparkSession
 
 
@@ -32,7 +31,7 @@ def __init__(
 
     def _parse_command(self, args):
         if not args or args[0] != "install":
-            print("Usage: %dp_spark_pip install ...")
+            print("Usage: %dpip install ...")
             return
 
         # filter out 'install' and the flags (not currently supported)
@@ -40,10 +39,10 @@ def _parse_command(self, args):
         return packages
 
     @line_magic
-    def dp_spark_pip(self, line):
+    def dpip(self, line):
         """
         Custom magic to install pip packages as Spark Connect artifacts.
-        Usage: %dp_spark_pip install pandas numpy
+        Usage: %dpip install pandas numpy
         """
         try:
             packages = self._parse_command(shlex.split(line))
@@ -72,8 +71,3 @@ def dpip(self, line):
             print("Packages successfully added as artifacts.")
         except Exception as e:
             print(f"Failed to add artifacts: {e}")
-
-
-# To register the magic
-def load_ipython_extension(ipython):
-    ipython.register_magics(DataprocMagics)
diff --git a/tests/integration/dataproc_magics/__init__.py b/tests/integration/dataproc_magics/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/dataproc_magics/test_magics.py b/tests/integration/dataproc_magics/test_magics.py
new file mode 100644
index 0000000..81b83ff
--- /dev/null
+++ b/tests/integration/dataproc_magics/test_magics.py
@@ -0,0 +1,198 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import pytest
+import certifi
+from unittest import mock
+
+from google.cloud.dataproc_spark_connect import DataprocSparkSession
+
+
+_SERVICE_ACCOUNT_KEY_FILE_ = "service_account_key.json"
+
+
+@pytest.fixture(params=[None, "3.0"])
+def image_version(request):
+    return request.param
+
+
+@pytest.fixture
+def test_project():
+    return os.getenv("GOOGLE_CLOUD_PROJECT")
+
+
+@pytest.fixture
+def test_region():
+    return os.getenv("GOOGLE_CLOUD_REGION")
+
+
+def is_ci_environment():
+    """Detect if running in CI environment."""
+    return os.getenv("CI") == "true" or os.getenv("GITHUB_ACTIONS") == "true"
+
+
+@pytest.fixture
+def auth_type(request):
+    """Auto-detect authentication type based on environment.
+
+    CI environment (CI=true or GITHUB_ACTIONS=true): Uses SERVICE_ACCOUNT
+    Local environment: Uses END_USER_CREDENTIALS
+    Test parametrization can still override this default.
+    """
+    # Allow test parametrization to override
+    if hasattr(request, "param"):
+        return request.param
+
+    # Auto-detect based on environment
+    if is_ci_environment():
+        return "SERVICE_ACCOUNT"
+    else:
+        return "END_USER_CREDENTIALS"
+
+
+@pytest.fixture
+def test_subnet():
+    return os.getenv("DATAPROC_SPARK_CONNECT_SUBNET")
+
+
+@pytest.fixture
+def test_subnetwork_uri(test_subnet):
+    # Make DATAPROC_SPARK_CONNECT_SUBNET the full URI to align with how user would specify it in the project
+    return test_subnet
+
+
+@pytest.fixture
+def os_environment(auth_type, image_version, test_project, test_region):
+    original_environment = dict(os.environ)
+    if os.path.isfile(_SERVICE_ACCOUNT_KEY_FILE_):
+        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
+            _SERVICE_ACCOUNT_KEY_FILE_
+        )
+    os.environ["DATAPROC_SPARK_CONNECT_AUTH_TYPE"] = auth_type
+    if auth_type == "END_USER_CREDENTIALS":
+        os.environ.pop("DATAPROC_SPARK_CONNECT_SERVICE_ACCOUNT", None)
+    # Add SSL certificate fix
+    os.environ["SSL_CERT_FILE"] = certifi.where()
+    os.environ["REQUESTS_CA_BUNDLE"] = certifi.where()
+    yield os.environ
+    os.environ.clear()
+    os.environ.update(original_environment)
+
+
+@pytest.fixture
+def connect_session(test_project, test_region, os_environment):
+    session = (
+        DataprocSparkSession.builder.projectId(test_project)
+        .location(test_region)
+        .getOrCreate()
+    )
+    yield session
+    # Clean up the session after each test to prevent resource conflicts
+    try:
+        session.stop()
+    except Exception:
+        # Ignore cleanup errors to avoid masking the actual test failure
+        pass
+
+
+# Tests for magics.py
+@pytest.fixture
+def ipython_shell(connect_session):
+    """Provides an IPython shell with a DataprocSparkSession in user_ns."""
+    pytest.importorskip("IPython", reason="IPython not available")
+    try:
+        from IPython.terminal.interactiveshell import TerminalInteractiveShell
+        from google.cloud.dataproc_spark_connect import magics
+
+        shell = TerminalInteractiveShell.instance()
+        shell.user_ns = {"spark": connect_session}
+
+        # Load magics
+        magics.load_ipython_extension(shell)
+
+        yield shell
+    finally:
+        from IPython.terminal.interactiveshell import TerminalInteractiveShell
+
+        TerminalInteractiveShell.clear_instance()
+
+
+def test_dpip_magic_loads(ipython_shell):
+    """Test that %dpip magic is registered."""
+    assert "dpip" in ipython_shell.magics_manager.magics["line"]
+
+
+@mock.patch.object(DataprocSparkSession, "addArtifacts")
+def test_dpip_install_single_package(mock_add_artifacts, ipython_shell, capsys):
+    """Test installing a single package with %dpip."""
+    ipython_shell.run_line_magic("dpip", "install pandas")
+    mock_add_artifacts.assert_called_once_with("pandas", pypi=True)
+    captured = capsys.readouterr()
+    assert "Installing packages: " in captured.out
+    assert "Packages successfully added as artifacts." in captured.out
+
+
+@mock.patch.object(DataprocSparkSession, "addArtifacts")
+def test_dpip_install_multiple_packages_with_flags(
+    mock_add_artifacts, ipython_shell, capsys
+):
+    """Test installing multiple packages with flags like -U."""
+    ipython_shell.run_line_magic("dpip", "install -U numpy scikit-learn")
+    calls = [
+        mock.call("numpy", pypi=True),
+        mock.call("scikit-learn", pypi=True),
+    ]
+    mock_add_artifacts.assert_has_calls(calls, any_order=True)
+    assert mock_add_artifacts.call_count == 2
+    captured = capsys.readouterr()
+    assert "Installing packages: " in captured.out
+    assert "Packages successfully added as artifacts." in captured.out
+
+
+def test_dpip_no_install_command(ipython_shell, capsys):
+    """Test usage message when 'install' is missing."""
+    ipython_shell.run_line_magic("dpip", "pandas")
+    captured = capsys.readouterr()
+    assert "Usage: %dpip install ..." in captured.out
+    assert "No packages specified." in captured.out
+
+
+def test_dpip_no_packages(ipython_shell, capsys):
+    """Test message when no packages are specified."""
+    ipython_shell.run_line_magic("dpip", "install")
+    captured = capsys.readouterr()
+    assert "No packages specified." in captured.out
+
+
+@mock.patch.object(DataprocSparkSession, "addArtifacts")
+def test_dpip_no_session(mock_add_artifacts, ipython_shell, capsys):
+    """Test message when no Spark session is active."""
+    ipython_shell.user_ns = {}  # Remove spark session from namespace
+    ipython_shell.run_line_magic("dpip", "install pandas")
+    captured = capsys.readouterr()
+    assert "No active Spark Sessions found." in captured.out
+    mock_add_artifacts.assert_not_called()
+
+
+@mock.patch.object(
+    DataprocSparkSession,
+    "addArtifacts",
+    side_effect=Exception("Install failed"),
+)
+def test_dpip_install_failure(mock_add_artifacts, ipython_shell, capsys):
+    """Test error message on installation failure."""
+    ipython_shell.run_line_magic("dpip", "install bad-package")
+    mock_add_artifacts.assert_called_once_with("bad-package", pypi=True)
+    captured = capsys.readouterr()
+    assert "Failed to add artifacts: Install failed" in captured.out
diff --git a/tests/unit/dataproc_magics/__init__.py b/tests/unit/dataproc_magics/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/test_magics.py b/tests/unit/dataproc_magics/test_magics.py
similarity index 78%
rename from tests/unit/test_magics.py
rename to tests/unit/dataproc_magics/test_magics.py
index 79aa190..b2382ff 100644
--- a/tests/unit/test_magics.py
+++ b/tests/unit/dataproc_magics/test_magics.py
@@ -18,7 +18,7 @@
 from unittest import mock
 
 from google.cloud.dataproc_spark_connect import DataprocSparkSession
-from google.cloud.dataproc_spark_connect.magics import DataprocMagics
+from google.cloud.dataproc_magics import DataprocMagics
 from IPython.core.interactiveshell import InteractiveShell
 from traitlets.config import Config
 
@@ -45,42 +45,44 @@ def test_parse_command_no_install(self):
         packages = self.magics._parse_command(["other", "pandas"])
         self.assertIsNone(packages)
 
-    def test_dp_spark_pip_invalid_command(self):
+    def test_dpip_invalid_command(self):
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("foo bar")
+            self.magics.dpip("foo bar")
         output = f.getvalue()
-        self.assertIn("Usage: %dp_spark_pip install", output)
+        self.assertIn("Usage: %dpip install", output)
         self.assertIn("No packages specified", output)
 
-    def test_dp_spark_pip_no_session(self):
+    def test_dpip_no_session(self):
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("install pandas")
+            self.magics.dpip("install pandas")
         self.assertIn("No active Spark Sessions found", f.getvalue())
 
-    def test_dp_spark_pip_no_packages_specified(self):
+    def test_dpip_no_packages_specified(self):
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("install")
+            self.magics.dpip("install")
         self.assertIn("No packages specified", f.getvalue())
 
-    def test_dp_spark_pip_install_packages_single_session(self):
+    def test_dpip_install_packages_single_session(self):
         mock_session = mock.Mock(spec=DataprocSparkSession)
         self.shell.user_ns["spark"] = mock_session
 
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("install pandas numpy")
+            self.magics.dpip("install pandas numpy")
 
-        mock_session.addArtifacts.assert_has_calls([
-            mock.call("pandas", pypi=True),
-            mock.call("numpy", pypi=True),
-        ])
+        mock_session.addArtifacts.assert_has_calls(
+            [
+                mock.call("pandas", pypi=True),
+                mock.call("numpy", pypi=True),
+            ]
+        )
         self.assertEqual(mock_session.addArtifacts.call_count, 2)
         self.assertIn("Packages successfully added as artifacts.", f.getvalue())
 
-    def test_dp_spark_pip_install_packages_multiple_sessions(self):
+    def test_dpip_install_packages_multiple_sessions(self):
         mock_session1 = mock.Mock(spec=DataprocSparkSession)
         mock_session2 = mock.Mock(spec=DataprocSparkSession)
         self.shell.user_ns["spark1"] = mock_session1
@@ -89,34 +91,35 @@ def test_dp_spark_pip_install_packages_multiple_sessions(self):
 
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("install pandas")
+            self.magics.dpip("install pandas")
 
         mock_session1.addArtifacts.assert_called_once_with("pandas", pypi=True)
         mock_session2.addArtifacts.assert_called_once_with("pandas", pypi=True)
         self.assertIn("Packages successfully added as artifacts.", f.getvalue())
 
-    def test_dp_spark_pip_add_artifacts_fails(self):
+    def test_dpip_add_artifacts_fails(self):
         mock_session = mock.Mock(spec=DataprocSparkSession)
         mock_session.addArtifacts.side_effect = Exception("Failed")
         self.shell.user_ns["spark"] = mock_session
 
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("install pandas")
+            self.magics.dpip("install pandas")
 
         mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True)
         self.assertIn("Failed to add artifacts: Failed", f.getvalue())
 
-    def test_dp_spark_pip_with_flags(self):
+    def test_dpip_with_flags(self):
         mock_session = mock.Mock(spec=DataprocSparkSession)
         self.shell.user_ns["spark"] = mock_session
 
         f = io.StringIO()
         with redirect_stdout(f):
-            self.magics.dp_spark_pip("install -U pandas")
+            self.magics.dpip("install -U pandas")
 
         mock_session.addArtifacts.assert_called_once_with("pandas", pypi=True)
         self.assertIn("Packages successfully added as artifacts.", f.getvalue())
 
+
 if __name__ == "__main__":
     unittest.main()

From 1340d9c852533c603f45ba18005d6c08c6a7f180 Mon Sep 17 00:00:00 2001
From: Tim Utegenov
Date: Wed, 18 Feb 2026 13:34:38 -0800
Subject: [PATCH 3/3] Fixed integration tests

---
 tests/integration/dataproc_magics/test_magics.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/integration/dataproc_magics/test_magics.py b/tests/integration/dataproc_magics/test_magics.py
index 81b83ff..3214187 100644
--- a/tests/integration/dataproc_magics/test_magics.py
+++ b/tests/integration/dataproc_magics/test_magics.py
@@ -113,13 +113,13 @@ def ipython_shell(connect_session):
     pytest.importorskip("IPython", reason="IPython not available")
     try:
         from IPython.terminal.interactiveshell import TerminalInteractiveShell
-        from google.cloud.dataproc_spark_connect import magics
+        from google.cloud import dataproc_magics
 
         shell = TerminalInteractiveShell.instance()
         shell.user_ns = {"spark": connect_session}
 
         # Load magics
-        magics.load_ipython_extension(shell)
+        dataproc_magics.load_ipython_extension(shell)
 
         yield shell
     finally:
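
Usage sketch (illustrative, not part of the patches above): with the google.cloud.dataproc_magics package from PATCH 2/3 installed in a notebook kernel alongside the Spark Connect client, the magic would be loaded and invoked roughly as below. The project ID, region, and the variable name `spark` are placeholders; any DataprocSparkSession found in the notebook namespace is picked up by %dpip.

    # Register the DataprocMagics class via the package's load_ipython_extension hook.
    %load_ext google.cloud.dataproc_magics

    # Create a Dataproc Spark Connect session (placeholder project/region).
    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    spark = (
        DataprocSparkSession.builder.projectId("my-project")
        .location("us-central1")
        .getOrCreate()
    )

    # Calls session.addArtifacts(<package>, pypi=True) for each package,
    # on every DataprocSparkSession in the user namespace.
    %dpip install pandas numpy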