Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 12 additions & 152 deletions arcflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from asnake.client import ASnakeClient
from multiprocessing.pool import ThreadPool as Pool
from utils.stage_classifications import extract_labels
from services.solr_service import SolrService
import glob

base_dir = os.path.abspath((__file__) + "/../../")
Expand Down Expand Up @@ -105,6 +106,14 @@ def __init__(self, arclight_dir, aspace_dir, solr_url, aspace_solr_url, traject_
self.log.error(f'Error authorizing ASnakeClient: {e}')
exit(0)

# Initialize Solr service
self.solr_service = SolrService(
solr_url=self.solr_url,
aspace_solr_url=self.aspace_solr_url,
logger=self.log,
force_update=self.force_update
)


def is_running(self):
"""
Expand Down Expand Up @@ -694,137 +703,6 @@ def get_creator_bioghist(self, resource, indent_size=0):
return '\n'.join(bioghist_elements)
return None

def _get_target_agent_criteria(self, modified_since=0):
"""
Defines the Solr query criteria for "target" agents.
These are agents we want to process.
"""
criteria = [
"linked_agent_roles:creator",
"system_generated:false",
"is_user:false",
# "is_repo_agent:false",
]

# Add time filter if applicable
if modified_since > 0 and not self.force_update:
mtime_utc = datetime.fromtimestamp(modified_since, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
criteria.append(f"system_mtime:[{mtime_utc} TO *]")

return criteria

def _get_nontarget_agent_criteria(self, modified_since=0):
"""
Defines the Solr query criteria for "non-target" (excluded) agents.
This is the logical inverse of the target criteria.
"""
# The core logic for what makes an agent a "target"
target_logic = " AND ".join([
"linked_agent_roles:creator",
"system_generated:false",
"is_user:false",
# "is_repo_agent:false",
])

# We find non-targets by negating the entire block of target logic
criteria = [f"NOT ({target_logic})"]

# We still apply the time filter to the overall query
if modified_since > 0 and not self.force_update:
mtime_utc = datetime.fromtimestamp(modified_since, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
criteria.append(f"system_mtime:[{mtime_utc} TO *]")

return criteria

def _execute_solr_query(self, query_parts, solr_url=None, fields=None, indent_size=0):
    """
    A generic function to execute a query against the Solr index.

    Args:
        query_parts (list): A list of strings that will be joined with " AND ".
        solr_url (str, optional): Solr core URL; defaults to self.solr_url.
        fields (list, optional): Solr fields to return; defaults to ['id'].
        indent_size (int): Log-message indentation width.

    Returns:
        list: A list of dictionaries, where each dictionary contains the requested fields.
              Returns an empty list on failure.
    """
    indent = ' ' * indent_size
    if not query_parts:
        self.log.error("Cannot execute Solr query with empty criteria.")
        return []

    # Default resolved here rather than in the signature to avoid a
    # mutable default argument shared between calls.
    if fields is None:
        fields = ['id']

    if not solr_url:
        solr_url = self.solr_url

    query_string = " AND ".join(query_parts)
    self.log.info(f"{indent}Executing Solr query: {query_string}")

    # NOTE(review): neither GET carries a timeout, so a hung Solr will
    # block indefinitely -- consider requests.get(..., timeout=...).
    try:
        # First, get the total count of matching documents (rows=0 so no
        # docs are transferred yet).
        count_params = {'q': query_string, 'rows': 0, 'wt': 'json'}
        count_response = requests.get(f'{solr_url}/select', params=count_params)
        self.log.info(f"  [Solr Count Request]: {count_response.request.url}")

        count_response.raise_for_status()
        num_found = count_response.json()['response']['numFound']

        if num_found == 0:
            return []  # No need to query again if nothing was found

        # Now, fetch the actual data for the documents
        data_params = {
            'q': query_string,
            'rows': num_found,  # Use the exact count to fetch all results
            'fl': ','.join(fields),  # Join field list into a comma-separated string
            'wt': 'json'
        }
        response = requests.get(f'{solr_url}/select', params=data_params)
        response.raise_for_status()
        # Log the exact URL for the data request
        self.log.info(f"  [Solr Data Request]: {response.request.url}")

        return response.json()['response']['docs']

    except requests.exceptions.RequestException as e:
        self.log.error(f"Failed to execute Solr query: {e}")
        self.log.error(f"  Failed query string: {query_string}")
        return []

def get_all_agents(self, agent_types=None, modified_since=0, indent_size=0):
    """
    Fetch target agent URIs from the Solr index, logging a count of the
    non-target (excluded) agents that were skipped.

    Args:
        agent_types (list, optional): primary_type values to match;
            defaults to person, corporate entity, and family agents.
        modified_since (int): Unix timestamp cutoff; reset to 0 when a
            forced update is requested.
        indent_size (int): Log-message indentation width.

    Returns:
        list: IDs of the target agents to process.
    """
    if agent_types is None:
        agent_types = ['agent_person', 'agent_corporate_entity', 'agent_family']

    # A forced update disregards the modification-time cutoff.
    if self.force_update:
        modified_since = 0
    indent = ' ' * indent_size
    self.log.info(f'{indent}Fetching agent data from Solr...')

    # primary_type restriction shared by both queries below.
    base_criteria = [f"primary_type:({' OR '.join(agent_types)})"]

    # Query 1: non-target agents -- logged for visibility only.
    excluded_docs = self._execute_solr_query(
        base_criteria + self._get_nontarget_agent_criteria(modified_since),
        self.aspace_solr_url,
        fields=['id'],
    )
    if excluded_docs:
        excluded_ids = [doc['id'] for doc in excluded_docs]
        self.log.info(f"{indent}  Found {len(excluded_ids)} non-target (excluded) agents.")
        # Optional: Log the actual IDs if the list isn't too long
        # for agent_id in excluded_ids:
        #     self.log.debug(f"{indent}    - Excluded: {agent_id}")

    # Query 2: target agents -- these are what the caller processes.
    self.log.info('Target Criteria:')
    target_docs = self._execute_solr_query(
        base_criteria + self._get_target_agent_criteria(modified_since),
        self.aspace_solr_url,
        fields=['id'],
    )
    target_agents = [doc['id'] for doc in target_docs]
    self.log.info(f"{indent}  Found {len(target_agents)} target agents to process.")

    return target_agents

def task_agent(self, agent_uri, agents_dir, repo_id=1, indent_size=0):
"""
Process a single agent and generate a creator document in EAC-CPF XML format.
Expand Down Expand Up @@ -910,7 +788,7 @@ def process_creators(self):
self.log.info(f'{indent}Processing creator agents...')

# Get agents to process
agents = self.get_all_agents(modified_since=modified_since, indent_size=indent_size)
agents = self.solr_service.get_all_agents(modified_since=modified_since, indent_size=indent_size)

# Process agents in parallel
with Pool(processes=10) as pool:
Expand Down Expand Up @@ -1185,24 +1063,6 @@ def create_symlink(self, target_path, symlink_path, indent_size=0):
self.log.info(f'{indent}{e}')
return False

def delete_arclight_solr_record(self, solr_record_id, indent_size=0):
    """
    Delete a single record from the ArcLight Solr index by ID.

    Args:
        solr_record_id (str): ID of the Solr document to delete.
        indent_size (int): Log-message indentation width.

    Returns:
        bool: True when Solr confirms the delete (HTTP 200), False on a
        non-200 status or on a request-level failure.
    """
    indent = ' ' * indent_size

    try:
        # commit=true makes the delete visible immediately.
        response = requests.post(
            f'{self.solr_url}/update?commit=true',
            json={'delete': {'id': solr_record_id}},
        )
        if response.status_code == 200:
            self.log.info(f'{indent}Deleted Solr record {solr_record_id}. from ArcLight Solr')
            return True
        else:
            self.log.error(
                f'{indent}Failed to delete Solr record {solr_record_id} from Arclight Solr. Status code: {response.status_code}')
            return False
    except requests.exceptions.RequestException as e:
        self.log.error(f'{indent}Error deleting Solr record {solr_record_id} from ArcLight Solr: {e}')
        # Bug fix: this path previously fell through and returned None.
        # Callers (delete_ead/delete_creator) branch on this result, so
        # make the failure explicit.
        return False

def delete_file(self, file_path, indent_size=0):
indent = ' ' * indent_size

Expand All @@ -1215,7 +1075,7 @@ def delete_file(self, file_path, indent_size=0):
def delete_ead(self, resource_id, ead_id,
xml_file_path, pdf_file_path, indent_size=0):
# delete from solr
deleted_solr_record = self.delete_arclight_solr_record(ead_id, indent_size=indent_size)
deleted_solr_record = self.solr_service.delete_record(ead_id, indent_size=indent_size)
if deleted_solr_record:
self.delete_file(pdf_file_path, indent_size=indent_size)
self.delete_file(xml_file_path, indent_size=indent_size)
Expand All @@ -1224,7 +1084,7 @@ def delete_ead(self, resource_id, ead_id,
self.delete_file(symlink_path, indent_size=indent_size)

def delete_creator(self, file_path, solr_id, indent_size=0):
    """Remove a creator: drop its Solr record, then its on-disk file.

    The file is deleted only after the Solr delete succeeds, so a failed
    index delete leaves the source document in place.
    """
    if self.solr_service.delete_record(solr_id, indent_size=indent_size):
        self.delete_file(file_path, indent_size=indent_size)

Expand Down
Empty file added arcflow/services/__init__.py
Empty file.
Loading