Source code for adi_lg_plugins.drivers.softwareinstaller

import os
import tarfile
import tempfile

import attr
from labgrid.driver.common import Driver
from labgrid.factory import target_factory
from labgrid.protocol import CommandProtocol, FileTransferProtocol
from labgrid.step import step


[docs] @target_factory.reg_driver @attr.s(eq=False) class SoftwareInstallerDriver(Driver): """ SoftwareInstallerDriver - Driver to install software, clone repos, copy directories, and run builds/tests on a DUT. """ bindings = { "command": CommandProtocol, "file_transfer": FileTransferProtocol, } def __attrs_post_init__(self): super().__attrs_post_init__() self._package_manager = None def _detect_package_manager(self): if self._package_manager: return self._package_manager managers = { "apt-get": "apt-get install -y", "dnf": "dnf install -y", "opkg": "opkg install", "pacman": "pacman -S --noconfirm", "apk": "apk add", } for mgr, install_cmd in managers.items(): stdout, _, exit_code = self.command.run(f"which {mgr}") if exit_code == 0: self._package_manager = (mgr, install_cmd) return self._package_manager raise Exception( "No supported package manager found on target (checked apt-get, dnf, opkg, pacman, apk)" )
[docs] @step(args=["package_name"]) def install_package(self, package_name, update=False): """ Installs a package using the detected package manager. Args: package_name (str): Name of the package to install. update (bool): Whether to update package lists before installing. """ mgr, install_cmd = self._detect_package_manager() if update: update_cmds = { "apt-get": "apt-get update", "dnf": "dnf check-update", "opkg": "opkg update", "pacman": "pacman -Sy", "apk": "apk update", } if mgr in update_cmds: self.command.run(update_cmds[mgr]) cmd = f"{install_cmd} {package_name}" stdout, stderr, exit_code = self.command.run(cmd) if exit_code != 0: raise Exception( f"Failed to install package '{package_name}'. Exit code: {exit_code}. Stderr: {stderr}" ) return stdout
[docs] @step(args=["repo_url", "destination"]) def clone_repo(self, repo_url, destination, branch=None): """ Clones a git repository to the destination. Args: repo_url (str): URL of the git repository. destination (str): Remote path to clone into. branch (str): Optional branch or tag to checkout. """ # Ensure git is installed stdout, _, exit_code = self.command.run("which git") if exit_code != 0: self.install_package("git") cmd = f"git clone {repo_url} {destination}" if branch: cmd += f" --branch {branch}" stdout, stderr, exit_code = self.command.run(cmd) if exit_code != 0: raise Exception( f"Failed to clone repo '{repo_url}'. Exit code: {exit_code}. Stderr: {stderr}" ) return stdout
[docs] @step(args=["local_path", "remote_path"]) def copy_directory(self, local_path, remote_path): """ Copies a local directory to the remote path. Args: local_path (str): Local directory path. remote_path (str): Remote directory path (parent directory must exist or be created). """ if not os.path.isdir(local_path): raise ValueError(f"Local path '{local_path}' is not a directory.") # Create a temporary tarball of the local directory with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp_tar: tar_path = tmp_tar.name try: with tarfile.open(tar_path, "w:gz") as tar: tar.add(local_path, arcname=os.path.basename(local_path)) # Transfer the tarball remote_tar_path = f"/tmp/{os.path.basename(tar_path)}" self.file_transfer.put(tar_path, remote_tar_path) # Ensure remote directory exists self.command.run(f"mkdir -p {remote_path}") # Extract on remote # tar -xzf archive.tar.gz -C /path/to/destination # Note: arcname included the directory name, so extracting it into remote_path # might create remote_path/dirname. # If user expects contents of local_path to be IN remote_path, we might need to adjust. # Let's assume standard cp -r behavior: cp -r dir dest -> dest/dir cmd = f"tar -xzf {remote_tar_path} -C {remote_path}" stdout, stderr, exit_code = self.command.run(cmd) # Cleanup remote tar self.command.run(f"rm {remote_tar_path}") if exit_code != 0: raise Exception( f"Failed to extract directory. Exit code: {exit_code}. Stderr: {stderr}" ) finally: if os.path.exists(tar_path): os.remove(tar_path)
[docs] @step(args=["command", "directory"]) def run_build(self, command, directory): """ Runs a build command in a specific directory. Args: command (str): Build command (e.g., 'make', 'cargo build'). directory (str): Directory to run the build in. """ full_cmd = f"cd {directory} && {command}" stdout, stderr, exit_code = self.command.run( full_cmd, timeout=3600 ) # Long timeout for builds if exit_code != 0: raise Exception(f"Build failed. Exit code: {exit_code}. Stderr: {stderr}") return stdout
[docs] @step(args=["binary_path"]) def run_binary(self, binary_path, args="", directory=None): """ Runs a binary on the target. Args: binary_path (str): Path to the binary. args (str): Arguments for the binary. directory (str): Working directory. """ cmd = f"{binary_path} {args}" if directory: cmd = f"cd {directory} && {cmd}" stdout, stderr, exit_code = self.command.run(cmd) if exit_code != 0: raise Exception(f"Binary execution failed. Exit code: {exit_code}. Stderr: {stderr}") return stdout
[docs] @step(args=["test_command"]) def run_test(self, test_command, directory=None): """ Runs a test command. Args: test_command (str): Command to run tests. directory (str): Working directory. """ cmd = test_command if directory: cmd = f"cd {directory} && {cmd}" stdout, stderr, exit_code = self.command.run(cmd) if exit_code != 0: raise Exception(f"Test failed. Exit code: {exit_code}. Stderr: {stderr}") return stdout