From 58e211ad5eadb0e3ad5131d60b1d8a48caa3db5c Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Thu, 26 Mar 2026 17:15:25 -0500 Subject: [PATCH 1/7] feat: added ovewrite-children --- arcflow/utils/bulk_import.py | 85 +++++++++++++++++++++++++++++++----- 1 file changed, 73 insertions(+), 12 deletions(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index 4a1467c..f6d2036 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -80,12 +80,61 @@ def check_for_children(repo_id, rid, asnake_client): print(f'Error retrieving child count for resource ID: {e}') return -1 +def delete_archival_object(repo_id, ao_id, asnake_client): + """ + Function to delete an archival object by ID. + Returns True if successful, False otherwise. + """ + try: + delete_response = asnake_client.delete( + f"/repositories/{repo_id}/archival_objects/{ao_id}") + if delete_response.status_code == 200: + print(f"Deleted archival object {ao_id} successfully.") + return True + else: + print(f"Failed to delete archival object {ao_id}. Status code: {delete_response.status_code}") + return False + except Exception as e: + print(f'Error deleting archival object ID {ao_id}: {e}') + return False + +def delete_children(repo_id, rid, asnake_client): + """ + Function to delete all top-level children of a resource. + Returns integer value for the number of children deleted or -1 if encounters an error. + """ + try: + info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/root").json() + child_count = int(info.get('child_count', 0)) + if child_count > 0: + for child in info['precomputed_waypoints']['']['0']: + delete_archival_object(repo_id, child['uri'].split('/')[-1], asnake_client) + + waypoints = int(info.get('waypoints', 0)) + # in case there are children than the precomputed_waypoints + # starting with 2 because 1 equals to precomputed_waypoints + for i in range(2, waypoints+1): + info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/waypoint", + params={ + 'offset': i-1, + }).json() + for child in info: + delete_archival_object(repo_id, child['uri'].split('/')[-1], asnake_client) + except Exception as e: + print(f'Error deleting children for resource ID: {e}') + return -1 + def report_csv_error(report_dict, error_string): """Function to print and log error messages (assumes only one error message).""" report_dict["error"] = error_string print(error_string) -def csv_bulk_import(csv_directory=None, load_type='ao', only_validate='false', save_output_files=False): +def csv_bulk_import( + csv_directory=None, + load_type='ao', + only_validate='false', + save_output_files=False, + overwrite_children=False): """Function to handle CSV bulk import.""" print("Starting CSV bulk import...") if not csv_directory or not os.path.exists(csv_directory): @@ -136,15 +185,22 @@ def csv_bulk_import(csv_directory=None, load_type='ao', only_validate='false', s file_import_report["rid"] = rid if load_type == "ao": - child_count = check_for_children(repo, rid, client) - if child_count > 0: - report_csv_error(file_import_report, f'EAD ID {ead_id} already has {child_count} top-level children in ASpace. Not imported.') - bulk_import_report.append(file_import_report) - continue - elif child_count == -1: - report_csv_error(file_import_report, f'Error checking children for EAD ID {ead_id}. Not imported.') - bulk_import_report.append(file_import_report) - continue + if overwrite_children: + deleted_children = delete_children(repo, rid, client) + if deleted_children == -1: + report_csv_error(file_import_report, f'Error deleting children for EAD ID {ead_id}. Not imported.') + bulk_import_report.append(file_import_report) + continue + else: + child_count = check_for_children(repo, rid, client) + if child_count > 0: + report_csv_error(file_import_report, f'EAD ID {ead_id} already has {child_count} top-level children in ASpace. Not imported.') + bulk_import_report.append(file_import_report) + continue + elif child_count == -1: + report_csv_error(file_import_report, f'Error checking children for EAD ID {ead_id}. Not imported.') + bulk_import_report.append(file_import_report) + continue file_list = [] with open(f, 'rb') as file: @@ -328,6 +384,10 @@ def main(): '--save-output-files', action='store_true', help='Download job output files',) + parser.add_argument( + '--overwrite-children', + action='store_true', + help='Overwrite existing children during import',) args = parser.parse_args() if not args.dir.endswith('/'): @@ -337,8 +397,9 @@ def main(): csv_directory=args.dir, load_type=args.load_type, only_validate='true' if args.only_validate else 'false', - save_output_files=args.save_output_files) - + save_output_files=args.save_output_files, + overwrite_children=args.overwrite_children) + save_report(args.dir, import_report, args.only_validate) if __name__ == '__main__': From 6e15ae41376187b84cf46952ebab118db666bef0 Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Mon, 30 Mar 2026 14:03:33 -0500 Subject: [PATCH 2/7] feat: added only-delete-children (with multiprocessing) --- arcflow/utils/bulk_import.py | 58 +++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index e857f78..3ad46b9 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -8,6 +8,7 @@ from pathlib import Path from datetime import datetime from asnake.client import ASnakeClient +from multiprocessing.pool import ThreadPool as Pool import re @@ -106,20 +107,32 @@ def delete_children(repo_id, rid, asnake_client): try: info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/root").json() child_count = int(info.get('child_count', 0)) - if child_count > 0: - for child in info['precomputed_waypoints']['']['0']: - delete_archival_object(repo_id, child['uri'].split('/')[-1], asnake_client) - - waypoints = int(info.get('waypoints', 0)) - # in case there are children than the precomputed_waypoints - # starting with 2 because 1 equals to precomputed_waypoints - for i in range(2, waypoints+1): - info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/waypoint", - params={ - 'offset': i-1, - }).json() - for child in info: - delete_archival_object(repo_id, child['uri'].split('/')[-1], asnake_client) + with Pool(processes=10) as pool: + if child_count > 0: + results = [pool.apply_async( + delete_archival_object, + args=(repo_id, child['uri'].split('/')[-1], asnake_client)) + for child in info['precomputed_waypoints']['']['0']] + # wait for task to complete + for r in results: + r.get() + + waypoints = int(info.get('waypoints', 0)) + # in case there are children than the precomputed_waypoints + # starting with 2 because 1 equals to precomputed_waypoints + for i in range(2, waypoints+1): + info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/waypoint", + params={ + 'offset': i-1, + }).json() + results = [pool.apply_async( + delete_archival_object, + args=(repo_id, child['uri'].split('/')[-1], asnake_client)) + for child in info] + # wait for task to complete + for r in results: + r.get() + return child_count except Exception as e: print(f'Error deleting children for resource ID: {e}') return -1 @@ -134,7 +147,8 @@ def csv_bulk_import( load_type='ao', only_validate='false', save_output_files=False, - overwrite_children=False): + overwrite_children=False, + only_delete_children=False): """Function to handle CSV bulk import.""" print("Starting CSV bulk import...") if not csv_directory or not os.path.exists(csv_directory): @@ -185,12 +199,17 @@ def csv_bulk_import( file_import_report["rid"] = rid if load_type == "ao": - if overwrite_children: + if overwrite_children or only_delete_children: deleted_children = delete_children(repo, rid, client) if deleted_children == -1: report_csv_error(file_import_report, f'Error deleting children for EAD ID {ead_id}. Not imported.') bulk_import_report.append(file_import_report) continue + if only_delete_children: + file_import_report["results_status"] = "Completed" + file_import_report["results_warnings"] = f"Deleted {deleted_children} children. No import performed." + bulk_import_report.append(file_import_report) + continue else: child_count = check_for_children(repo, rid, client) if child_count > 0: @@ -419,6 +438,10 @@ def main(): '--overwrite-children', action='store_true', help='Overwrite existing children during import',) + parser.add_argument( + '--only-delete-children', + action='store_true', + help='Only delete existing children without performing import',) args = parser.parse_args() if not args.dir.endswith('/'): @@ -429,7 +452,8 @@ def main(): load_type=args.load_type, only_validate='true' if args.only_validate else 'false', save_output_files=args.save_output_files, - overwrite_children=args.overwrite_children) + overwrite_children=args.overwrite_children, + only_delete_children=args.only_delete_children) save_report(args.dir, import_report, args.only_validate) From 3158a3c3d14a3199b91fa0f41334f82098a5bc14 Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Mon, 30 Mar 2026 18:40:55 -0500 Subject: [PATCH 3/7] feat: added max-retries --- arcflow/utils/bulk_import.py | 71 +++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index 3ad46b9..087a8df 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -148,9 +148,14 @@ def csv_bulk_import( only_validate='false', save_output_files=False, overwrite_children=False, - only_delete_children=False): + only_delete_children=False, + report_text_file=""): """Function to handle CSV bulk import.""" - print("Starting CSV bulk import...") + if report_text_file: + print(f"Retrying CSV bulk import with report file {report_text_file}...") + else: + print("Starting CSV bulk import...") + if not csv_directory or not os.path.exists(csv_directory): print(f'Directory {csv_directory} does not exist. Exiting.') exit(0) @@ -160,8 +165,26 @@ def csv_bulk_import( bulk_import_report = [] - for f in glob.iglob(f'{csv_directory}*.csv'): - print(f'Processing file {f}...') + if report_text_file: + try: + with open(report_text_file, "r") as file: + entries = yaml.safe_load(file) + except FileNotFoundError: + print(f"File {report_text_file} not found.") + exit(0) + else: + entries = glob.iglob(f'{csv_directory}*.csv') + + for f in entries: + if report_text_file: + if f['java_mysql_error']>0: + f = f"{csv_directory}{f['identifier']}.csv" + print(f'Retrying file {f}...') + else: + continue + else: + print(f'Processing file {f}...') + file_import_report = {} file_import_report["identifier"] = Path(f).stem file_import_report["type"] = load_type @@ -258,6 +281,10 @@ def csv_bulk_import( bulk_import_report.append(file_import_report) print(json.dumps(import_job, indent=4)) + + if not bulk_import_report: + print("No more files to process. Exiting.") + exit(0) if save_output_files: try: @@ -281,7 +308,7 @@ def save_report(path, report_list, validate_only): txt_report_save_path = os.path.join(report_save_path, report_text_file_name) with open(txt_report_save_path, 'w', encoding='utf-8') as report: - print("Import Job Info", file=report) + print("# Import Job Info", file=report) json.dump(report_list, report, indent=4) report_csv_file_name = report_file_name_stem + ".csv" @@ -297,6 +324,8 @@ def save_report(path, report_list, validate_only): for row in report_list: writer.writerow(row) + return f"{report_save_path}/{report_text_file_name}" + def check_job_status(asnake_client, repo_id, job_id): """Function to check whether a job has completed (and thus output files are ready).""" while True: @@ -442,20 +471,34 @@ def main(): '--only-delete-children', action='store_true', help='Only delete existing children without performing import',) + parser.add_argument( + '--max-retries', + type=int, + default=0, + help='Number of times to retry a failed job (default: 0)',) args = parser.parse_args() if not args.dir.endswith('/'): args.dir += '/' - import_report = csv_bulk_import( - csv_directory=args.dir, - load_type=args.load_type, - only_validate='true' if args.only_validate else 'false', - save_output_files=args.save_output_files, - overwrite_children=args.overwrite_children, - only_delete_children=args.only_delete_children) - - save_report(args.dir, import_report, args.only_validate) + report_text_file = "" + while True: + if args.max_retries < 0: + print("Maximum retries reached. Exiting.") + break + else: + import_report = csv_bulk_import( + csv_directory=args.dir, + load_type=args.load_type, + only_validate='true' if args.only_validate else 'false', + save_output_files=args.save_output_files, + overwrite_children=args.overwrite_children, + only_delete_children=args.only_delete_children, + report_text_file=report_text_file) + + report_text_file = save_report(args.dir, import_report, args.only_validate) + + args.max_retries -= 1 if __name__ == '__main__': main() \ No newline at end of file From aaede165b6b69c46a67a41c06b0f8f5e666acab5 Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Fri, 3 Apr 2026 13:45:59 -0500 Subject: [PATCH 4/7] feat: added deleted_children column to reports --- arcflow/utils/bulk_import.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index 087a8df..ed44754 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -224,6 +224,7 @@ def csv_bulk_import( if load_type == "ao": if overwrite_children or only_delete_children: deleted_children = delete_children(repo, rid, client) + file_import_report["deleted_children"] = deleted_children if deleted_children == -1: report_csv_error(file_import_report, f'Error deleting children for EAD ID {ead_id}. Not imported.') bulk_import_report.append(file_import_report) @@ -313,7 +314,7 @@ def save_report(path, report_list, validate_only): report_csv_file_name = report_file_name_stem + ".csv" - fieldnames = ['identifier','ead_id','aspace_url','import_date','repo_id', 'rid', 'only_validate','type','resource_id','error','results_status','results_warnings','results_id','results_uri'] + fieldnames = ['identifier','ead_id','aspace_url','import_date','repo_id', 'rid', 'only_validate','type','resource_id','error','results_status','results_warnings','results_id','results_uri','deleted_children'] issue_assessment_fieldnames = get_issue_assessment_fieldnames() fieldnames.extend(issue_assessment_fieldnames) From 869aab444b8b64b9d82588f70525ed116adf12f0 Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Fri, 3 Apr 2026 14:23:05 -0500 Subject: [PATCH 5/7] fix: added flag to display "Maximum retries reached" message --- arcflow/utils/bulk_import.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index ed44754..fb087ee 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -177,7 +177,7 @@ def csv_bulk_import( for f in entries: if report_text_file: - if f['java_mysql_error']>0: + if f.get("java_mysql_error", 0) > 0: f = f"{csv_directory}{f['identifier']}.csv" print(f'Retrying file {f}...') else: @@ -483,9 +483,11 @@ def main(): args.dir += '/' report_text_file = "" + is_retrying = args.max_retries > 0 while True: if args.max_retries < 0: - print("Maximum retries reached. Exiting.") + if is_retrying: + print("Maximum retries reached. Exiting.") break else: import_report = csv_bulk_import( From c06d7192b2b0719c937fd5d55251446413b06de6 Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Fri, 3 Apr 2026 17:39:58 -0500 Subject: [PATCH 6/7] fix: delete children in reverse sequence to prevent offset recalculation errors --- arcflow/utils/bulk_import.py | 37 ++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index fb087ee..4644a48 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -107,8 +107,25 @@ def delete_children(repo_id, rid, asnake_client): try: info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/root").json() child_count = int(info.get('child_count', 0)) - with Pool(processes=10) as pool: - if child_count > 0: + if child_count > 0: + with Pool(processes=10) as pool: + waypoints = int(info.get('waypoints', 0)) + # in case there are more children than the precomputed_waypoints + # starting with the highest waypoint and working backwards to avoid the list shrinking and changing offsets for remaining waypoints + for i in range(waypoints, 1, -1): + waypoint = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/waypoint", + params={ + 'offset': i-1, + }).json() + results = [pool.apply_async( + delete_archival_object, + args=(repo_id, child['uri'].split('/')[-1], asnake_client)) + for child in waypoint] + # wait for task to complete + for r in results: + r.get() + + # then delete the remaining children in the precomputed_waypoints results = [pool.apply_async( delete_archival_object, args=(repo_id, child['uri'].split('/')[-1], asnake_client)) @@ -116,22 +133,6 @@ def delete_children(repo_id, rid, asnake_client): # wait for task to complete for r in results: r.get() - - waypoints = int(info.get('waypoints', 0)) - # in case there are children than the precomputed_waypoints - # starting with 2 because 1 equals to precomputed_waypoints - for i in range(2, waypoints+1): - info = asnake_client.get(f"/repositories/{repo_id}/resources/{rid}/tree/waypoint", - params={ - 'offset': i-1, - }).json() - results = [pool.apply_async( - delete_archival_object, - args=(repo_id, child['uri'].split('/')[-1], asnake_client)) - for child in info] - # wait for task to complete - for r in results: - r.get() return child_count except Exception as e: print(f'Error deleting children for resource ID: {e}') From 2dd5e8ccafe956cfa10ba46c006746faf525d1b0 Mon Sep 17 00:00:00 2001 From: Leonel Ramirez <50677216+leonelramirez@users.noreply.github.com> Date: Fri, 3 Apr 2026 18:12:18 -0500 Subject: [PATCH 7/7] Apply suggestion from @graykr Co-authored-by: graykr --- arcflow/utils/bulk_import.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arcflow/utils/bulk_import.py b/arcflow/utils/bulk_import.py index 4644a48..be59b58 100644 --- a/arcflow/utils/bulk_import.py +++ b/arcflow/utils/bulk_import.py @@ -468,7 +468,7 @@ def main(): parser.add_argument( '--overwrite-children', action='store_true', - help='Overwrite existing children during import',) + help='Overwrite/delete existing children during import/validation',) parser.add_argument( '--only-delete-children', action='store_true',