Skip to content
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2c6f9ed
rebase commit on new master
Vdeub-cloudinary Jul 3, 2025
5fbc26a
add tests
Vdeub-cloudinary Jul 3, 2025
d791f85
fix tests
Vdeub-cloudinary Jul 3, 2025
c6019bf
fix tests due to wrong mock data
Vdeub-cloudinary Jul 3, 2025
f25738a
fix tests due to wrong mock data + api missing method
Vdeub-cloudinary Jul 3, 2025
4c068ea
fix tests due to wrong definition
Vdeub-cloudinary Jul 3, 2025
d0f67ea
fix list first
Vdeub-cloudinary Jul 3, 2025
e77f822
fix patch
Vdeub-cloudinary Jul 3, 2025
c0a1d64
fix assertion error for list
Vdeub-cloudinary Jul 3, 2025
0e51e47
fix compare create
Vdeub-cloudinary Jul 3, 2025
03d053d
add listing after creation test
Vdeub-cloudinary Jul 3, 2025
36e19c3
fix test list after creation
Vdeub-cloudinary Jul 3, 2025
b7486b1
fix tests
Vdeub-cloudinary Jul 10, 2025
820c889
Merge branch 'master' into devx-16946-smd-for-clone
Vdeub-cloudinary Jul 16, 2025
3e3d67c
fix declarations due to conflict resolution mistake
Vdeub-cloudinary Jul 16, 2025
a2cf5a1
fix fields declaration for smd
Vdeub-cloudinary Jul 16, 2025
2b6fd28
improve scripts based on discussions
Vdeub-cloudinary Jul 21, 2025
23febe2
tentatively add user input mock
Vdeub-cloudinary Jul 21, 2025
b074d0e
tentatively add user input mock - 2
Vdeub-cloudinary Jul 21, 2025
17981a8
tentatively add user input mock - 3
Vdeub-cloudinary Jul 21, 2025
fe0233e
tentatively add user input mock - 4
Vdeub-cloudinary Jul 21, 2025
a626d18
tentatively add user input mock - 5
Vdeub-cloudinary Jul 21, 2025
30c8f6b
revamp code to make it more reusable and improve tests
Vdeub-cloudinary Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 92 additions & 5 deletions cloudinary_cli/modules/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import cloudinary
from cloudinary.auth_token import _digest
from cloudinary_cli.utils.utils import run_tasks_concurrently
from cloudinary_cli.utils.api_utils import upload_file
from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict
from cloudinary_cli.utils.api_utils import upload_file, handle_api_command
from cloudinary_cli.utils.json_utils import print_json
from cloudinary_cli.utils.config_utils import get_cloudinary_config, config_to_dict, config_to_tuple_list
from cloudinary_cli.defaults import logger
from cloudinary_cli.core.search import execute_single_request, handle_auto_pagination
import time
Expand Down Expand Up @@ -36,7 +37,7 @@
help="Specify the number of concurrent network threads.")
@option("-fi", "--fields", multiple=True,
help=("Specify whether to copy tags and/or context. "
"Valid options: `tags,context`."))
"Valid options: `tags,context,metadata`."))
@option("-se", "--search_exp", default="",
help="Define a search expression to filter the assets to clone.")
@option("--async", "async_", is_flag=True, default=False,
Expand All @@ -61,6 +62,20 @@ def clone(target, force, overwrite, concurrent_workers, fields,
if not isinstance(source_assets, dict) or not source_assets.get('resources'):
logger.error(style(f"No asset(s) found in {cloudinary.config().cloud_name}", fg="red"))
return False

if 'metadata' in normalize_list_params(fields):
source_metadata = list_metadata_items("metadata_fields")
if source_metadata.get('metadata_fields'):
target_metadata = list_metadata_items("metadata_fields", config_to_tuple_list(target_config))
fields_compare = compare_create_metadata_items(source_metadata, target_metadata, config_to_tuple_list(target_config), key="metadata_fields")
source_metadata_rules = list_metadata_items("metadata_rules")
if source_metadata_rules.get('metadata_rules'):
target_metadata_rules = list_metadata_items("metadata_rules", config_to_tuple_list(target_config))
rules_compare = compare_create_metadata_items(source_metadata_rules,target_metadata_rules, config_to_tuple_list(target_config), key="metadata_rules", id_field="name")
else:
logger.info(style(f"No metadata rules found in {cloudinary.config().cloud_name}", fg="yellow"))
else:
logger.info(style(f"No metadata found in {cloudinary.config().cloud_name}", fg="yellow"))

upload_list = _prepare_upload_list(
source_assets, target_config, overwrite, async_,
Expand Down Expand Up @@ -92,6 +107,7 @@ def _validate_clone_inputs(target):
"as source environment.")
return None, None


auth_token = cloudinary.config().auth_token
if auth_token:
# It is important to validate auth_token if provided as this prevents
Expand Down Expand Up @@ -119,22 +135,93 @@ def _prepare_upload_list(source_assets, target_config, overwrite, async_,
upload_list.append((asset_url, {**updated_options}))
return upload_list


def search_assets(search_exp, force):
search_exp = _normalize_search_expression(search_exp)
if not search_exp:
return False

search = cloudinary.search.Search().expression(search_exp)
search.fields(['tags', 'context', 'access_control',
'secure_url', 'display_name', 'format'])
'secure_url', 'display_name', 'metadata', 'format'])
search.max_results(DEFAULT_MAX_RESULTS)

res = execute_single_request(search, fields_to_keep="")
res = handle_auto_pagination(res, search, force, fields_to_keep="")

return res

def list_metadata_items(method_key, *options):
api_method_name = 'list_' + method_key
params = [api_method_name]
if options:
options = options[0]
res = handle_api_command(params, (), options, None, None, None,
doc_url="", api_instance=cloudinary.api,
api_name="admin",
auto_paginate=True,
force=True, return_data=True)
res.get(method_key).sort(key=lambda x: x["external_id"])

return res


def create_metadata_item(api_method_name, item, *options):
params = (api_method_name, item)
if options:
options = options[0]
res = handle_api_command(params, (), options, None, None, None,
doc_url="", api_instance=cloudinary.api,
api_name="admin",
return_data=True)

return res


def deep_diff(obj_source, obj_target):
diffs = {}
for k in set(obj_source.keys()).union(obj_target.keys()):
if obj_source.get(k) != obj_target.get(k):
diffs[k] = {"json_source": obj_source.get(k), "json_target": obj_target.get(k)}

return diffs


def compare_create_metadata_items(json_source, json_target, target_config, key, id_field = "external_id"):
list_source = {item[id_field]: item for item in json_source.get(key, [])}
list_target = {item[id_field]: item for item in json_target.get(key, [])}

only_in_source = list(list_source.keys() - list_target.keys())
common = list_source.keys() & list_target.keys()

if not len(only_in_source):
logger.info(style(f"{(' '.join(key.split('_')))} in {dict(target_config)['cloud_name']} and in {cloudinary.config().cloud_name} are identical. No {(' '.join(key.split('_')))} will be cloned", fg="yellow"))
else:
logger.info(style(f"Copying {len(only_in_source)} {(' '.join(key.split('_')))} from {cloudinary.config().cloud_name} to {dict(target_config)['cloud_name']}", fg="blue"))

for key_field in only_in_source:
if key == 'metadata_fields':
try:
res = create_metadata_item('add_metadata_field', list_source[key_field],target_config)
logger.info(style(f"Successfully created {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="green"))
except Exception as e:
logger.error(style(f"Error when creating {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="red"))
else:
try:
res = create_metadata_item('add_metadata_rule', list_source[key_field],target_config)
logger.info(style(f"Successfully created {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="green"))
except Exception as e:
logger.error(style(f"Error when creating {(' '.join(key.split('_')))} {key_field} to {dict(target_config)['cloud_name']}", fg="red"))


diffs = {}
for id_ in common:
if list_source[id_] != list_target[id_]:
diffs[id_] = deep_diff(list_source[id_], list_target[id_])

return {
"only_in_json_source": only_in_source,
"differences": diffs
}

def _normalize_search_expression(search_exp):
"""
Expand Down
3 changes: 3 additions & 0 deletions cloudinary_cli/utils/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def get_cloudinary_config(target):
def config_to_dict(config):
return {k: v for k, v in config.__dict__.items() if not k.startswith("_")}

def config_to_tuple_list(config):
return [(k, v) for k, v in config.__dict__.items() if not k.startswith("_")]

def show_cloudinary_config(cloudinary_config):
obfuscated_config = config_to_dict(cloudinary_config)

Expand Down
Loading