diff --git a/python/fusion_engine_client/utils/log.py b/python/fusion_engine_client/utils/log.py index 465fc5be..015573ff 100644 --- a/python/fusion_engine_client/utils/log.py +++ b/python/fusion_engine_client/utils/log.py @@ -1,3 +1,5 @@ +from typing import Iterable, List, Optional, Union + import fnmatch import glob import json @@ -12,19 +14,41 @@ _logger = logging.getLogger('point_one.utils.log') -# Note: The spelling here is intentional. -MANIFEST_FILE_NAME = 'maniphest.json' +_MANIFEST_FILE_NAMES = [ + 'manifest.json', + 'maniphest.json', # Legacy spelling. +] -# The following files are listed order of priority. The first located file will be returned. -CANDIDATE_P1LOG_FILES = [ - # v- Typically captured at the time the log is recorded, or embedded in a mixed-binary log file and extracted - # by extract_fusion_engine_log(). - 'input.p1log', +# IMPORTANT: The following file lists are specified order of priority. The first located file will be returned. + +# User-configurable output files. +_CANDIDATE_USER_OUTPUT_FILES = [ + # Legacy sensor input and user output files location in top of log directory. 'fusion_engine.p1log', + # Default filename for the file1 user-configurable output interface. 'output/fusion_engine.p1log', ] -CANDIDATE_MIXED_FILES = ['input.raw', 'input.bin', 'input.rtcm3'] +# Diagnostic output files containing incoming sensor data, diagnostic information, and select user output. +_CANDIDATE_DIAG_FILES = [ + 'output/diagnostics.p1log', + 'input.p1log', + # Special key to search for a data file referenced by a log manifest file. + '', +] + +# Other input file formats, including mixed-binary (FusionEngine data + other protocols), RTCM, etc. +_CANDIDATE_MIXED_BIN_FILES = [ + 'input.raw', + 'input.bin', + 'input.rtcm3' +] + +_CANDIDATE_ALL_LOG_FILES = [ + *_CANDIDATE_USER_OUTPUT_FILES, + *_CANDIDATE_DIAG_FILES, + *_CANDIDATE_MIXED_BIN_FILES, +] # Determine the default log base directory in the following order of priority: # - P1_LOG_BASE_DIR environment variable @@ -55,6 +79,24 @@ def define_cli_arguments(parser_group, define_log=True): parser_group.add_argument( '--log-base-dir', metavar='DIR', default=DEFAULT_LOG_BASE_DIR, help="The base directory containing FusionEngine logs to be searched if a log pattern is specified.") + + types = [ + 'auto', + 'diagnostic', 'diag', + 'user' + ] + parser_group.add_argument( + '--log-type', metavar='TYPE', type=types, + default='auto', + help="""\ +The type of input file to load when searching for a log directory automatically: +- auto - Load the newest data available in the following order: + - Real-time user output (fusion_engine.p1log) + - Real-time diagnostic output (diagnostics.p1log) +- diagnostic, diag - Load a diagnostic output file (diagnostics.p1log). Do not load user output file + fusion_engine.p1log, even if present. +- user - Load user output (fusion_engine.p1log by default) if present. Do not load diagnostic output. +""") if define_log: parser_group.add_argument( 'log', @@ -82,7 +124,7 @@ def is_possible_log_pattern(pattern: str) -> bool: def find_log_by_pattern(pattern, log_base_dir=DEFAULT_LOG_BASE_DIR, allow_multiple=False, skip_empty_files=True, - log_test_filenames=(MANIFEST_FILE_NAME,), return_test_file=False): + log_test_filenames=_MANIFEST_FILE_NAMES, return_test_file=False): """! @brief Perform a pattern match to locate a log directory containing the specified files. @@ -200,6 +242,43 @@ def find_log_by_pattern(pattern, log_base_dir=DEFAULT_LOG_BASE_DIR, allow_multip return matches +def _find_manifest_file(log_dir: str) -> Optional[str]: + """! + @brief Locate a Point One log manifest file within a log directory. + + @param log_dir The path to the log directory. + + @return The path to the manifest file, or `None` if not found. + """ + for filename in os.listdir(log_dir): + if filename in _MANIFEST_FILE_NAMES: + return os.path.join(log_dir, filename) + return None + + +def _get_data_filename_from_manifest(manifest_path: str, log_dir: str = None) -> str: + """! + @brief Determine the sensor data input filename/path from a log manifest file. + + @param manifest_path The path to the manifest file. + @param log_dir The path to the log directory. Defaults to the parent directory of `manifest_path`. + + @return The path to the binary data file. + """ + if log_dir is None: + log_dir = os.path.dirname(manifest_path) + + with open(manifest_path, 'rt') as f: + manifest = json.load(f) + channels = manifest.get('channels', []) + if len(channels) > 0: + input_path = os.path.join(log_dir, channels[0]) + if os.path.exists(input_path): + return input_path + raise FileNotFoundError( + f"Found manifest file in '{log_dir}' but could not find corresponding log data/diagnostics file.") + + def find_log_file(input_path, candidate_files=None, return_output_dir=False, return_log_id=False, log_base_dir=DEFAULT_LOG_BASE_DIR, check_exact_match=True, check_pattern_match=True, skip_empty_files=True): @@ -234,8 +313,9 @@ def find_log_file(input_path, candidate_files=None, return_output_dir=False, ret @param return_output_dir If `True`, return the output directory associated with the located input file. @param return_log_id If `True`, return the ID of the log if the requested path is a FusionEngine log. @param log_base_dir The base directory to be searched when performing a pattern match for a log directory. - @param check_exact_match If `True`, check if `input_path` is the path to a data file. Otherwise, skip this check and - only perform a pattern search. + @param check_exact_match If `True`, check if `input_path` is the path to either a file or a directory containing one + of the `candidate_files`. If `False`, skip this check and only perform a pattern search within + `log_base_dir`. @param check_pattern_match If `True` and `input_path` does not refer to a log file or directory, perform a pattern match using `input_path` as the pattern. @param skip_empty_files If `True`, ignore files that exist but are 0 bytes. @@ -247,34 +327,59 @@ def find_log_file(input_path, candidate_files=None, return_output_dir=False, ret `return_log_id` is `True`. """ def _get_log_id(path): + # See if there's a manifest file in any parent directory. If so, read the log ID from the manifest. parent_dir = os.path.dirname(os.path.abspath(path)) + search_dir = parent_dir + while True: + for filename in _MANIFEST_FILE_NAMES: + manifest_path = os.path.join(search_dir, filename) + if os.path.exists(manifest_path): + with open(manifest_path, 'rt') as f: + manifest = json.load(f) + # If the manifest does not have a guid field for some reason, fall back to the parent directory + # containing the manifest file. + return manifest.get('guid', os.path.basename(search_dir)) + search_dir = os.path.dirname(search_dir) + if search_dir in ('', '/', '.'): + break + + # If no manifest file, return the parent directory name as the log ID. return os.path.basename(parent_dir) + log_file_path = None + output_dir = None + log_id = None + # Check if the input path is a file. If so, return it and set the output directory to its parent directory. if os.path.isfile(input_path) and check_exact_match: - output_dir = os.path.dirname(input_path) + log_file_path = input_path + output_dir = os.path.dirname(log_file_path) if output_dir == "": output_dir = "." - log_id = _get_log_id(input_path) + log_id = _get_log_id(log_file_path) # If the input path is a directory, see if it's a P1 log. If it is not a directory, see if it pattern matches to a # log directory within `log_base_dir`. If so for either case, set the output directory to the log directory (note # that the .p1log may be contained within a subdirectory). else: if candidate_files is None: - # No candidate files specified. Default to 'fusion_engine.p1log'. - candidate_files = ['fusion_engine.p1log'] + # No candidate files specified. Default to all known candidate filenames. + candidate_files = _CANDIDATE_ALL_LOG_FILES elif not isinstance(candidate_files, (tuple, list)): # User specified a string, not a list. Convert to a list. candidate_files = [candidate_files] - # First, see if the user's path is an existing log directory containing a data file. If so, use that. - log_dir = None - log_id = None - + # Helper function to search a directory for any of the candidate filenames. def _search_directory(dir_path): + manifest_path = _find_manifest_file(dir_path) + for i, f in enumerate(candidate_files): if f is None: continue + elif f == '': + if manifest_path is None: + continue + else: + f = _get_data_filename_from_manifest(manifest_path=manifest_path, log_dir=dir_path) test_path = os.path.join(dir_path, f) if os.path.exists(test_path): @@ -289,18 +394,18 @@ def _search_directory(dir_path): return test_path, dir_path, _get_log_id(test_path) return None, None, None + # First, see if the user's path is an existing log directory containing a data file. If so, use that. + log_dir = None if check_exact_match: dir_exists = os.path.isdir(input_path) if dir_exists: - matching_input_path, log_dir, log_id = _search_directory(input_path) - if matching_input_path is not None: - input_path = matching_input_path + log_file_path, log_dir, log_id = _search_directory(input_path) else: dir_exists = False # If we didn't find an exact match and the path contains a *, try a glob search in the current directory first. # For example, if they specified 'abc*', search for './abc*'. - if log_dir is None and '*' in input_path: + if log_file_path is None and '*' in input_path: pattern = input_path matches = glob.glob(pattern) matching_input_path = None @@ -321,7 +426,7 @@ def _search_directory(dir_path): if matching_input_path is not None: if len(matches) == 1: - input_path = matching_input_path + log_file_path = matching_input_path log_dir = matching_log_dir log_id = matching_log_id else: @@ -330,37 +435,41 @@ def _search_directory(dir_path): (pattern, '\n '.join(matches))) # If the user didn't specify a directory, or the directory wasn't considered a valid log (i.e., didn't have any - # of the candidate files in it), check if they provided a pattern match to a log (i.e., a partial log ID or a - # search pattern (foo*/partial_id*)). - if log_dir is None and check_pattern_match and not (input_path.startswith('./') or input_path.startswith('/')): + # of the candidate files in it), check if they provided a pattern match to a log (i.e., a partial log ID (e.g., + # 1aab35) or a search pattern (foo*/partial_id*)). + if log_file_path is None and check_pattern_match and not (input_path.startswith('./') or + input_path.startswith('/')): if check_exact_match: if dir_exists: _logger.info("Directory '%s' does not contain a data file. Attempting a pattern match." % input_path) else: - _logger.info("File '%s' not found. Searching for a matching log." % input_path) + _logger.info("File/directory '%s' not found. Searching for a matching log." % input_path) try: - candidate_files = list(candidate_files) + [MANIFEST_FILE_NAME] + # Include manifest filenames in the list of candidates we give to find_log_by_pattern(). That way, if + # the log doesn't use any of the typical filenames, but does have a manifest file, we can try to locate + # the data file mentioned in the manifest. + candidate_files = list(candidate_files) + _MANIFEST_FILE_NAMES matches = find_log_by_pattern(input_path, log_base_dir=log_base_dir, log_test_filenames=candidate_files, return_test_file=True) - log_dir = matches[0][0] - log_id = matches[0][1] - input_path = matches[0][2] + matching_log_dir = matches[0][0] + matching_log_id = matches[0][1] + matching_input_path = matches[0][2] # If we didn't find one of the recognized log filenames, but instead found a manifest file, load the # manifest and use that to infer the input filename. - if os.path.basename(input_path) == MANIFEST_FILE_NAME: - manifest_path = input_path - input_path = None - with open(manifest_path, 'rt') as f: - manifest = json.load(f) - channels = manifest.get('channels', []) - if len(channels) > 0: - input_path = os.path.join(log_dir, channels[0]) - if input_path is None or not os.path.exists(input_path): - raise FileNotFoundError( - "Found manifest file in '%s' but could not find corresponding log file." % log_dir) + if os.path.basename(matching_input_path) in _MANIFEST_FILE_NAMES: + if '' in candidate_files: + matching_input_path = _get_data_filename_from_manifest(manifest_path=matching_input_path, + log_dir=matching_log_dir) + else: + raise FileNotFoundError( + f"Directory '{log_dir}' matches search pattern, but diagnostic files not requested.") + + log_dir = matching_log_dir + log_id = matching_log_id + log_file_path = matching_input_path except RuntimeError as e: # Multiple matching directories found. raise e @@ -373,9 +482,12 @@ def _search_directory(dir_path): # No log directories found matching user pattern. raise e - output_dir = log_dir + if log_file_path is None: + raise FileNotFoundError("File/directory '%s' not found." % input_path) + else: + output_dir = log_dir - result = [input_path] + result = [log_file_path] if return_output_dir: result.append(output_dir) if return_log_id: @@ -387,7 +499,27 @@ def _search_directory(dir_path): return tuple(result) -def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, log_base_dir=DEFAULT_LOG_BASE_DIR): +def get_candidate_file_list(log_type: str = 'auto') -> List[str]: + """! + @brief Get the list of candidate file names used to search for a log. + + @param log_type The type of search to be performed. See --log-type help text. + + @return The list of candidate file names, in the order that they should be tried. + """ + if log_type == 'auto': + candidate_files = _CANDIDATE_ALL_LOG_FILES + elif log_type in ('diagnostic', 'diag'): + candidate_files = _CANDIDATE_DIAG_FILES + elif log_type == 'user': + candidate_files = _CANDIDATE_USER_OUTPUT_FILES + else: + raise ValueError(f"Unsupported log type '{log_type}'.") + return candidate_files + + +def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, log_base_dir=DEFAULT_LOG_BASE_DIR, + log_type: str = 'auto'): """! @brief Locate a FusionEngine log directory containing a `*.p1log` file from a list of expected candidate paths. @@ -403,6 +535,7 @@ def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, lo @param return_output_dir If `True`, return the output directory associated with the located input file. @param return_log_id If `True`, return the ID of the log if the requested path is a FusionEngine log. @param log_base_dir The base directory to be searched when performing a pattern match for a log directory. + @param log_type The type of search to be performed. See --log-type help text. @return The path to the located file or a tuple containing: - The path to the located file. @@ -410,8 +543,9 @@ def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, lo - The log ID string, or `None` if the requested file is not part of a FusionEngine log. Only provided if `return_log_id` is `True`. """ - # The following files are listed order of priority. The first located file will be returned. - candidate_files = CANDIDATE_P1LOG_FILES + candidate_files = get_candidate_file_list(log_type) + # This function specifically looks for *.p1log files. Disregard any other file extensions. + candidate_files = [c for c in candidate_files if c.endswith('.p1log')] result = find_log_file(input_path, candidate_files=candidate_files, return_output_dir=return_output_dir, return_log_id=return_log_id, log_base_dir=log_base_dir) if isinstance(result, tuple): @@ -419,7 +553,7 @@ def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, lo else: p1log_path = result - if p1log_path.endswith('.p1log') or p1log_path.endswith('filter/output/fe_service/output.p1bin'): + if p1log_path.endswith('.p1log'): return result else: # If we got here and find_log_file() didn't raise an exception, the user specified a file (not a directory) and @@ -428,7 +562,8 @@ def find_p1log_file(input_path, return_output_dir=False, return_log_id=False, lo raise FileExistsError('Specified file is not a .p1log file.') -def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, return_counts=False, save_index=True): +def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, return_counts=False, save_index=True, + message_types: Union[Iterable[MessageType], MessageType] = None): """! @brief Extract FusionEngine data from a file containing mixed binary data. @@ -439,6 +574,8 @@ def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, r @param return_counts If `True`, return the number of messages extracted for each message type. @param save_index If `True`, generate an index file to go along with the output file for faster reading in the future. See @ref FileIndex for details. + @param message_types A list of one or more @ref fusion_engine_client.messages.defs.MessageType "MessageTypes" to + be returned. If `None` or an empty list, read all available messages. @return A tuple containing: - The number of decoded messages. @@ -452,7 +589,7 @@ def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, r index_builder = FileIndexBuilder() if save_index else None with open(input_path, 'rb') as in_fd, open(output_path, 'wb') as out_path: - reader = MixedLogReader(in_fd, warn_on_gaps=warn_on_gaps, save_index=False, + reader = MixedLogReader(in_fd, warn_on_gaps=warn_on_gaps, save_index=False, message_types=message_types, return_header=True, return_payload=True, return_bytes=True, return_offset=False, show_progress=True) for header, payload, data in reader: @@ -482,7 +619,7 @@ def extract_fusion_engine_log(input_path, output_path=None, warn_on_gaps=True, r def locate_log(input_path, log_base_dir=DEFAULT_LOG_BASE_DIR, return_output_dir=False, return_log_id=False, - extract_fusion_engine_data=False): + extract_fusion_engine_data=False, log_type: str = 'auto'): """! @brief Locate a FusionEngine `*.p1log` file, or a binary file containing a mixed stream of FusionEngine messages and other content. @@ -504,6 +641,7 @@ def locate_log(input_path, log_base_dir=DEFAULT_LOG_BASE_DIR, return_output_dir= @param return_log_id If `True`, return the ID of the log if the requested path is a FusionEngine log. @param extract_fusion_engine_data If `True`, extract FusionEngine content from a file containing mixed binary data and generate a new `*.p1log` file. Otherwise, return the path to the located mixed binary file. + @param log_type The type of search to be performed. See --log-type help text. @return The path to the located file or a tuple of: - The path to the located (or extracted) `*.p1log` file @@ -538,8 +676,7 @@ def _populate_result(input_file, output_dir, log_id): # # The log file may contain exclusively FusionEngine messages, or may contain mixed binary content. try: - candidate_files = CANDIDATE_P1LOG_FILES - candidate_files += CANDIDATE_MIXED_FILES + candidate_files = get_candidate_file_list(log_type) log_file_path, output_dir, log_id = find_log_file( input_path, candidate_files=candidate_files, log_base_dir=log_base_dir, return_output_dir=True, return_log_id=True) diff --git a/python/tests/test_log.py b/python/tests/test_log.py new file mode 100644 index 00000000..6f76b829 --- /dev/null +++ b/python/tests/test_log.py @@ -0,0 +1,278 @@ +import json +import os + +import pytest + +from fusion_engine_client.utils.log import get_candidate_file_list, locate_log + +LOG_HASH = '95cf9ba38f0b40f7af13b072401a82d7' + + +def _write_stub(path): + os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True) + with open(path, 'wb') as f: + f.write(b'\x00') + + +def make_manifest(log_dir, data_filename, guid=None): + """Write manifest.json with {"channels": [data_filename]} in log_dir.""" + with open(os.path.join(str(log_dir), 'manifest.json'), 'w') as f: + manifest = {'channels': [data_filename]} + if guid is not None: + manifest['guid'] = guid + json.dump(manifest, f) + + +def make_log_dir(base_dir, log_id, files, create_manifest=True): + """Create // with 1-byte stub files. Return log dir path.""" + log_dir = os.path.join(str(base_dir), log_id) + os.makedirs(log_dir, exist_ok=True) + for f in files: + _write_stub(os.path.join(log_dir, f)) + if create_manifest: + make_manifest(log_dir=log_dir, data_filename=files[0], guid=log_id) + return log_dir + + +def make_log_base_dir(tmpdir, log_id, files, create_manifest=True): + """ + Create realistic nested log structure: + tmpdir/log_base/2024-01-15/device_name//[files] + + Returns (log_base_path, log_dir_path). + """ + log_base = str(tmpdir.mkdir('log_base')) + nested = os.path.join(log_base, '2024-01-15', 'device_name') + log_dir = make_log_dir(base_dir=nested, log_id=log_id, files=files, create_manifest=create_manifest) + return log_base, log_dir + + +class TestGetCandidateFileList: + def test_auto(self): + files = get_candidate_file_list('auto') + assert 'output/fusion_engine.p1log' in files + assert 'output/diagnostics.p1log' in files + # User output before diagnostic. + assert files.index('output/fusion_engine.p1log') < files.index('output/diagnostics.p1log') + + def test_diagnostic(self): + files = get_candidate_file_list('diagnostic') + assert 'output/diagnostics.p1log' in files + assert 'output/fusion_engine.p1log' not in files + + def test_diag_alias(self): + assert get_candidate_file_list('diag') == get_candidate_file_list('diagnostic') + + def test_user(self): + files = get_candidate_file_list('user') + assert 'output/fusion_engine.p1log' in files + assert 'output/diagnostics.p1log' not in files + + def test_invalid(self): + with pytest.raises(ValueError): + get_candidate_file_list('bogus') + + +class TestLocateLogDirectFile: + """User specifies an exact file path.""" + + def test_file_user_p1log(self, tmpdir): + path = os.path.join(str(tmpdir), 'fusion_engine.p1log') + _write_stub(path) + assert locate_log(path, log_base_dir=str(tmpdir)) == path + + def test_file_diag_p1log(self, tmpdir): + path = os.path.join(str(tmpdir), 'output', 'diagnostics.p1log') + _write_stub(path) + assert locate_log(path, log_base_dir=str(tmpdir)) == path + + def test_file_input_p1log(self, tmpdir): + path = os.path.join(str(tmpdir), 'input.p1log') + _write_stub(path) + assert locate_log(path, log_base_dir=str(tmpdir)) == path + + def test_file_not_found(self, tmpdir): + path = os.path.join(str(tmpdir), 'nonexistent.p1log') + assert locate_log(path, log_base_dir=str(tmpdir)) is None + + def test_file_empty_direct(self, tmpdir): + # When a file is specified directly, the empty-file skip does not apply. + path = os.path.join(str(tmpdir), 'fusion_engine.p1log') + open(path, 'wb').close() + assert locate_log(path, log_base_dir=str(tmpdir)) == path + + +class TestLocateLogDirectory: + """User passes path to a log directory.""" + + def test_dir_user_output(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/fusion_engine.p1log'], create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir)) == \ + os.path.join(log_dir, 'output', 'fusion_engine.p1log') + + def test_dir_diag(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log'], create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir)) == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_dir_input_p1log(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['input.p1log'], create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir)) == \ + os.path.join(log_dir, 'input.p1log') + + def test_dir_manifest_data(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['test_file.bin'], create_manifest=True) + assert locate_log(log_dir, log_base_dir=str(tmpdir)) == \ + os.path.join(log_dir, 'test_file.bin') + + def test_dir_priority_user_over_diag(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, + ['output/fusion_engine.p1log', 'output/diagnostics.p1log'], + create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir)) == \ + os.path.join(log_dir, 'output', 'fusion_engine.p1log') + + def test_dir_empty_first_candidate_skipped(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log'], create_manifest=False) + open(os.path.join(log_dir, 'output', 'fusion_engine.p1log'), 'wb').close() + assert locate_log(log_dir, log_base_dir=str(tmpdir), log_type='auto') == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_dir_no_candidates(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['irrelevant.txt'], create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir)) is None + + def test_logtype_diag_skips_user(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, + ['output/fusion_engine.p1log', 'output/diagnostics.p1log'], + create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir), log_type='diagnostic') == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_logtype_user_skips_diag(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, + ['output/fusion_engine.p1log', 'output/diagnostics.p1log'], + create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir), log_type='user') == \ + os.path.join(log_dir, 'output', 'fusion_engine.p1log') + + def test_logtype_user_no_match(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log'], create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir), log_type='user') is None + + def test_logtype_diag_no_match(self, tmpdir): + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/fusion_engine.p1log'], create_manifest=False) + assert locate_log(log_dir, log_base_dir=str(tmpdir), log_type='diagnostic') is None + + +class TestLocateLogPatternMatch: + """User passes partial/full log hash; locate_log searches under log_base_dir.""" + + def test_partial_hash(self, tmpdir): + log_base, log_dir = make_log_base_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log']) + assert locate_log(LOG_HASH[:8], log_base_dir=log_base) == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_full_hash(self, tmpdir): + log_base, log_dir = make_log_base_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log']) + assert locate_log(LOG_HASH, log_base_dir=log_base) == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_no_match(self, tmpdir): + log_base, _ = make_log_base_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log']) + assert locate_log('zzz000', log_base_dir=log_base) is None + + def test_ambiguous_prefix(self, tmpdir): + log_base = str(tmpdir.mkdir('log_base')) + nested = os.path.join(log_base, '2024-01-15', 'device_name') + make_log_dir(nested, LOG_HASH, ['output/diagnostics.p1log']) + make_log_dir(nested, '95cf9000000000000000000000000000', ['output/diagnostics.p1log']) + assert locate_log('95cf9', log_base_dir=log_base) is None + + def test_full_hash_resolves_ambiguity(self, tmpdir): + log_base = str(tmpdir.mkdir('log_base')) + nested = os.path.join(log_base, '2024-01-15', 'device_name') + log_dir = make_log_dir(nested, LOG_HASH, ['output/diagnostics.p1log']) + make_log_dir(nested, '95cf9000000000000000000000000000', ['output/diagnostics.p1log']) + assert locate_log(LOG_HASH, log_base_dir=log_base) == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_pattern_diag_type(self, tmpdir): + log_base, log_dir = make_log_base_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log']) + assert locate_log(LOG_HASH[:8], log_base_dir=log_base, log_type='diagnostic') == \ + os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_pattern_manifest_only(self, tmpdir): + log_base, log_dir = make_log_base_dir(tmpdir, LOG_HASH, ['test_file.bin']) + assert locate_log(LOG_HASH[:8], log_base_dir=log_base) == \ + os.path.join(log_dir, 'test_file.bin') + + def test_pattern_manifest_missing_data_file(self, tmpdir): + log_base, log_dir = make_log_base_dir(tmpdir, LOG_HASH, [], create_manifest=False) + make_manifest(log_dir, 'missing.bin') + assert locate_log(LOG_HASH[:8], log_base_dir=log_base) is None + + +class TestLocateLogReturnModes: + def _make_log(self, tmpdir, input_path='output/diagnostics.p1log', create_manifest=True): + return make_log_base_dir(tmpdir, LOG_HASH, [input_path], create_manifest=create_manifest) + + def test_file_only(self, tmpdir): + log_base, log_dir = self._make_log(tmpdir, create_manifest=False) + result = locate_log(LOG_HASH[:8], log_base_dir=log_base) + assert isinstance(result, str) + assert result == os.path.join(log_dir, 'output', 'diagnostics.p1log') + + def test_return_output_dir(self, tmpdir): + log_base, log_dir = self._make_log(tmpdir) + path, out_dir = locate_log(LOG_HASH[:8], log_base_dir=log_base, return_output_dir=True) + assert path == os.path.join(log_dir, 'output', 'diagnostics.p1log') + assert out_dir == log_dir + + def test_return_log_id(self, tmpdir): + log_base, log_dir = self._make_log(tmpdir) + path, log_id = locate_log(LOG_HASH[:8], log_base_dir=log_base, return_log_id=True) + assert path == os.path.join(log_dir, 'output', 'diagnostics.p1log') + assert log_id == LOG_HASH + + def test_return_both(self, tmpdir): + log_base, log_dir = self._make_log(tmpdir) + path, out_dir, log_id = locate_log(LOG_HASH[:8], log_base_dir=log_base, + return_output_dir=True, return_log_id=True) + assert path == os.path.join(log_dir, 'output', 'diagnostics.p1log') + assert out_dir == log_dir + assert log_id == LOG_HASH + + def test_return_log_id_nonstandard_filename(self, tmpdir): + log_base, log_dir = self._make_log(tmpdir, input_path='test_file.bin') + path, log_id = locate_log(LOG_HASH[:8], log_base_dir=log_base, return_log_id=True) + assert path == os.path.join(log_dir, 'test_file.bin') + assert log_id == LOG_HASH + + def test_error_file_only(self, tmpdir): + assert locate_log('zzz000', log_base_dir=str(tmpdir)) is None + + def test_error_with_both(self, tmpdir): + result = locate_log('zzz000', log_base_dir=str(tmpdir), + return_output_dir=True, return_log_id=True) + assert result == (None, None, None) + + def test_return_log_id_from_directory(self, tmpdir): + # log_id should be the log hash, not a subdirectory name like "output". + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log']) + _, log_id = locate_log(log_dir, log_base_dir=str(tmpdir), return_log_id=True) + assert log_id == LOG_HASH + + def test_return_alt_log_id_from_directory(self, tmpdir): + # log_id should be the log hash, not a subdirectory name like "output". + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log'], create_manifest=False) + make_manifest(log_dir, 'output/diagnostics.p1log', guid='abcdef0123456789') + _, log_id = locate_log(log_dir, log_base_dir=str(tmpdir), return_log_id=True) + assert log_id == 'abcdef0123456789' + + def test_return_log_id_no_manifest(self, tmpdir): + # If there is no manifest file, the fallback behavior is to return the parent directory name (even if that dir + # happens to have its own parent dir whose name could be a valid log hash. + log_dir = make_log_dir(tmpdir, LOG_HASH, ['output/diagnostics.p1log'], create_manifest=False) + _, log_id = locate_log(log_dir, log_base_dir=str(tmpdir), return_log_id=True) + assert log_id == 'output'