IE-498: added support to migrate pipelines #5

Merged
Commits (6, all by skyflow-srivyshnavi):

- 2105bfc IE-498: added support to migrate pipelines
- 09f4b4a IE-508: added test cases
- dcbf435 IE-515: added workflow, and updated README
- 11a7235 IE-498: addressed comments and fixed CI
- f0d8ccb IE-498: updated README
- 9ee4646 IE-498: addressed feedback
The PR adds a `pipelines_migration` GitHub Actions workflow (new file, 119 lines added):

```yaml
name: pipelines_migration

on:
  workflow_dispatch:
    inputs:
      env_url:
        description: "Select source and target environments"
        type: choice
        default: "Source: SANDBOX, Target: PRODUCTION"
        options:
          - "Source: SANDBOX, Target: PRODUCTION"
          - "Source: SANDBOX, Target: SANDBOX"
          - "Source: PRODUCTION, Target: PRODUCTION"
          - "Source: PRODUCTION, Target: SANDBOX"
      source_datastore_config:
        description: "Source datastore JSON. Provide either an ftpServer or s3Bucket object."
        required: true
        default: |
          {
            "ftpServer": {
              "transferProtocol": "FTPS",
              "plainText": {
                "hostname": "",
                "port": "",
                "username": "",
                "password": ""
              },
              "skyflowHosted": false
            }
          }
      target_datastore_config:
        description: "Destination datastore JSON. Provide either an ftpServer or s3Bucket object."
        required: true
        default: |
          {
            "s3Bucket": {
              "name": "",
              "region": "",
              "assumedRoleARN": ""
            }
          }
      source_vault_id:
        description: "Source Vault ID. Required if migrate_all_connections is checked."
        required: false
      pipeline_id:
        description: "ID of the pipeline to be migrated."
        required: false
        default: ""
      target_vault_id:
        description: "Target Vault ID"
        required: true
      source_account_access_token:
        description: "Access token of the source account. (Not required if a config file is selected.)"
        required: false
      target_account_access_token:
        description: "Access token of the target account"
        required: true
      source_account_id:
        description: "Source Account ID. If not provided, the repository variable is used."
        required: false
      target_account_id:
        description: "Target Account ID. If not provided, the repository variable is used."
        required: false

jobs:
  execute-pipelines-migration-script:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.x"

      - name: Install dependencies
        run: pip install requests

      - name: Parse and map environment URLs
        id: map_envs
        shell: bash
        run: |
          input="${{ github.event.inputs.env_url }}"

          source_name=$(echo "$input" | sed -n 's/Source: \([^,]*\),.*/\1/p' | xargs)
          target_name=$(echo "$input" | sed -n 's/.*Target: \(.*\)/\1/p' | xargs)

          get_env_url() {
            case "$1" in
              SANDBOX) echo "https://manage.skyflowapis-preview.com" ;;
              PRODUCTION) echo "https://manage.skyflowapis.com" ;;
              *) echo "Invalid environment: $1" >&2; exit 1 ;;
            esac
          }

          # Resolve URLs
          source_url=$(get_env_url "$source_name")
          target_url=$(get_env_url "$target_name")

          echo "source_url=$source_url" >> $GITHUB_OUTPUT
          echo "target_url=$target_url" >> $GITHUB_OUTPUT

      - name: Run Python script
        env:
          PIPELINE_ID: ${{ github.event.inputs.pipeline_id }}
          SOURCE_DATASTORE_CONFIG: ${{ github.event.inputs.source_datastore_config }}
          TARGET_DATASTORE_CONFIG: ${{ github.event.inputs.target_datastore_config }}
          SOURCE_VAULT_ID: ${{ github.event.inputs.source_vault_id }}
          TARGET_VAULT_ID: ${{ github.event.inputs.target_vault_id }}
          SOURCE_ACCOUNT_AUTH: ${{ github.event.inputs.source_account_access_token }}
          TARGET_ACCOUNT_AUTH: ${{ github.event.inputs.target_account_access_token }}
          SOURCE_ACCOUNT_ID: ${{ github.event.inputs.source_account_id != '' && github.event.inputs.source_account_id || vars.SOURCE_ACCOUNT_ID }}
          TARGET_ACCOUNT_ID: ${{ github.event.inputs.target_account_id != '' && github.event.inputs.target_account_id || vars.TARGET_ACCOUNT_ID }}
          SOURCE_ENV_URL: ${{ steps.map_envs.outputs.source_url }}
          TARGET_ENV_URL: ${{ steps.map_envs.outputs.target_url }}
        run: python3 migrate_pipelines.py
```
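The "Parse and map environment URLs" step can be sanity-checked off the runner; a minimal Python sketch of the same parsing and URL lookup (the `map_envs` function name is illustrative, not part of the PR):

```python
import re

# Same name -> URL mapping as get_env_url() in the workflow step above.
ENV_URLS = {
    "SANDBOX": "https://manage.skyflowapis-preview.com",
    "PRODUCTION": "https://manage.skyflowapis.com",
}


def map_envs(env_url_input: str) -> tuple:
    """Parse 'Source: X, Target: Y' and resolve both names to base URLs."""
    match = re.fullmatch(r"Source:\s*(\S+),\s*Target:\s*(\S+)", env_url_input.strip())
    if not match:
        raise ValueError(f"Unrecognized env_url input: {env_url_input}")
    source_name, target_name = match.groups()
    for name in (source_name, target_name):
        if name not in ENV_URLS:
            raise ValueError(f"Invalid environment: {name}")
    return ENV_URLS[source_name], ENV_URLS[target_name]


if __name__ == "__main__":
    # Mirrors the workflow default "Source: SANDBOX, Target: PRODUCTION".
    print(map_envs("Source: SANDBOX, Target: PRODUCTION"))
```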
The second new file is the migration script itself, `migrate_pipelines.py` (228 lines added):

```python
import copy
import json
import os
import requests
from typing import Any, Dict, List, Optional


PIPELINE_ID = os.getenv("PIPELINE_ID")
SOURCE_VAULT_ID = os.getenv("SOURCE_VAULT_ID")
TARGET_VAULT_ID = os.getenv("TARGET_VAULT_ID")
SOURCE_ACCOUNT_ID = os.getenv("SOURCE_ACCOUNT_ID")
TARGET_ACCOUNT_ID = os.getenv("TARGET_ACCOUNT_ID")
SOURCE_ACCOUNT_AUTH = os.getenv("SOURCE_ACCOUNT_AUTH")
TARGET_ACCOUNT_AUTH = os.getenv("TARGET_ACCOUNT_AUTH")
SOURCE_ENV_URL = os.getenv("SOURCE_ENV_URL")
TARGET_ENV_URL = os.getenv("TARGET_ENV_URL")
SOURCE_DATASTORE_CONFIG = os.getenv("SOURCE_DATASTORE_CONFIG")
TARGET_DATASTORE_CONFIG = os.getenv("TARGET_DATASTORE_CONFIG")

FTP_ALLOWED_KEYS = {"transferProtocol", "plainText", "encrypted", "skyflowHosted"}
S3_ALLOWED_KEYS = {"name", "region", "assumedRoleARN"}

SOURCE_ACCOUNT_HEADERS = {
    "X-SKYFLOW-ACCOUNT-ID": SOURCE_ACCOUNT_ID,
    "Authorization": f"Bearer {SOURCE_ACCOUNT_AUTH}",
    "Content-Type": "application/json",
}

TARGET_ACCOUNT_HEADERS = {
    "X-SKYFLOW-ACCOUNT-ID": TARGET_ACCOUNT_ID,
    "Authorization": f"Bearer {TARGET_ACCOUNT_AUTH}",
    "Content-Type": "application/json",
}


def list_pipelines(vault_id: str) -> List[Dict[str, Any]]:
    """Return all pipelines in the supplied vault."""
    pipelines = []
    response = requests.get(
        f"{SOURCE_ENV_URL}/v1/pipelines?vaultID={vault_id}",
        headers=SOURCE_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    pipelines.extend(response.json()["pipelines"])
    return pipelines


def get_pipeline(pipeline_id: str) -> Dict[str, Any]:
    """Fetch a single pipeline definition from the source environment."""
    response = requests.get(
        f"{SOURCE_ENV_URL}/v1/pipelines/{pipeline_id}",
        headers=SOURCE_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    return response.json()["pipeline"]


def create_pipeline(pipeline: Dict[str, Any]) -> requests.Response:
    """Create a pipeline in the target environment."""
    response = requests.post(
        f"{TARGET_ENV_URL}/v1/pipelines",
        json=pipeline,
        headers=TARGET_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    return response


def strip_empty_values(value: Any) -> Any:
    """Recursively drop values that are empty strings or None."""
    if isinstance(value, dict):
        cleaned = {}
        for key, val in value.items():
            cleaned_val = strip_empty_values(val)
            if cleaned_val is None:
                continue
            cleaned[key] = cleaned_val
        return cleaned
    if isinstance(value, list):
        cleaned_list = [strip_empty_values(item) for item in value]
        return [item for item in cleaned_list if item is not None]
    if value == "" or value is None:
        return None
    return value

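# Example (illustrative, not part of the PR): stripping the empty placeholder
# fields of the workflow's default ftpServer input,
#   strip_empty_values({"transferProtocol": "FTPS",
#                       "plainText": {"hostname": "", "port": ""},
#                       "skyflowHosted": False})
# returns {"transferProtocol": "FTPS", "plainText": {}, "skyflowHosted": False}.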


def validate_ftp_server(config: Dict[str, Any], label: str) -> Dict[str, Any]:
    """Return an FTP server configuration with only supported fields."""
    if not isinstance(config, dict):
        raise ValueError(f"-- {label} datastore ftpServer must be an object. --")
    sanitised = {key: config[key] for key in config if key in FTP_ALLOWED_KEYS}
    if "plainText" in sanitised:
        if not isinstance(sanitised["plainText"], dict):
            raise ValueError(f"-- {label} datastore ftpServer.plainText must be an object. --")
        sanitised["plainText"] = strip_empty_values(sanitised["plainText"])
    if "encrypted" in sanitised:
        if not isinstance(sanitised["encrypted"], dict):
            raise ValueError(f"-- {label} datastore ftpServer.encrypted must be an object. --")
        sanitised["encrypted"] = strip_empty_values(sanitised["encrypted"])
    sanitised = strip_empty_values(sanitised)
    if not sanitised:
        raise ValueError(f"-- {label} datastore ftpServer must include non-empty credentials. --")
    if "transferProtocol" not in sanitised:
        raise ValueError(f"-- {label} datastore ftpServer.transferProtocol is required. --")
    has_plain = "plainText" in sanitised and sanitised["plainText"]
    has_encrypted = "encrypted" in sanitised and sanitised["encrypted"]
    if not (has_plain or has_encrypted):
        raise ValueError(
            f"-- {label} datastore ftpServer must include plainText or encrypted credentials. --"
        )
    return sanitised


def validate_s3_bucket(config: Dict[str, Any], label: str) -> Dict[str, Any]:
    """Return an S3 bucket configuration with only supported fields."""
    if not isinstance(config, dict):
        raise ValueError(f"-- {label} datastore s3Bucket must be an object. --")
    sanitised = {key: config[key] for key in config if key in S3_ALLOWED_KEYS}
    sanitised = strip_empty_values(sanitised)
    if not sanitised:
        raise ValueError(f"-- {label} datastore s3Bucket must include non-empty configuration. --")
    missing = sorted(S3_ALLOWED_KEYS - set(sanitised.keys()))
    if missing:
        raise ValueError(
            f"-- {label} datastore s3Bucket is missing required fields: {', '.join(missing)}. --"
        )
    return sanitised


def load_datastore_input(raw_config: Optional[str], label: str) -> Optional[Dict[str, Any]]:
    """Return a sanitized datastore override dict or None if config is empty."""
    if raw_config is None or raw_config.strip() == "":
        return None
    try:
        parsed = json.loads(raw_config)
    except json.JSONDecodeError as exc:
        raise ValueError(f"-- Invalid JSON for {label} datastore config: {exc} --") from exc
    if not isinstance(parsed, dict):
        raise ValueError(f"-- {label} datastore config must be a JSON object. --")
    datastore_keys = [key for key in ("ftpServer", "s3Bucket") if key in parsed and parsed[key] is not None]
    if len(datastore_keys) != 1:
        raise ValueError(
            f"-- {label} datastore config must contain exactly one of ftpServer or s3Bucket. --"
        )
    datastore_key = datastore_keys[0]
    if datastore_key == "ftpServer":
        return {"ftpServer": validate_ftp_server(parsed["ftpServer"], label)}
    return {"s3Bucket": validate_s3_bucket(parsed["s3Bucket"], label)}



def replace_datastore_input(
    existing_section: Optional[Dict[str, Any]], override: Dict[str, Any]
) -> Dict[str, Any]:
    """Replace the datastore section while preserving other configuration."""
    section = copy.deepcopy(existing_section or {})
    existing_datastore_keys = [
        key for key in ("ftpServer", "s3Bucket") if key in section and section[key] is not None
    ]
    datastore_key, datastore_value = next(iter(override.items()))
    if datastore_key == "s3Bucket" and "ftpServer" in existing_datastore_keys:
        raise ValueError("-- Cannot override FTP datastore with an S3 override. --")
    if datastore_key == "ftpServer" and "s3Bucket" in existing_datastore_keys:
        raise ValueError("-- Cannot override S3 datastore with an FTP override. --")
    section.pop(datastore_key, None)
    section[datastore_key] = copy.deepcopy(datastore_value)
    return section


def transform_pipeline_payload(
    source_resource: Dict[str, Any],
    source_datastore_input: Optional[Dict[str, Any]] = None,
    target_datastore_input: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Prepare the payload for the target pipeline."""
    transformed_resource = copy.deepcopy(source_resource)
    transformed_resource.pop("ID", None)  # remove the source pipeline ID
    transformed_resource["vaultID"] = TARGET_VAULT_ID
    if source_datastore_input:
        transformed_resource["source"] = replace_datastore_input(
            transformed_resource.get("source"), source_datastore_input
        )
    if target_datastore_input:
        transformed_resource["destination"] = replace_datastore_input(
            transformed_resource.get("destination"), target_datastore_input
        )
    return transformed_resource

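# Example (illustrative, not part of the PR): given a source pipeline such as
#   {"ID": "pipe-123", "vaultID": "src-vault", "name": "demo", ...},
# the payload comes back without "ID", with "vaultID" set to TARGET_VAULT_ID,
# and with any datastore overrides applied to "source"/"destination".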


def main(pipeline_id: str) -> None:
    """Migrate a single pipeline from the source to the target environment."""
    try:
        print("-- Initiating Pipelines migration --")
        source_datastore_input = load_datastore_input(SOURCE_DATASTORE_CONFIG, "source")
        target_datastore_input = load_datastore_input(TARGET_DATASTORE_CONFIG, "destination")
        pipeline = get_pipeline(pipeline_id)
        pipeline_name = pipeline.get("name", "Pipeline")
        print(f"-- Working on pipeline: {pipeline_name} --")

        pipeline_payload = transform_pipeline_payload(
            pipeline, source_datastore_input, target_datastore_input
        )
        create_pipeline_response = create_pipeline(pipeline_payload)

        if create_pipeline_response.status_code == 200:
            created_pipeline = create_pipeline_response.json()
            print(
                f"-- Pipeline migrated successfully: {pipeline_name}. "
                f"Source PIPELINE_ID: {pipeline.get('ID')}, "
                f"Target PIPELINE_ID: {created_pipeline.get('ID')} --"
            )
        else:
            print(
                f"-- Pipeline migration failed: {create_pipeline_response.status_code}. "
                f"{create_pipeline_response.content} --"
            )
        print("-- Pipelines migration script executed successfully. --")
    except requests.exceptions.HTTPError as http_err:
        print(
            f"-- migrate_pipelines HTTP error: {http_err.response.content.decode()} --"
        )
        raise
    except Exception as err:
        print(f"-- migrate_pipelines other error: {err} --")
        raise


if __name__ == "__main__":
    if not PIPELINE_ID:
        raise ValueError("-- PIPELINE_ID is required to migrate a pipeline. --")
    main(PIPELINE_ID)
```
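To exercise the script outside Actions, the same environment the "Run Python script" step builds can be set by hand. A minimal sketch; the helper file name and all IDs, tokens, and URLs below are placeholders, not values from the PR:

```python
# run_local.py (hypothetical helper, not part of this PR): sets the same
# environment variables the workflow's "Run Python script" step sets, then
# invokes migrate_pipelines.py as a subprocess.
import os
import subprocess

env = {
    **os.environ,
    "PIPELINE_ID": "<source-pipeline-id>",
    "SOURCE_VAULT_ID": "<source-vault-id>",
    "TARGET_VAULT_ID": "<target-vault-id>",
    "SOURCE_ACCOUNT_ID": "<source-account-id>",
    "TARGET_ACCOUNT_ID": "<target-account-id>",
    "SOURCE_ACCOUNT_AUTH": "<source-access-token>",
    "TARGET_ACCOUNT_AUTH": "<target-access-token>",
    "SOURCE_ENV_URL": "https://manage.skyflowapis-preview.com",  # SANDBOX
    "TARGET_ENV_URL": "https://manage.skyflowapis.com",  # PRODUCTION
    # Empty datastore configs keep the source pipeline's datastores as-is.
    "SOURCE_DATASTORE_CONFIG": "",
    "TARGET_DATASTORE_CONFIG": "",
}

subprocess.run(["python3", "migrate_pipelines.py"], env=env, check=True)
```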