diff --git a/CHANGELOG b/CHANGELOG index e9910c2e..c6f34121 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,27 @@ == Unreleased - Remove requirement to provide scopes to Permission URL, as it should be omitted if defined with the TOML file. +- Add OAuth 2.0 Client Credentials Grant support for server-to-server authentication (required for Shopify 2026-01+ API) + - New method: Session.request_token_client_credentials() - Exchange client credentials for access token + - New method: Session.request_access_token() - Automatically selects correct OAuth flow based on API version + - New exception: OAuthException for OAuth-specific errors + - Implements RFC 6749 Section 4.4 for apps created in Shopify Dev Dashboard + - Automatic version detection: API versions >= 2026-01 require client credentials flow + - Legacy request_token() raises ValidationException for API versions >= 2026-01 + - Tokens from client credentials grant expire after 24 hours (86399 seconds) +- Add token expiration tracking and automatic refresh functionality + - New method: Session.is_token_expired() - Check if token is expired or expiring soon (with configurable buffer) + - New method: Session.refresh_token_if_needed() - Automatically refresh token if expired or expiring within buffer time + - New method: Session.refresh_token() - Manually force token refresh regardless of expiration status + - Session now tracks token_obtained_at and token_expires_at timestamps for client credentials tokens + - Default 5-minute buffer before expiration to ensure tokens are refreshed proactively + - Auto-refresh only works with client credentials flow (API versions >= 2026-01) +- Add scope filtering support for OAuth 2.0 Client Credentials Grant + - All OAuth methods now accept optional 'scope' parameter to request specific scopes + - Enables requesting only Admin API scopes when app has multiple API types configured (Admin, Customer Account, Storefront) + - Scope normalization: Comma-separated scopes automatically converted to space-separated for OAuth 2.0 spec compliance + - Methods supporting scope parameter: request_token_client_credentials(), request_access_token(), refresh_token_if_needed(), refresh_token() + - When scope parameter is omitted, Shopify grants all scopes configured for the app (default behavior) == Version 12.7.0 diff --git a/shopify/__init__.py b/shopify/__init__.py index d3f53de9..c6e5bb10 100644 --- a/shopify/__init__.py +++ b/shopify/__init__.py @@ -1,5 +1,5 @@ from shopify.version import VERSION -from shopify.session import Session, ValidationException +from shopify.session import Session, ValidationException, OAuthException from shopify.resources import * from shopify.limits import Limits from shopify.api_version import * diff --git a/shopify/session.py b/shopify/session.py index 561faacf..57564fa0 100644 --- a/shopify/session.py +++ b/shopify/session.py @@ -2,6 +2,7 @@ import hmac import json from hashlib import sha256 +from datetime import datetime, timedelta try: import simplejson as json @@ -19,6 +20,11 @@ class ValidationException(Exception): pass +class OAuthException(Exception): + """Exception raised for OAuth-related errors""" + pass + + class Session(object): api_key = None secret = None @@ -51,8 +57,30 @@ def __init__(self, shop_url, version=None, token=None, access_scopes=None): self.token = token self.version = ApiVersion.coerce_to_version(version) self.access_scopes = access_scopes + self.token_expires_at = None + self.token_obtained_at = None return + def _requires_client_credentials(self): + """ + Check if the API version requires OAuth 2.0 Client Credentials Grant. + + Starting from API version 2026-01, Shopify requires apps created in + Dev Dashboard to use OAuth 2.0 client credentials instead of permanent + access tokens. + + Returns: + bool: True if version is 2026-01 or higher, False otherwise + """ + if not self.version: + return False + + # Get numeric version (e.g., 202601 for "2026-01") + version_number = self.version.numeric_version + + # 2026-01 = 202601, check if >= this threshold + return version_number >= 202601 + def create_permission_url(self, redirect_uri, scope=None, state=None): query_params = {"client_id": self.api_key, "redirect_uri": redirect_uri} # `scope` should be omitted if provided by app's TOML @@ -63,9 +91,34 @@ def create_permission_url(self, redirect_uri, scope=None, state=None): return "https://%s/admin/oauth/authorize?%s" % (self.url, urllib.parse.urlencode(query_params)) def request_token(self, params): + """ + Exchange authorization code for access token (Authorization Code Grant). + + Note: This method is for the traditional OAuth Authorization Code Grant flow + and will not work with API version 2026-01 or higher. For 2026-01+, use + request_token_client_credentials() instead, or use request_access_token() + which automatically selects the correct method based on API version. + + Args: + params: OAuth callback parameters including 'code', 'hmac', 'timestamp' + + Returns: + str: The access token + + Raises: + ValidationException: If HMAC validation fails + """ if self.token: return self.token + # Warn if using old auth method with new API version + if self._requires_client_credentials(): + raise ValidationException( + "API version %s requires OAuth 2.0 Client Credentials Grant. " + "Use request_token_client_credentials() or request_access_token() instead." + % self.version.name + ) + if not self.validate_params(params): raise ValidationException("Invalid HMAC: Possibly malicious login") @@ -85,6 +138,353 @@ def request_token(self, params): else: raise Exception(response.msg) + def request_token_client_credentials(self, scope=None): + """ + Exchange client credentials for an access token. + + OAuth 2.0 Client Credentials Grant (RFC 6749, Section 4.4) + https://shopify.dev/docs/apps/build/authentication-authorization/access-tokens/client-credentials-grant + + This method is used for server-to-server authentication where the app + authenticates with its own credentials rather than on behalf of a user. + + Requirements: + - Session.api_key (client_id) must be set + - Session.secret (client_secret) must be set + - Shop URL must be valid + + Args: + scope (str, optional): Space or comma-separated list of scopes to request. + If not provided, Shopify will grant all scopes configured + for the app. Use this to request only Admin API scopes + when your app has multiple API types configured. + + Returns: + dict: Token response containing: + - access_token (str): The access token for API requests + - scope (str): Comma-separated list of granted scopes + - expires_in (int): Seconds until token expires (typically 86399 = 24 hours) + + Raises: + ValidationException: If required credentials are missing + OAuthException: If OAuth request fails + + Example: + >>> session = shopify.Session("mystore.myshopify.com", "2026-01") + >>> shopify.Session.setup(api_key="client_id", secret="client_secret") + >>> # Request all scopes + >>> token_response = session.request_token_client_credentials() + >>> # Or request specific Admin API scopes only + >>> token_response = session.request_token_client_credentials( + ... scope="read_products,write_products,read_orders" + ... ) + >>> session.token = token_response["access_token"] + >>> shopify.ShopifyResource.activate_session(session) + """ + # Validate required credentials + if not self.api_key: + raise ValidationException("api_key (client_id) is required for client credentials grant") + if not self.secret: + raise ValidationException("secret (client_secret) is required for client credentials grant") + if not self.url: + raise ValidationException("shop_url is required for client credentials grant") + + # Return existing token if already set + if self.token: + return { + "access_token": self.token, + "scope": str(self.access_scopes) if self.access_scopes else "", + "expires_in": None # Unknown if token was set manually + } + + # Construct OAuth endpoint URL + url = "https://%s/admin/oauth/access_token" % self.url + + # Prepare request data (form-encoded) + data = { + "grant_type": "client_credentials", + "client_id": self.api_key, + "client_secret": self.secret + } + + # Add scope parameter if provided (to filter which scopes to request) + if scope: + # Normalize scope format (convert commas to spaces for OAuth spec) + normalized_scope = scope.replace(',', ' ') + data["scope"] = normalized_scope + + # Prepare request headers + headers = {"Content-Type": "application/x-www-form-urlencoded"} + + try: + # Make OAuth request + request = urllib.request.Request( + url, + urllib.parse.urlencode(data).encode("utf-8"), + headers=headers + ) + + # Set timeout to prevent hanging + response = urllib.request.urlopen(request, timeout=10) + + if response.code == 200: + # Parse response + json_payload = json.loads(response.read().decode("utf-8")) + + # Store token and scopes in session + self.token = json_payload["access_token"] + self.access_scopes = json_payload.get("scope", "") + + # Store expiration tracking information + expires_in = json_payload.get("expires_in", 86399) + self.token_obtained_at = datetime.now() + self.token_expires_at = self.token_obtained_at + timedelta(seconds=expires_in) + + # Return full response for caller + return { + "access_token": self.token, + "scope": self.access_scopes, + "expires_in": expires_in + } + else: + raise OAuthException("OAuth request failed with status %d: %s" % (response.code, response.msg)) + + except urllib.error.HTTPError as e: + # Parse error response if available + error_body = "" + try: + error_body = e.read().decode('utf-8') + error_json = json.loads(error_body) + error_message = error_json.get("error_description", error_json.get("error", error_body)) + except (ValueError, KeyError): + error_message = error_body if error_body else str(e) + + raise OAuthException("OAuth request failed: %s (HTTP %d)" % (error_message, e.code)) + + except urllib.error.URLError as e: + raise OAuthException("OAuth request failed: %s" % str(e.reason)) + + except Exception as e: + raise OAuthException("Unexpected error during OAuth request: %s" % str(e)) + + def request_access_token(self, params=None, scope=None): + """ + Automatically select and execute the appropriate OAuth flow based on API version. + + For API versions 2026-01 and higher: Uses OAuth 2.0 Client Credentials Grant + For API versions before 2026-01: Uses Authorization Code Grant + + This is the recommended method for obtaining access tokens as it automatically + adapts to the API version requirements. + + Args: + params: OAuth callback parameters (required for versions < 2026-01) + Should include 'code', 'hmac', 'timestamp' for authorization code flow. + Not used for client credentials flow (versions >= 2026-01). + scope (str, optional): Space or comma-separated list of scopes to request. + Only used for client credentials flow (versions >= 2026-01). + If not provided, Shopify grants all configured scopes. + + Returns: + For versions >= 2026-01: dict with 'access_token', 'scope', 'expires_in' + For versions < 2026-01: str (access token) + + Raises: + ValidationException: If required credentials or parameters are missing + OAuthException: If OAuth request fails (versions >= 2026-01) + + Example: + # For 2026-01+ (automatic client credentials) + >>> session = shopify.Session("store.myshopify.com", "2026-01") + >>> shopify.Session.setup(api_key="client_id", secret="client_secret") + >>> response = session.request_access_token() + >>> token = response["access_token"] + + # For older versions (authorization code) + >>> session = shopify.Session("store.myshopify.com", "2024-10") + >>> token = session.request_access_token(callback_params) + """ + if self._requires_client_credentials(): + # API version 2026-01+: Use client credentials grant + return self.request_token_client_credentials(scope=scope) + else: + # Older API versions: Use authorization code grant + if params is None: + raise ValidationException( + "params are required for authorization code grant (API versions < 2026-01). " + "For API version 2026-01+, client credentials are used automatically." + ) + return self.request_token(params) + + def is_token_expired(self, buffer_seconds=300): + """ + Check if access token is expired or will expire soon. + + This method checks whether the current access token has expired or will + expire within the specified buffer time. It's useful for proactively + refreshing tokens before they expire to avoid authentication failures. + + Args: + buffer_seconds (int): Number of seconds before expiration to consider + the token as "expired". Default is 300 (5 minutes). + This buffer allows time to refresh the token before + it actually expires. + + Returns: + bool: True if: + - No token is set + - No expiration time is available + - Token has expired + - Token will expire within buffer_seconds + False if token is valid and won't expire soon + + Example: + >>> session = shopify.Session("store.myshopify.com", "2026-01") + >>> session.request_token_client_credentials() + >>> if session.is_token_expired(): + ... session.refresh_token() + >>> # Or check with custom buffer (10 minutes) + >>> if session.is_token_expired(buffer_seconds=600): + ... session.refresh_token() + """ + # No token set - consider expired + if not self.token: + return True + + # No expiration tracking available - consider expired for safety + if not self.token_expires_at: + return True + + # Calculate if token is expired or expiring soon + now = datetime.now() + buffer = timedelta(seconds=buffer_seconds) + + # Token is expired if current time + buffer >= expiration time + return now + buffer >= self.token_expires_at + + def refresh_token_if_needed(self, buffer_seconds=300, scope=None): + """ + Automatically refresh the access token if expired or expiring soon. + + This method checks if the token is expired or will expire within the buffer + time, and automatically refreshes it if needed. It's the recommended way to + ensure you always have a valid token without manually tracking expiration. + + Args: + buffer_seconds (int): Number of seconds before expiration to trigger + refresh. Default is 300 (5 minutes). This ensures + the token is refreshed before it expires. + scope (str, optional): Space or comma-separated list of scopes to request. + If not provided, Shopify grants all configured scopes. + Use this to request only Admin API scopes when your + app has multiple API types configured. + + Returns: + dict or None: + - dict: Token response if refresh was performed, containing: + - access_token (str): The new access token + - scope (str): Comma-separated list of granted scopes + - expires_in (int): Seconds until token expires + - None: If token is still valid and no refresh was needed + + Raises: + ValidationException: If required credentials are missing + OAuthException: If OAuth refresh request fails + + Example: + >>> session = shopify.Session("store.myshopify.com", "2026-01") + >>> shopify.Session.setup(api_key="client_id", secret="client_secret") + >>> + >>> # Initial token request + >>> session.request_token_client_credentials() + >>> + >>> # Later, before making API calls, ensure token is fresh + >>> result = session.refresh_token_if_needed() + >>> if result: + ... print("Token was refreshed") + ... else: + ... print("Token is still valid") + >>> + >>> # Request only Admin API scopes when refreshing + >>> result = session.refresh_token_if_needed( + ... scope="read_products,write_products,read_orders" + ... ) + """ + if self.is_token_expired(buffer_seconds=buffer_seconds): + # Only refresh if we have credentials (client credentials flow) + if self._requires_client_credentials(): + return self.request_token_client_credentials(scope=scope) + else: + # For authorization code flow, we can't auto-refresh + # because we need user interaction for the callback + return None + + # Token is still valid, no refresh needed + return None + + def refresh_token(self, scope=None): + """ + Manually force a refresh of the access token. + + This method unconditionally refreshes the access token, regardless of + whether it has expired or not. Use this when you need to force a token + refresh (e.g., after permission changes, for testing, or if you suspect + the token is invalid). + + For automatic refresh based on expiration, use refresh_token_if_needed() + instead. + + Args: + scope (str, optional): Space or comma-separated list of scopes to request. + If not provided, Shopify grants all configured scopes. + Use this to request only Admin API scopes when your + app has multiple API types configured. + + Returns: + dict: Token response containing: + - access_token (str): The new access token + - scope (str): Comma-separated list of granted scopes + - expires_in (int): Seconds until token expires (typically 86399) + + Raises: + ValidationException: If required credentials are missing or if this + method is called on a session using authorization + code flow (requires user interaction) + OAuthException: If OAuth refresh request fails + + Example: + >>> session = shopify.Session("store.myshopify.com", "2026-01") + >>> shopify.Session.setup(api_key="client_id", secret="client_secret") + >>> + >>> # Initial token request + >>> session.request_token_client_credentials() + >>> + >>> # Force token refresh (e.g., after permission changes) + >>> new_token = session.refresh_token() + >>> print(f"New token: {new_token['access_token']}") + >>> print(f"Expires in: {new_token['expires_in']} seconds") + >>> + >>> # Request only Admin API scopes + >>> new_token = session.refresh_token( + ... scope="read_products,write_products,read_orders" + ... ) + """ + # Only works with client credentials flow + if not self._requires_client_credentials(): + raise ValidationException( + "Manual token refresh is only supported for API versions >= 2026-01 " + "using client credentials flow. For authorization code flow, " + "users must re-authorize through the OAuth callback." + ) + + # Clear existing token to force new request + self.token = None + self.token_expires_at = None + self.token_obtained_at = None + + # Request new token + return self.request_token_client_credentials(scope=scope) + @property def api_version(self): return self.version diff --git a/test/session_test.py b/test/session_test.py index 8d73e293..864c5a68 100644 --- a/test/session_test.py +++ b/test/session_test.py @@ -319,3 +319,445 @@ def test_session_with_coerced_version(self): def test_session_with_invalid_version(self): with self.assertRaises(shopify.VersionNotFoundError): shopify.Session("test.myshopify.com", "invalid-version", "token") + + def test_request_token_client_credentials_success(self): + """Test successful client credentials token request""" + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "shpca_test_token_12345", "scope": "read_products,write_orders", "expires_in": 86399}', + has_user_agent=False, + ) + + token_response = session.request_token_client_credentials() + + # Verify response structure + self.assertEqual(token_response["access_token"], "shpca_test_token_12345") + self.assertEqual(token_response["scope"], "read_products,write_orders") + self.assertEqual(token_response["expires_in"], 86399) + + # Verify token stored in session + self.assertEqual(session.token, "shpca_test_token_12345") + self.assertEqual(str(session.access_scopes), "read_products,write_orders") + + def test_request_token_client_credentials_missing_api_key(self): + """Test that ValidationException is raised when api_key is missing""" + shopify.Session.setup(api_key=None, secret="test_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + with self.assertRaises(shopify.ValidationException) as context: + session.request_token_client_credentials() + + self.assertIn("api_key", str(context.exception)) + + def test_request_token_client_credentials_missing_secret(self): + """Test that ValidationException is raised when secret is missing""" + shopify.Session.setup(api_key="test_key", secret=None) + session = shopify.Session("testshop.myshopify.com", "2026-01") + + with self.assertRaises(shopify.ValidationException) as context: + session.request_token_client_credentials() + + self.assertIn("secret", str(context.exception)) + + def test_request_token_client_credentials_http_error(self): + """Test that OAuthException is raised on HTTP error""" + shopify.Session.setup(api_key="test_key", secret="test_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + code=401, + body='{"error": "invalid_client", "error_description": "Client authentication failed"}', + has_user_agent=False, + ) + + with self.assertRaises(shopify.OAuthException) as context: + session.request_token_client_credentials() + + self.assertIn("Client authentication failed", str(context.exception)) + self.assertIn("401", str(context.exception)) + + def test_request_token_client_credentials_returns_existing_token(self): + """Test that existing token is returned without new request""" + shopify.Session.setup(api_key="test_key", secret="test_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01", "existing_token") + + # Should return existing token without making HTTP request + token_response = session.request_token_client_credentials() + + # Verify existing token returned + self.assertEqual(token_response["access_token"], "existing_token") + self.assertIsNone(token_response["expires_in"]) + + def test_requires_client_credentials_for_2026_01(self): + """Test that API version 2026-01 requires client credentials""" + session = shopify.Session("testshop.myshopify.com", "2026-01") + self.assertTrue(session._requires_client_credentials()) + + def test_requires_client_credentials_for_2026_04(self): + """Test that API version 2026-04 requires client credentials""" + session = shopify.Session("testshop.myshopify.com", "2026-04") + self.assertTrue(session._requires_client_credentials()) + + def test_does_not_require_client_credentials_for_2025_10(self): + """Test that API version 2025-10 does not require client credentials""" + session = shopify.Session("testshop.myshopify.com", "2025-10") + self.assertFalse(session._requires_client_credentials()) + + def test_does_not_require_client_credentials_for_2024_10(self): + """Test that API version 2024-10 does not require client credentials""" + session = shopify.Session("testshop.myshopify.com", "2024-10") + self.assertFalse(session._requires_client_credentials()) + + def test_request_token_fails_with_2026_01(self): + """Test that old request_token method raises error for 2026-01""" + shopify.Session.setup(api_key="test_key", secret="test_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + params = {"code": "test_code", "timestamp": time.time()} + hmac_value = shopify.Session.calculate_hmac(params) + params["hmac"] = hmac_value + + with self.assertRaises(shopify.ValidationException) as context: + session.request_token(params) + + self.assertIn("2026-01", str(context.exception)) + self.assertIn("Client Credentials Grant", str(context.exception)) + + def test_request_access_token_auto_selects_client_credentials(self): + """Test that request_access_token automatically uses client credentials for 2026-01""" + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "shpca_auto_token", "scope": "read_products", "expires_in": 86399}', + has_user_agent=False, + ) + + # Should automatically use client credentials (no params needed) + token_response = session.request_access_token() + + self.assertEqual(token_response["access_token"], "shpca_auto_token") + self.assertEqual(token_response["expires_in"], 86399) + + def test_request_access_token_uses_authorization_code_for_old_version(self): + """Test that request_access_token uses authorization code for versions < 2026-01""" + shopify.Session.setup(api_key="test_key", secret="test_secret") + session = shopify.Session("testshop.myshopify.com", "2024-10") + + params = {"code": "test_code", "timestamp": time.time()} + hmac_value = shopify.Session.calculate_hmac(params) + params["hmac"] = hmac_value + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "old_style_token", "scope": "read_products"}', + has_user_agent=False, + ) + + # Should use authorization code flow + token = session.request_access_token(params) + + self.assertEqual(token, "old_style_token") + + def test_is_token_expired_with_no_token(self): + """Test that is_token_expired returns True when no token is set""" + session = shopify.Session("testshop.myshopify.com", "2026-01") + self.assertTrue(session.is_token_expired()) + + def test_is_token_expired_with_no_expiration(self): + """Test that is_token_expired returns True when token has no expiration tracking""" + session = shopify.Session("testshop.myshopify.com", "2026-01") + session.token = "test_token" + # token_expires_at is None + self.assertTrue(session.is_token_expired()) + + def test_is_token_expired_within_buffer(self): + """Test that is_token_expired returns True when token expires within buffer""" + from datetime import datetime, timedelta + + session = shopify.Session("testshop.myshopify.com", "2026-01") + session.token = "test_token" + + # Set token to expire in 4 minutes + session.token_expires_at = datetime.now() + timedelta(minutes=4) + + # With default 5-minute buffer, should be considered expired + self.assertTrue(session.is_token_expired()) + + # With 3-minute buffer, should not be considered expired + self.assertFalse(session.is_token_expired(buffer_seconds=180)) + + def test_is_token_not_expired(self): + """Test that is_token_expired returns False when token is valid""" + from datetime import datetime, timedelta + + session = shopify.Session("testshop.myshopify.com", "2026-01") + session.token = "test_token" + + # Set token to expire in 1 hour + session.token_expires_at = datetime.now() + timedelta(hours=1) + + # With default 5-minute buffer, should not be expired + self.assertFalse(session.is_token_expired()) + + def test_refresh_token_if_needed_refreshes_expired_token(self): + """Test that refresh_token_if_needed refreshes an expired token""" + from datetime import datetime, timedelta + + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + session.token = "old_token" + + # Set token as expired + session.token_expires_at = datetime.now() - timedelta(minutes=1) + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "new_token", "scope": "read_products", "expires_in": 86399}', + has_user_agent=False, + ) + + result = session.refresh_token_if_needed() + + # Should return token response + self.assertIsNotNone(result) + self.assertEqual(result["access_token"], "new_token") + self.assertEqual(session.token, "new_token") + + def test_refresh_token_if_needed_does_not_refresh_valid_token(self): + """Test that refresh_token_if_needed does not refresh a valid token""" + from datetime import datetime, timedelta + + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + session.token = "valid_token" + + # Set token to expire in 1 hour + session.token_expires_at = datetime.now() + timedelta(hours=1) + + result = session.refresh_token_if_needed() + + # Should return None (no refresh needed) + self.assertIsNone(result) + self.assertEqual(session.token, "valid_token") + + def test_refresh_token_forces_refresh(self): + """Test that refresh_token forces a token refresh""" + from datetime import datetime, timedelta + + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + session.token = "old_valid_token" + + # Set token to expire in 1 hour (still valid) + session.token_expires_at = datetime.now() + timedelta(hours=1) + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "forced_new_token", "scope": "read_products", "expires_in": 86399}', + has_user_agent=False, + ) + + result = session.refresh_token() + + # Should force refresh even though token was valid + self.assertEqual(result["access_token"], "forced_new_token") + self.assertEqual(session.token, "forced_new_token") + + def test_refresh_token_fails_for_old_api_version(self): + """Test that refresh_token raises error for API versions < 2026-01""" + shopify.Session.setup(api_key="test_key", secret="test_secret") + session = shopify.Session("testshop.myshopify.com", "2025-10") + session.token = "old_version_token" + + with self.assertRaises(shopify.ValidationException) as context: + session.refresh_token() + + self.assertIn("2026-01", str(context.exception)) + self.assertIn("client credentials flow", str(context.exception)) + + def test_token_expiration_tracking_stored(self): + """Test that token expiration tracking is properly stored""" + from datetime import datetime, timedelta + + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + before_request = datetime.now() + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "test_token", "scope": "read_products", "expires_in": 86399}', + has_user_agent=False, + ) + + session.request_token_client_credentials() + + after_request = datetime.now() + + # Verify expiration tracking is set + self.assertIsNotNone(session.token_obtained_at) + self.assertIsNotNone(session.token_expires_at) + + # Verify obtained_at is between before and after + self.assertGreaterEqual(session.token_obtained_at, before_request) + self.assertLessEqual(session.token_obtained_at, after_request) + + # Verify expires_at is approximately 24 hours from obtained_at + expected_expiration = session.token_obtained_at + timedelta(seconds=86399) + self.assertEqual(session.token_expires_at, expected_expiration) + + def test_request_token_client_credentials_with_scope_parameter(self): + """Test that scope parameter is included in OAuth request when provided""" + import urllib.parse + + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + # Mock the HTTP request and capture what was sent + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "test_token", "scope": "read_products write_products", "expires_in": 86399}', + has_user_agent=False, + ) + + # Request token with specific scopes (Admin API only) + token_response = session.request_token_client_credentials(scope="read_products write_products") + + # Verify token received + self.assertEqual(token_response["access_token"], "test_token") + self.assertEqual(token_response["scope"], "read_products write_products") + + def test_request_token_client_credentials_without_scope_parameter(self): + """Test that scope parameter is NOT included when not provided (default behavior)""" + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "test_token", "scope": "read_products write_products read_orders", "expires_in": 86399}', + has_user_agent=False, + ) + + # Request token without scope parameter (should grant all configured scopes) + token_response = session.request_token_client_credentials() + + # Verify token received with all scopes + self.assertEqual(token_response["access_token"], "test_token") + # Without scope filter, Shopify returns all configured scopes + self.assertEqual(token_response["scope"], "read_products write_products read_orders") + + def test_request_token_client_credentials_scope_normalization(self): + """Test that comma-separated scopes are normalized to space-separated for OAuth spec""" + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "test_token", "scope": "read_products write_products", "expires_in": 86399}', + has_user_agent=False, + ) + + # Request with comma-separated scopes (should be normalized to spaces) + token_response = session.request_token_client_credentials(scope="read_products,write_products") + + # Verify token received + self.assertEqual(token_response["access_token"], "test_token") + + def test_refresh_token_if_needed_with_scope_parameter(self): + """Test that refresh_token_if_needed passes scope parameter correctly""" + from datetime import datetime, timedelta + + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + # Set expired token + session.token = "expired_token" + session.token_obtained_at = datetime.now() - timedelta(hours=24) + session.token_expires_at = datetime.now() - timedelta(minutes=10) + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "refreshed_token", "scope": "read_products write_products", "expires_in": 86399}', + has_user_agent=False, + ) + + # Refresh with specific scope + result = session.refresh_token_if_needed(scope="read_products write_products") + + # Verify token was refreshed with correct scopes + self.assertIsNotNone(result) + self.assertEqual(result["access_token"], "refreshed_token") + self.assertEqual(result["scope"], "read_products write_products") + self.assertEqual(session.token, "refreshed_token") + + def test_refresh_token_with_scope_parameter(self): + """Test that refresh_token passes scope parameter correctly""" + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + # Set existing token + session.token = "old_token" + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "new_token", "scope": "read_products", "expires_in": 86399}', + has_user_agent=False, + ) + + # Force refresh with specific scope (Admin API only) + result = session.refresh_token(scope="read_products") + + # Verify token was refreshed with correct scopes + self.assertEqual(result["access_token"], "new_token") + self.assertEqual(result["scope"], "read_products") + self.assertEqual(session.token, "new_token") + + def test_request_access_token_with_scope_parameter_2026_01(self): + """Test that request_access_token passes scope to client credentials flow for 2026-01+""" + shopify.Session.setup(api_key="test_client_id", secret="test_client_secret") + session = shopify.Session("testshop.myshopify.com", "2026-01") + + self.fake( + None, + url="https://testshop.myshopify.com/admin/oauth/access_token", + method="POST", + body='{"access_token": "test_token", "scope": "read_products", "expires_in": 86399}', + has_user_agent=False, + ) + + # Use smart method with scope parameter + result = session.request_access_token(scope="read_products") + + # Verify correct token received + self.assertEqual(result["access_token"], "test_token") + self.assertEqual(result["scope"], "read_products")