119 changes: 119 additions & 0 deletions .github/workflows/migrate_pipelines.yml
@@ -0,0 +1,119 @@
name: pipelines_migration

on:
  workflow_dispatch:
    inputs:
      env_url:
        description: "Select source and target environments"
        type: choice
        default: "Source: SANDBOX, Target: PRODUCTION"
        options:
          - "Source: SANDBOX, Target: PRODUCTION"
- "Source: SANDBOX, Target: SANDBOX"
- "Source: PRODUCTION, Target: PRODUCTION"
- "Source: PRODUCTION, Target: SANDBOX"
source_datastore_config:
description: "Source datastore JSON. Provide either an ftpServer or s3Bucket object."
required: true
default: |
{
"ftpServer": {
"transferProtocol": "FTPS",
"plainText": {
"hostname": "",
"port": "",
"username": "",
"password": "",
},
"skyflowHosted": false
}
}
target_datastore_config:
description: "Destination datastore JSON. Provide either an ftpServer or s3Bucket object."
required: true
default: |
{
"s3Bucket": {
"name": "",
"region": "",
"assumedRoleARN": ""
}
}
source_vault_id:
description: "Source Vault ID. Required if migrate_all_connections if checked."
        required: false
      pipeline_id:
        description: "Pipeline ID to be migrated."
        required: false
        default: ""
      target_vault_id:
        description: "Target Vault ID"
        required: true
      source_account_access_token:
        description: "Access token of the Source Account. (Not required if a config file is selected.)"
        required: false
      target_account_access_token:
        description: "Access token of the Target Account"
        required: true
      source_account_id:
        description: "Source Account ID. If not provided, falls back to the repository variable."
        required: false
      target_account_id:
        description: "Target Account ID. If not provided, falls back to the repository variable."
        required: false


jobs:
  execute-pipelines-migration-script:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.x"

      - name: Install dependencies
        run: pip install requests

      - name: Parse and map environment URLs
        id: map_envs
        shell: bash
        run: |
          input="${{ github.event.inputs.env_url }}"

          source_name=$(echo "$input" | sed -n 's/Source: \([^,]*\),.*/\1/p' | xargs)
          target_name=$(echo "$input" | sed -n 's/.*Target: \(.*\)/\1/p' | xargs)

          get_env_url() {
            case "$1" in
              SANDBOX) echo "https://manage.skyflowapis-preview.com" ;;
              PRODUCTION) echo "https://manage.skyflowapis.com" ;;
              *) echo "Invalid environment: $1" >&2; exit 1 ;;
            esac
          }

          # Resolve URLs
          source_url=$(get_env_url "$source_name")
          target_url=$(get_env_url "$target_name")

          echo "source_url=$source_url" >> "$GITHUB_OUTPUT"
          echo "target_url=$target_url" >> "$GITHUB_OUTPUT"

      - name: Run Python script
        env:
          PIPELINE_ID: ${{ github.event.inputs.pipeline_id }}
          SOURCE_DATASTORE_CONFIG: ${{ github.event.inputs.source_datastore_config }}
          TARGET_DATASTORE_CONFIG: ${{ github.event.inputs.target_datastore_config }}
          SOURCE_VAULT_ID: ${{ github.event.inputs.source_vault_id }}
          TARGET_VAULT_ID: ${{ github.event.inputs.target_vault_id }}
          SOURCE_ACCOUNT_AUTH: ${{ github.event.inputs.source_account_access_token }}
          TARGET_ACCOUNT_AUTH: ${{ github.event.inputs.target_account_access_token }}
          SOURCE_ACCOUNT_ID: ${{ github.event.inputs.source_account_id != '' && github.event.inputs.source_account_id || vars.SOURCE_ACCOUNT_ID }}
          TARGET_ACCOUNT_ID: ${{ github.event.inputs.target_account_id != '' && github.event.inputs.target_account_id || vars.TARGET_ACCOUNT_ID }}
          SOURCE_ENV_URL: ${{ steps.map_envs.outputs.source_url }}
          TARGET_ENV_URL: ${{ steps.map_envs.outputs.target_url }}
        run: python3 migrate_pipelines.py
43 changes: 43 additions & 0 deletions README.md
@@ -115,6 +115,49 @@ Note: Please note that if all values are provided `config_file` will take the pr
- The script doesn't migrate service accounts related to connections; this has to be done from Studio.
- Migration of connections associated with functions is not supported.

### Pipelines Migration

Migrates a pipeline definition from the source vault into the target vault.

##### Parameters:
- **`env_url`**: Source and target environments.
- **`pipeline_id`**: Pipeline ID to migrate. Get the pipeline ID from Studio.
- **`source_datastore_config`**: JSON object that replaces the source datastore configuration. Provide either an `ftpServer` or `s3Bucket` object with the required credentials.
- **`target_datastore_config`**: JSON object that replaces the destination datastore configuration. Provide either an `ftpServer` or `s3Bucket` object with the required credentials.
- **`source_account_access_token`**: Access token of the source account.
- **`target_account_access_token`**: Access token of the target account.
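
For reference, the workflow can also be triggered programmatically through the GitHub REST API's workflow-dispatch endpoint. A minimal sketch, assuming placeholder `<owner>`/`<repo>` values, a token with `workflow` scope, and `main` as the target ref (input names come from the workflow file above):

```python
import requests

resp = requests.post(
    "https://api.github.com/repos/<owner>/<repo>"
    "/actions/workflows/migrate_pipelines.yml/dispatches",
    headers={
        "Accept": "application/vnd.github+json",
        "Authorization": "Bearer <github-token>",
    },
    json={
        "ref": "main",  # assumed default branch
        "inputs": {
            "env_url": "Source: SANDBOX, Target: PRODUCTION",
            "pipeline_id": "<pipeline-id>",
            "target_vault_id": "<target-vault-id>",
            "target_account_access_token": "<target-access-token>",
            # omitted inputs (e.g. the datastore configs) fall back
            # to their workflow defaults
        },
    },
)
resp.raise_for_status()  # GitHub responds 204 No Content on success
```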

##### Notes:
- Datastore overrides accept exactly one of `ftpServer` or `s3Bucket`. FTP datastores require `transferProtocol` plus either `plainText` or `encrypted` credentials. S3 datastores must include `name`, `region`, and `assumedRoleARN`.
- The script rejects incompatible overrides (for example, replacing an S3 datastore with an FTP one), as the sketch below shows.
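
A quick sketch of how these rules surface when the helpers in `migrate_pipelines.py` are called directly (assuming the script is on the import path; all values are throwaway placeholders):

```python
import json

from migrate_pipelines import load_datastore_input, replace_datastore_input

# Exactly one of ftpServer / s3Bucket may appear in an override.
try:
    load_datastore_input(json.dumps({"ftpServer": {}, "s3Bucket": {}}), "source")
except ValueError as err:
    print(err)  # ... must contain exactly one of ftpServer or s3Bucket ...

# A valid FTP override: transferProtocol plus plainText credentials.
ftp_override = load_datastore_input(
    json.dumps({
        "ftpServer": {
            "transferProtocol": "SFTP",
            "plainText": {"hostname": "sftp.example.com", "port": "22",
                          "username": "u", "password": "p"},
        }
    }),
    "source",
)

# Replacing an existing S3 datastore with an FTP override is rejected.
existing = {"s3Bucket": {"name": "b", "region": "us-west-2",
                         "assumedRoleARN": "arn:aws:iam::111122223333:role/example"}}
try:
    replace_datastore_input(existing, ftp_override)
except ValueError as err:
    print(err)  # -- Cannot override S3 datastore with an FTP override. --
```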

##### Sample datastore configurations:

```jsonc
{
  "ftpServer": {
    "transferProtocol": "SFTP",
    "plainText": {
      "hostname": "sftp.example.com",
      "port": "22",
      "username": "pipeline-user",
      "password": "secret"
    },
    "skyflowHosted": false
  }
}
```

```jsonc
{
  "s3Bucket": {
    "name": "pipeline-export-bucket",
    "region": "us-west-2",
    "assumedRoleARN": "arn:aws:iam::123456789012:role/pipeline-export-role"
  }
}
```
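
The script can also be exercised outside Actions by exporting the same environment variables the workflow sets. A minimal local-run sketch, with placeholder IDs and tokens (the module reads its environment at import time, so the variables must be set first):

```python
import os

# Placeholder values; real IDs and tokens come from Studio and your accounts.
os.environ.update({
    "PIPELINE_ID": "<pipeline-id-from-studio>",
    "TARGET_VAULT_ID": "<target-vault-id>",
    "SOURCE_ACCOUNT_ID": "<source-account-id>",
    "TARGET_ACCOUNT_ID": "<target-account-id>",
    "SOURCE_ACCOUNT_AUTH": "<source-access-token>",
    "TARGET_ACCOUNT_AUTH": "<target-access-token>",
    "SOURCE_ENV_URL": "https://manage.skyflowapis-preview.com",  # SANDBOX
    "TARGET_ENV_URL": "https://manage.skyflowapis.com",  # PRODUCTION
})

import migrate_pipelines  # noqa: E402  (imported after the env is populated)

migrate_pipelines.main(os.environ["PIPELINE_ID"])
```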

## Steps to run the workflows

### Prerequisites
228 changes: 228 additions & 0 deletions migrate_pipelines.py
@@ -0,0 +1,228 @@
import copy
import json
import os
from typing import Any, Dict, List, Optional

import requests


PIPELINE_ID = os.getenv("PIPELINE_ID")
SOURCE_VAULT_ID = os.getenv("SOURCE_VAULT_ID")
TARGET_VAULT_ID = os.getenv("TARGET_VAULT_ID")
SOURCE_ACCOUNT_ID = os.getenv("SOURCE_ACCOUNT_ID")
TARGET_ACCOUNT_ID = os.getenv("TARGET_ACCOUNT_ID")
SOURCE_ACCOUNT_AUTH = os.getenv("SOURCE_ACCOUNT_AUTH")
TARGET_ACCOUNT_AUTH = os.getenv("TARGET_ACCOUNT_AUTH")
SOURCE_ENV_URL = os.getenv("SOURCE_ENV_URL")
TARGET_ENV_URL = os.getenv("TARGET_ENV_URL")
SOURCE_DATASTORE_CONFIG = os.getenv("SOURCE_DATASTORE_CONFIG")
TARGET_DATASTORE_CONFIG = os.getenv("TARGET_DATASTORE_CONFIG")

FTP_ALLOWED_KEYS = {"transferProtocol", "plainText", "encrypted", "skyflowHosted"}
S3_ALLOWED_KEYS = {"name", "region", "assumedRoleARN"}

SOURCE_ACCOUNT_HEADERS = {
    "X-SKYFLOW-ACCOUNT-ID": SOURCE_ACCOUNT_ID,
    "Authorization": f"Bearer {SOURCE_ACCOUNT_AUTH}",
    "Content-Type": "application/json",
}

TARGET_ACCOUNT_HEADERS = {
    "X-SKYFLOW-ACCOUNT-ID": TARGET_ACCOUNT_ID,
    "Authorization": f"Bearer {TARGET_ACCOUNT_AUTH}",
    "Content-Type": "application/json",
}


def list_pipelines(vault_id: str) -> List[Dict[str, Any]]:
    """Return all pipelines in the supplied vault."""
    pipelines = []
    response = requests.get(
        f"{SOURCE_ENV_URL}/v1/pipelines?vaultID={vault_id}",
        headers=SOURCE_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    pipelines.extend(response.json()["pipelines"])
    return pipelines


def get_pipeline(pipeline_id: str) -> Dict[str, Any]:
    """Fetch a single pipeline definition from the source environment."""
    response = requests.get(
        f"{SOURCE_ENV_URL}/v1/pipelines/{pipeline_id}",
        headers=SOURCE_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    return response.json()["pipeline"]

def create_pipeline(pipeline: Dict[str, Any]) -> requests.Response:
    """Create a pipeline in the target environment."""
    response = requests.post(
        f"{TARGET_ENV_URL}/v1/pipelines",
        json=pipeline,
        headers=TARGET_ACCOUNT_HEADERS,
    )
    response.raise_for_status()
    return response


def strip_empty_values(value: Any) -> Any:
    """Recursively drop values that are empty strings or None."""
    if isinstance(value, dict):
        cleaned = {}
        for key, val in value.items():
            cleaned_val = strip_empty_values(val)
            if cleaned_val is None:
                continue
            cleaned[key] = cleaned_val
        return cleaned
    if isinstance(value, list):
        cleaned_list = [strip_empty_values(item) for item in value]
        return [item for item in cleaned_list if item is not None]
    if value == "" or value is None:
        return None
    return value


def validate_ftp_server(config: Dict[str, Any], label: str) -> Dict[str, Any]:
    """Return an FTP server configuration with only supported fields."""
    if not isinstance(config, dict):
        raise ValueError(f"-- {label} datastore ftpServer must be an object. --")
    sanitised = {key: config[key] for key in config if key in FTP_ALLOWED_KEYS}
    if "plainText" in sanitised:
        if not isinstance(sanitised["plainText"], dict):
            raise ValueError(f"-- {label} datastore ftpServer.plainText must be an object. --")
        sanitised["plainText"] = strip_empty_values(sanitised["plainText"])
    if "encrypted" in sanitised:
        if not isinstance(sanitised["encrypted"], dict):
            raise ValueError(f"-- {label} datastore ftpServer.encrypted must be an object. --")
        sanitised["encrypted"] = strip_empty_values(sanitised["encrypted"])
    sanitised = strip_empty_values(sanitised)
    if not sanitised:
        raise ValueError(f"-- {label} datastore ftpServer must include non-empty credentials. --")
    if "transferProtocol" not in sanitised:
        raise ValueError(f"-- {label} datastore ftpServer.transferProtocol is required. --")
    has_plain = "plainText" in sanitised and sanitised["plainText"]
    has_encrypted = "encrypted" in sanitised and sanitised["encrypted"]
    if not (has_plain or has_encrypted):
        raise ValueError(
            f"-- {label} datastore ftpServer must include plainText or encrypted credentials. --"
        )
    return sanitised


def validate_s3_bucket(config: Dict[str, Any], label: str) -> Dict[str, Any]:
    """Return an S3 bucket configuration with only supported fields."""
    if not isinstance(config, dict):
        raise ValueError(f"-- {label} datastore s3Bucket must be an object. --")
    sanitised = {key: config[key] for key in config if key in S3_ALLOWED_KEYS}
    sanitised = strip_empty_values(sanitised)
    if not sanitised:
        raise ValueError(f"-- {label} datastore s3Bucket must include non-empty configuration. --")
    missing = sorted(S3_ALLOWED_KEYS - set(sanitised.keys()))
    if missing:
        raise ValueError(
            f"-- {label} datastore s3Bucket is missing required fields: {', '.join(missing)}. --"
        )
    return sanitised


def load_datastore_input(raw_config: Optional[str], label: str) -> Optional[Dict[str, Any]]:
    """Return a sanitised datastore override dict, or None if the config is empty."""
    if raw_config is None or raw_config.strip() == "":
        return None
    try:
        parsed = json.loads(raw_config)
    except json.JSONDecodeError as exc:
        raise ValueError(f"-- Invalid JSON for {label} datastore config: {exc} --") from exc
    if not isinstance(parsed, dict):
        raise ValueError(f"-- {label} datastore config must be a JSON object. --")
    datastore_keys = [key for key in ("ftpServer", "s3Bucket") if key in parsed and parsed[key] is not None]
    if len(datastore_keys) != 1:
        raise ValueError(
            f"-- {label} datastore config must contain exactly one of ftpServer or s3Bucket. --"
        )
    datastore_key = datastore_keys[0]
    if datastore_key == "ftpServer":
        return {"ftpServer": validate_ftp_server(parsed["ftpServer"], label)}
    return {"s3Bucket": validate_s3_bucket(parsed["s3Bucket"], label)}


def replace_datastore_input(
    existing_section: Optional[Dict[str, Any]], override: Dict[str, Any]
) -> Dict[str, Any]:
    """Replace the datastore section while preserving other configuration."""
    section = copy.deepcopy(existing_section or {})
    existing_datastore_keys = [
        key for key in ("ftpServer", "s3Bucket") if key in section and section[key] is not None
    ]
    datastore_key, datastore_value = next(iter(override.items()))
    if datastore_key == "s3Bucket" and "ftpServer" in existing_datastore_keys:
        raise ValueError("-- Cannot override FTP datastore with an S3 override. --")
    if datastore_key == "ftpServer" and "s3Bucket" in existing_datastore_keys:
        raise ValueError("-- Cannot override S3 datastore with an FTP override. --")
    section.pop(datastore_key, None)
    section[datastore_key] = copy.deepcopy(datastore_value)
    return section


def transform_pipeline_payload(
    source_resource: Dict[str, Any],
    source_datastore_input: Optional[Dict[str, Any]] = None,
    target_datastore_input: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Prepare the payload for the target pipeline."""
    transformed_resource = copy.deepcopy(source_resource)
    if "ID" in transformed_resource:
        del transformed_resource["ID"]  # the target environment assigns a new pipeline ID
    transformed_resource["vaultID"] = TARGET_VAULT_ID
    if source_datastore_input:
        transformed_resource["source"] = replace_datastore_input(
            transformed_resource.get("source"), source_datastore_input
        )
    if target_datastore_input:
        transformed_resource["destination"] = replace_datastore_input(
            transformed_resource.get("destination"), target_datastore_input
        )
    return transformed_resource


def main(pipeline_id: str) -> None:
    """Migrate a single pipeline from the source environment to the target environment."""
    try:
        print("-- Initiating Pipelines migration --")
        source_datastore_input = load_datastore_input(SOURCE_DATASTORE_CONFIG, "source")
        target_datastore_input = load_datastore_input(TARGET_DATASTORE_CONFIG, "destination")
        pipeline = get_pipeline(pipeline_id)
        pipeline_name = pipeline.get("name", "Pipeline")
        print(f"-- Working on pipeline: {pipeline_name} --")

        pipeline_payload = transform_pipeline_payload(
            pipeline, source_datastore_input, target_datastore_input
        )
        create_pipeline_response = create_pipeline(pipeline_payload)

        if create_pipeline_response.status_code == 200:
            created_pipeline = create_pipeline_response.json()
            print(
                f"-- Pipeline migrated successfully: {pipeline_name}. "
                f"Source PIPELINE_ID: {pipeline.get('ID')}, "
                f"Target PIPELINE_ID: {created_pipeline.get('ID')} --"
            )
        else:
            print(
                f"-- Pipeline migration failed: {create_pipeline_response.status_code}. "
                f"{create_pipeline_response.content}"
            )
        print("-- Pipelines migration script executed successfully. --")
    except requests.exceptions.HTTPError as http_err:
        print(
            f"-- migrate_pipelines HTTP error: {http_err.response.content.decode()} --"
        )
        raise
    except Exception as err:
        print(f"-- migrate_pipelines other error: {err} --")
        raise


if __name__ == "__main__":
    if not PIPELINE_ID:
        raise ValueError("-- PIPELINE_ID is required to migrate a pipeline. --")
    main(PIPELINE_ID)