Source code for adi_lg_plugins.drivers.massstoragedriver

import os
import shutil
import subprocess
import time

import attr
from labgrid.driver.common import Driver
from labgrid.factory import target_factory


[docs] @target_factory.reg_driver @attr.s(eq=False) class MassStorageDriver(Driver): """Mount and copy files to a USB mass storage device. Supports both local-only (MassStorageDevice) and remote-exporter (NetworkUSBMassStorage / USBMassStorage) bindings. When bound to a remote resource, pmount/pumount/mkdir run on the exporter host via ssh (labgrid's command_prefix) and file copies use scp. Specify `partition` when the bound resource points at a whole block device (e.g. /dev/sdb) rather than a specific partition; its value is the absolute partition path on the exporter host — a raw device (/dev/sdb1) or a stable symlink (/dev/disk/by-partuuid/...). """ bindings = { "mass_storage": {"MassStorageDevice"}, } partition = attr.ib( default=None, validator=attr.validators.optional(attr.validators.instance_of(str)), ) mount_label = attr.ib(default="lg_mass_storage") def __attrs_post_init__(self): super().__attrs_post_init__() self.mounted = False def __del__(self): try: self.unmount_partition() except Exception: pass @property def _prefix(self): # NetworkResource subclasses expose ``command_prefix`` directly. prefix = list(getattr(self.mass_storage, "command_prefix", []) or []) if prefix: return prefix # Plain Resources proxied from a coordinator expose the exporter # host via ``extra['proxy']``; build an ssh command prefix to it. proxy = self._proxy_host() if proxy: from labgrid.util.ssh import sshmanager conn = sshmanager.get(proxy) return conn.get_prefix() + ["--"] return [] @property def _is_remote(self): return bool(self._prefix) @property def _host(self): host = getattr(self.mass_storage, "host", None) if host: return host return self._proxy_host() def _proxy_host(self): extra = getattr(self.mass_storage, "extra", None) or {} if not isinstance(extra, dict): return None return extra.get("proxy") def _device_path(self): return self.partition or self.mass_storage.path def _mount_dir(self): return f"/media/{self.mount_label}" def _run(self, cmd, check=True): return subprocess.run(self._prefix + list(cmd), check=check) def _path_exists(self, path): return self._run(["test", "-e", path], check=False).returncode == 0 def _is_mountpoint(self, path): return self._run(["mountpoint", "-q", path], check=False).returncode == 0
[docs] def mount_partition(self): """Mount the configured partition at /media/<mount_label>.""" if self.mounted: self.logger.debug("Already mounted; skipping.") return mnt = self._mount_dir() if self._is_mountpoint(mnt): self.logger.debug(f"{mnt} already mounted; treating as mounted.") self.mounted = True return device_path = self._device_path() if not self._path_exists(device_path): raise RuntimeError(f"Mass storage device path {device_path} does not exist.") try: self._run(["pmount", device_path, self.mount_label]) except subprocess.CalledProcessError as e: self.logger.error(f"Failed to mount {device_path}: {e}") raise time.sleep(2) if not self._is_mountpoint(mnt): raise RuntimeError(f"Mounting {device_path} failed; {mnt} is not a mount point.") self.logger.debug(f"Mounted {device_path} at {mnt}") self.mounted = True
[docs] def unmount_partition(self): """Unmount the mass storage device partition.""" if not self.mounted: return mnt = self._mount_dir() self._run(["sync"], check=False) try: self._run(["pumount", self.mount_label]) except subprocess.CalledProcessError as e: self.logger.error(f"Failed to unmount {self.mount_label}: {e}") raise if self._is_mountpoint(mnt): raise RuntimeError(f"Unmount failed; {mnt} is still a mount point.") self.mounted = False
[docs] def copy_file(self, src, dst): """Copy a local file onto the mass storage device. Args: src: source file path on the test runner host. dst: destination path relative to the mount point. """ if not self.mounted: raise RuntimeError("Mass storage device is not mounted. Cannot copy file.") if not os.path.exists(src): raise FileNotFoundError(f"Source file {src} does not exist.") full_dst = os.path.join(self._mount_dir(), dst.lstrip("/")) dst_dir = os.path.dirname(full_dst) self._run(["mkdir", "-p", dst_dir]) if self._is_remote: subprocess.run(["scp", "-q", src, f"{self._host}:{full_dst}"], check=True) else: shutil.copy(src, full_dst) self.logger.info(f"Copied {src} to {full_dst}")
[docs] def update_files(self): """Batch-copy files listed in mass_storage.file_updates (local-only path mapping).""" if not self.mounted: raise RuntimeError("Mass storage device is not mounted. Cannot update files.") for src, dst in self.mass_storage.file_updates.items(): self.copy_file(src, dst)