Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/365.added
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added the remote file copy feature to Arista EOS devices.
Added unittests for remote file copy on Arista EOS devices.
50 changes: 48 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

263 changes: 263 additions & 0 deletions pyntc/devices/eos_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import re
import time
from urllib.parse import urlparse

from netmiko import ConnectHandler, FileTransfer
from pyeapi import connect as eos_connect
Expand All @@ -23,6 +24,7 @@
RebootTimeoutError,
)
from pyntc.utils import convert_list_by_key
from pyntc.utils.models import FileCopyModel

BASIC_FACTS_KM = {"model": "modelName", "os_version": "internalVersion", "serial_number": "serialNumber"}
INTERFACES_KM = {
Expand Down Expand Up @@ -421,6 +423,267 @@ def file_copy_remote_exists(self, src, dest=None, file_system=None):
log.debug("Host %s: File %s does not already exist on remote.", self.host, src)
return False

def check_file_exists(self, filename, file_system=None):
"""Check if a remote file exists by filename.

Args:
filename (str): The name of the file to check for on the remote device.
file_system (str): Supported only for Arista. The file system for the
remote file. If no file_system is provided, then the `get_file_system`
method is used to determine the correct file system to use.

Returns:
(bool): True if the remote file exists, False if it doesn't.

Raises:
CommandError: If there is an error in executing the command to check if the file exists.
"""
exists = False

file_system = file_system or self._get_file_system()
command = f"dir {file_system}/{filename}"
result = self.native_ssh.send_command(command, read_timeout=30)

log.debug(
"Host %s: Checking if file %s exists on remote with command '%s' and result: %s",
self.host,
filename,
command,
result,
)

# Check for error patterns
if re.search(r"% Error listing directory|No such file|No files found|Path does not exist", result):
log.debug("Host %s: File %s does not exist on remote.", self.host, filename)
exists = False
elif re.search(rf"Directory of .*{filename}", result):
log.debug("Host %s: File %s exists on remote.", self.host, filename)
exists = True
else:
raise CommandError(command, f"Unable to determine if file {filename} exists on remote: {result}")

return exists

def get_remote_checksum(self, filename, hashing_algorithm="md5", **kwargs):
"""Get the checksum of a remote file on Arista EOS device using netmiko SSH.

Uses Arista's 'verify' command via SSH to compute file checksums.
Note, Netmiko FileTransfer only supports `verify /md5`

Args:
filename (str): The name of the file to check for on the remote device.
hashing_algorithm (str): The hashing algorithm to use (default: "md5").
**kwargs: Additional keyword arguments.

Keyword Args:
file_system (str): The file system for the remote file (e.g., "flash:").

Returns:
(str): The checksum of the remote file.

Raises:
CommandError: If the verify command fails (but not if file doesn't exist).
"""
file_system = kwargs.get("file_system")
if file_system is None:
file_system = self._get_file_system()

# Normalize file_system to Arista format (e.g., "flash:" or "/mnt/flash")
if file_system.endswith(":"):
# Already in shorthand format like "flash:"
pass
elif file_system.startswith("/"):
# Full path format like "/mnt/flash" - keep as is
pass
else:
# Assume shorthand without colon (e.g., "flash" -> "flash:")
file_system = f"{file_system}:"

# Build the path
if file_system.endswith(":"):
path = f"{file_system}{filename}"
else:
path = f"{file_system}/{filename}"

# Use Arista's verify command to get the checksum
# Example: verify /sha512 flash:nautobot.png
command = f"verify /{hashing_algorithm} {path}"

try:
result = self.native_ssh.send_command(command, read_timeout=30)

log.debug(
"Host %s: Verify command '%s' returned: %s",
self.host,
command,
result,
)

# Parse the checksum from the output
# Expected format: verify /sha512 (flash:nautobot.png) = <checksum>
match = re.search(r"=\s*([a-fA-F0-9]+)", result)
if match:
remote_checksum = match.group(1).lower()
log.debug("Host %s: Remote checksum for %s: %s", self.host, filename, remote_checksum)
return remote_checksum

log.error("Host %s: Could not parse checksum from verify output: %s", self.host, result)
raise CommandError(command, f"Could not parse checksum from verify output: {result}")

except Exception as e:
log.error("Host %s: Error getting remote checksum: %s", self.host, str(e))
raise CommandError(command, f"Error getting remote checksum: {str(e)}")

def remote_file_copy(self, src: FileCopyModel, dest=None, file_system=None, include_username=False, **kwargs):
"""Copy a file from remote source to device.

Args:
src (FileCopyModel): The source file model with transfer parameters.
dest (str): Destination filename (defaults to src.file_name).
file_system (str): Device filesystem (auto-detected if not provided).
include_username (bool): Whether to include username in the copy command. Defaults to False.
**kwargs: Additional keyword arguments for future extensibility.

Raises:
TypeError: If src is not a FileCopyModel.
FileTransferError: If transfer or verification fails.
FileSystemNotFoundError: If filesystem cannot be determined.
"""
# Validate input
if not isinstance(src, FileCopyModel):
raise TypeError("src must be an instance of FileCopyModel")

# Determine file system
if file_system is None:
file_system = self._get_file_system()

# Determine destination
if dest is None:
dest = src.file_name

log.debug("Host %s: Starting remote file copy for %s to %s/%s", self.host, src.file_name, file_system, dest)

# Open SSH connection and enable
self.open()
self.enable()

# Validate scheme
supported_schemes = ["http", "https", "scp", "ftp", "sftp", "tftp"]
if src.scheme not in supported_schemes:
raise ValueError(f"Unsupported scheme: {src.scheme}")

# Parse URL components
parsed = urlparse(src.download_url)
hostname = parsed.hostname
path = parsed.path
port = (
parsed.port
if parsed.port
else ("443" if src.scheme == "https" else "80" if src.scheme in ["http", "https"] else "")
)

# Build command based on scheme and credentials
detect_prompt = False

# TFTP, HTTP, HTTPS without credentials
if src.scheme in ["tftp", "http", "https"] and not include_username:
command = f"copy {src.download_url} {file_system}"
log.debug("Host %s: Preparing copy command without credentials: %s", self.host, src.scheme)

# HTTP/HTTPS with credentials embedded in URL
elif src.scheme in ["http", "https"] and (include_username and src.username):
port_str = f":{port}" if port else ""
command = f"copy {src.scheme}://{src.username}:{src.token}@{hostname}{port_str}{path} {file_system}"
log.debug("Host %s: Preparing copy command with embedded credentials for %s", self.host, src.scheme)

# SCP, FTP, SFTP with username (password via prompt)
elif src.scheme in ["scp", "ftp", "sftp"]:
port_str = f":{port}" if port else ""
command = f"copy {src.scheme}://{src.username}@{hostname}{port_str}{path} {file_system}"
detect_prompt = True
log.debug(
"Host %s: Preparing copy command with username for %s (password via prompt)", self.host, src.scheme
)

else:
raise ValueError(f"Unable to construct copy command for scheme {src.scheme} with provided credentials")

# Execute copy command
if detect_prompt and src.token:
# Use send_command_timing for interactive password prompt
output = self.native_ssh.send_command_timing(command, read_timeout=src.timeout, cmd_verify=False)
log.debug("Host %s: Copy command (with timing) output: %s", self.host, output)

if "Password:" in output or "password:" in output:
self.native_ssh.write_channel(src.token + "\n")
# Read the response after sending password
output += self.native_ssh.read_channel()
log.debug("Host %s: Output after password entry: %s", self.host, output)
elif any(error in output for error in ["Error", "Invalid", "Failed"]):
log.error("Host %s: Error detected in copy command output: %s", self.host, output)
raise FileTransferError(f"Error detected in copy command output: {output}")
else:
# Use regular send_command for non-interactive transfers
output = self.native_ssh.send_command(command, read_timeout=src.timeout)
log.debug("Host %s: Copy command output: %s", self.host, output)

if any(error in output for error in ["Error", "Invalid", "Failed"]):
log.error("Host %s: Error detected in copy command output: %s", self.host, output)
raise FileTransferError(f"Error detected in copy command output: {output}")

# Verify transfer success
verification_result = self.verify_file(
src.checksum, dest, hashing_algorithm=src.hashing_algorithm, file_system=file_system
)
log.debug(
"Host %s: File verification result for %s - Checksum: %s, Algorithm: %s, Result: %s",
self.host,
dest,
src.checksum,
src.hashing_algorithm,
verification_result,
)

if not verification_result:
log.error(
"Host %s: File verification failed for %s - Expected checksum: %s",
self.host,
dest,
src.checksum,
)
raise FileTransferError

log.info("Host %s: File %s transferred and verified successfully", self.host, dest)

def verify_file(self, checksum, filename, hashing_algorithm="md5", **kwargs):
"""Verify a file on the remote device by confirming the file exists and validate the checksum.

Args:
checksum (str): The checksum of the file.
filename (str): The name of the file to check for on the remote device.
hashing_algorithm (str): The hashing algorithm to use (default: "md5").
**kwargs: Additional keyword arguments (e.g., file_system).

Returns:
(bool): True if the file is verified successfully, False otherwise.
"""
exists = self.check_file_exists(filename, **kwargs)
device_checksum = (
self.get_remote_checksum(filename, hashing_algorithm=hashing_algorithm, **kwargs) if exists else None
)
if checksum == device_checksum:
log.debug("Host %s: Checksum verification successful for file %s", self.host, filename)
return True
else:
log.debug(
"Host %s: Checksum verification failed for file %s - Expected: %s, Actual: %s",
self.host,
filename,
checksum,
device_checksum,
)
return False

def install_os(self, image_name, **vendor_specifics):
"""Install new OS on device.

Expand Down
2 changes: 1 addition & 1 deletion pyntc/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def init(**kwargs):

logging.basicConfig(**kwargs)
# info is defined at the end of the file
info("Logging initialized for host %s.", kwargs.get("host"))
info("Logging initialized for host %s.", kwargs.pop("host", None))


def logger(level):
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ attrs = "^23.2.0"
towncrier = ">=23.6.0,<=24.8.0"
ruff = "*"
Markdown = "*"
hypothesis = "*"

[tool.poetry.group.docs.dependencies]
# Rendering docs to HTML
Expand Down
Loading
Loading